mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Python: Support Agent Skills (#4210)
* Python: Support Agent Skills Add FileAgentSkillsProvider, a context provider that discovers and exposes Agent Skills from filesystem directories following the Agent Skills specification (https://agentskills.io/) progressive disclosure pattern: advertise, load, read resources. Changes: - FileAgentSkillsProvider - discovers SKILL.md files from configured directories, advertises skills via system prompt injection, and provides load_skill / read_skill_resource tools for on-demand access. - Internal helpers for skill discovery, frontmatter parsing, and secure resource reading (path traversal / symlink guards). - Unit tests covering discovery, loading, resource reading, and security scenarios. - Sample (basic_file_skills) demonstrating usage with an expense-report skill. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Python: Move skills sample to samples/02-agents/basic_skills/ Align sample directory name with .NET equivalent (Agent_Step01_BasicSkills). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * fix code quality checks * address pr review comment and code quality check issue * address pr review comments * move the sample to the skills folder * update readme * reame consts and use types for them * leverage pathlib for working with files * refactor the test * supply schema to functions * update readme * update sample name * address pr review comments * fix failing lint check * address failing check --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
2ba7ee9ce5
commit
4dc35e9bb0
@@ -59,6 +59,7 @@ from ._sessions import (
|
||||
register_state_type,
|
||||
)
|
||||
from ._settings import SecretString, load_settings
|
||||
from ._skills import FileAgentSkillsProvider
|
||||
from ._telemetry import (
|
||||
AGENT_FRAMEWORK_USER_AGENT,
|
||||
APP_INFO,
|
||||
@@ -233,6 +234,7 @@ __all__ = [
|
||||
"Executor",
|
||||
"FanInEdgeGroup",
|
||||
"FanOutEdgeGroup",
|
||||
"FileAgentSkillsProvider",
|
||||
"FileCheckpointStorage",
|
||||
"FinalT",
|
||||
"FinishReason",
|
||||
|
||||
@@ -0,0 +1,597 @@
|
||||
# Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
"""File-based Agent Skills provider for the agent framework.
|
||||
|
||||
This module implements the progressive disclosure pattern from the
|
||||
`Agent Skills specification <https://agentskills.io/>`_:
|
||||
|
||||
1. **Advertise** — skill names and descriptions are injected into the system prompt.
|
||||
2. **Load** — the full SKILL.md body is returned via the ``load_skill`` tool.
|
||||
3. **Read resources** — supplementary files are read from disk on demand via
|
||||
the ``read_skill_resource`` tool.
|
||||
|
||||
Skills are discovered by searching configured directories for ``SKILL.md`` files.
|
||||
Referenced resources are validated at initialization; invalid skills are excluded
|
||||
and logged.
|
||||
|
||||
**Security:** this provider only reads static content. Skill metadata is XML-escaped
|
||||
before prompt embedding, and resource reads are guarded against path traversal and
|
||||
symlink escape. Only use skills from trusted sources.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass, field
|
||||
from html import escape as xml_escape
|
||||
from pathlib import Path, PurePosixPath
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Final
|
||||
|
||||
from ._sessions import BaseContextProvider
|
||||
from ._tools import FunctionTool
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._agents import SupportsAgentRun
|
||||
from ._sessions import AgentSession, SessionContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# region Constants
|
||||
|
||||
SKILL_FILE_NAME: Final[str] = "SKILL.md"
|
||||
MAX_SEARCH_DEPTH: Final[int] = 2
|
||||
MAX_NAME_LENGTH: Final[int] = 64
|
||||
MAX_DESCRIPTION_LENGTH: Final[int] = 1024
|
||||
|
||||
# endregion
|
||||
|
||||
# region Compiled regex patterns (ported from .NET FileAgentSkillLoader)
|
||||
|
||||
# Matches YAML frontmatter delimited by "---" lines.
|
||||
# The \uFEFF? prefix allows an optional UTF-8 BOM.
|
||||
_FRONTMATTER_RE = re.compile(
|
||||
r"\A\uFEFF?---\s*$(.+?)^---\s*$",
|
||||
re.MULTILINE | re.DOTALL,
|
||||
)
|
||||
|
||||
# Matches resource file references in skill markdown. Group 1 = relative file path.
|
||||
# Supports two forms:
|
||||
# 1. Markdown links: [text](path/file.ext)
|
||||
# 2. Backtick-quoted paths: `path/file.ext`
|
||||
# Supports optional ./ or ../ prefixes; excludes URLs (no ":" in the path character class).
|
||||
_RESOURCE_LINK_RE = re.compile(
|
||||
r"(?:\[.*?\]\(|`)(\.?\.?/?[\w][\w\-./]*\.\w+)(?:\)|`)",
|
||||
)
|
||||
|
||||
# Matches YAML "key: value" lines. Group 1 = key, Group 2 = quoted value,
|
||||
# Group 3 = unquoted value.
|
||||
_YAML_KV_RE = re.compile(
|
||||
r"^\s*(\w+)\s*:\s*(?:[\"'](.+?)[\"']|(.+?))\s*$",
|
||||
re.MULTILINE,
|
||||
)
|
||||
|
||||
# Validates skill names: lowercase letters, numbers, hyphens only;
|
||||
# must not start or end with a hyphen.
|
||||
_VALID_NAME_RE = re.compile(r"^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$")
|
||||
|
||||
_DEFAULT_SKILLS_INSTRUCTION_PROMPT = """\
|
||||
You have access to skills containing domain-specific knowledge and capabilities.
|
||||
Each skill provides specialized instructions, reference documents, and assets for specific tasks.
|
||||
|
||||
<available_skills>
|
||||
{0}
|
||||
</available_skills>
|
||||
|
||||
When a task aligns with a skill's domain:
|
||||
1. Use `load_skill` to retrieve the skill's instructions
|
||||
2. Follow the provided guidance
|
||||
3. Use `read_skill_resource` to read any references or other files mentioned by the skill,
|
||||
always using the full path as written (e.g. `references/FAQ.md`, not just `FAQ.md`)
|
||||
|
||||
Only load what is needed, when it is needed."""
|
||||
|
||||
# endregion
|
||||
|
||||
# region Private data classes
|
||||
|
||||
|
||||
@dataclass
|
||||
class _SkillFrontmatter:
|
||||
"""Parsed YAML frontmatter from a SKILL.md file."""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class _FileAgentSkill:
|
||||
"""Represents a loaded Agent Skill discovered from a filesystem directory."""
|
||||
|
||||
frontmatter: _SkillFrontmatter
|
||||
body: str
|
||||
source_path: str
|
||||
resource_names: list[str] = field(default_factory=list)
|
||||
|
||||
# endregion
|
||||
|
||||
# region Private module-level functions (skill discovery, parsing, security)
|
||||
|
||||
|
||||
def _normalize_resource_path(path: str) -> str:
|
||||
"""Normalize a relative resource path.
|
||||
|
||||
Replaces backslashes with forward slashes and removes leading ``./`` prefixes
|
||||
so that ``./refs/doc.md`` and ``refs/doc.md`` are treated as the same resource.
|
||||
"""
|
||||
return PurePosixPath(path.replace("\\", "/")).as_posix()
|
||||
|
||||
|
||||
def _extract_resource_paths(content: str) -> list[str]:
|
||||
"""Extract deduplicated resource paths from markdown link syntax."""
|
||||
seen: set[str] = set()
|
||||
paths: list[str] = []
|
||||
for match in _RESOURCE_LINK_RE.finditer(content):
|
||||
normalized = _normalize_resource_path(match.group(1))
|
||||
lower = normalized.lower()
|
||||
if lower not in seen:
|
||||
seen.add(lower)
|
||||
paths.append(normalized)
|
||||
return paths
|
||||
|
||||
|
||||
def _is_path_within_directory(full_path: str, directory_path: str) -> bool:
|
||||
"""Check that *full_path* is under *directory_path*.
|
||||
|
||||
Uses :meth:`pathlib.Path.is_relative_to` for cross-platform comparison,
|
||||
which handles case sensitivity correctly per platform.
|
||||
"""
|
||||
try:
|
||||
return Path(full_path).is_relative_to(directory_path)
|
||||
except (ValueError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
def _has_symlink_in_path(full_path: str, directory_path: str) -> bool:
|
||||
"""Check whether any segment in *full_path* below *directory_path* is a symlink.
|
||||
|
||||
Precondition: *full_path* must start with *directory_path*. Callers are
|
||||
expected to verify containment via :func:`_is_path_within_directory` before
|
||||
invoking this function.
|
||||
"""
|
||||
dir_path = Path(directory_path)
|
||||
try:
|
||||
relative = Path(full_path).relative_to(dir_path)
|
||||
except ValueError as exc:
|
||||
raise ValueError(
|
||||
f"full_path {full_path!r} does not start with directory_path {directory_path!r}"
|
||||
) from exc
|
||||
|
||||
current = dir_path
|
||||
for part in relative.parts:
|
||||
current = current / part
|
||||
if current.is_symlink():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _try_parse_skill_document(
|
||||
content: str,
|
||||
skill_file_path: str,
|
||||
) -> tuple[_SkillFrontmatter, str] | None:
|
||||
"""Parse a SKILL.md file into frontmatter and body.
|
||||
|
||||
Returns:
|
||||
A ``(frontmatter, body)`` tuple on success, or ``None`` if parsing fails.
|
||||
"""
|
||||
match = _FRONTMATTER_RE.search(content)
|
||||
if not match:
|
||||
logger.error("SKILL.md at '%s' does not contain valid YAML frontmatter delimited by '---'", skill_file_path)
|
||||
return None
|
||||
|
||||
yaml_content = match.group(1).strip()
|
||||
name: str | None = None
|
||||
description: str | None = None
|
||||
|
||||
for kv_match in _YAML_KV_RE.finditer(yaml_content):
|
||||
key = kv_match.group(1)
|
||||
value = kv_match.group(2) if kv_match.group(2) is not None else kv_match.group(3)
|
||||
|
||||
if key.lower() == "name":
|
||||
name = value
|
||||
elif key.lower() == "description":
|
||||
description = value
|
||||
|
||||
if not name or not name.strip():
|
||||
logger.error("SKILL.md at '%s' is missing a 'name' field in frontmatter", skill_file_path)
|
||||
return None
|
||||
|
||||
if len(name) > MAX_NAME_LENGTH or not _VALID_NAME_RE.match(name):
|
||||
logger.error(
|
||||
"SKILL.md at '%s' has an invalid 'name' value: Must be %d characters or fewer, "
|
||||
"using only lowercase letters, numbers, and hyphens, and must not start or end with a hyphen.",
|
||||
skill_file_path,
|
||||
MAX_NAME_LENGTH,
|
||||
)
|
||||
return None
|
||||
|
||||
if not description or not description.strip():
|
||||
logger.error("SKILL.md at '%s' is missing a 'description' field in frontmatter", skill_file_path)
|
||||
return None
|
||||
|
||||
if len(description) > MAX_DESCRIPTION_LENGTH:
|
||||
logger.error(
|
||||
"SKILL.md at '%s' has an invalid 'description' value: Must be %d characters or fewer.",
|
||||
skill_file_path,
|
||||
MAX_DESCRIPTION_LENGTH,
|
||||
)
|
||||
return None
|
||||
|
||||
body = content[match.end() :].lstrip()
|
||||
return _SkillFrontmatter(name, description), body
|
||||
|
||||
|
||||
def _validate_resources(
|
||||
skill_dir_path: str,
|
||||
resource_names: list[str],
|
||||
skill_name: str,
|
||||
) -> bool:
|
||||
"""Validate that all resource paths exist and are safe."""
|
||||
skill_dir = Path(skill_dir_path).absolute()
|
||||
|
||||
for resource_name in resource_names:
|
||||
resource_path = Path(os.path.normpath(skill_dir / resource_name))
|
||||
|
||||
if not _is_path_within_directory(str(resource_path), str(skill_dir)):
|
||||
logger.warning(
|
||||
"Excluding skill '%s': resource '%s' references a path outside the skill directory",
|
||||
skill_name,
|
||||
resource_name,
|
||||
)
|
||||
return False
|
||||
|
||||
if not resource_path.is_file():
|
||||
logger.warning(
|
||||
"Excluding skill '%s': referenced resource '%s' does not exist",
|
||||
skill_name,
|
||||
resource_name,
|
||||
)
|
||||
return False
|
||||
|
||||
if _has_symlink_in_path(str(resource_path), str(skill_dir)):
|
||||
logger.warning(
|
||||
"Excluding skill '%s': resource '%s' is a symlink that resolves outside the skill directory",
|
||||
skill_name,
|
||||
resource_name,
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _parse_skill_file(skill_dir_path: str) -> _FileAgentSkill | None:
|
||||
"""Parse a SKILL.md file from the given directory."""
|
||||
skill_file = Path(skill_dir_path) / SKILL_FILE_NAME
|
||||
|
||||
try:
|
||||
content = skill_file.read_text(encoding="utf-8")
|
||||
except OSError:
|
||||
logger.error("Failed to read SKILL.md at '%s'", skill_file)
|
||||
return None
|
||||
|
||||
result = _try_parse_skill_document(content, str(skill_file))
|
||||
if result is None:
|
||||
return None
|
||||
|
||||
frontmatter, body = result
|
||||
resource_names = _extract_resource_paths(body)
|
||||
|
||||
if not _validate_resources(skill_dir_path, resource_names, frontmatter.name):
|
||||
return None
|
||||
|
||||
return _FileAgentSkill(
|
||||
frontmatter=frontmatter,
|
||||
body=body,
|
||||
source_path=skill_dir_path,
|
||||
resource_names=resource_names,
|
||||
)
|
||||
|
||||
|
||||
def _search_directories_for_skills(
|
||||
directory: str,
|
||||
results: list[str],
|
||||
current_depth: int,
|
||||
) -> None:
|
||||
"""Recursively search for SKILL.md files up to *MAX_SEARCH_DEPTH*."""
|
||||
dir_path = Path(directory)
|
||||
if (dir_path / SKILL_FILE_NAME).is_file():
|
||||
results.append(str(dir_path.absolute()))
|
||||
|
||||
if current_depth >= MAX_SEARCH_DEPTH:
|
||||
return
|
||||
|
||||
try:
|
||||
entries = list(dir_path.iterdir())
|
||||
except OSError:
|
||||
return
|
||||
|
||||
for entry in entries:
|
||||
if entry.is_dir():
|
||||
_search_directories_for_skills(str(entry), results, current_depth + 1)
|
||||
|
||||
|
||||
def _discover_skill_directories(skill_paths: Sequence[str]) -> list[str]:
|
||||
"""Discover all directories containing SKILL.md files."""
|
||||
discovered: list[str] = []
|
||||
for root_dir in skill_paths:
|
||||
if not root_dir or not root_dir.strip() or not Path(root_dir).is_dir():
|
||||
continue
|
||||
_search_directories_for_skills(root_dir, discovered, current_depth=0)
|
||||
return discovered
|
||||
|
||||
|
||||
def _discover_and_load_skills(skill_paths: Sequence[str]) -> dict[str, _FileAgentSkill]:
|
||||
"""Discover and load all valid skills from the given paths."""
|
||||
skills: dict[str, _FileAgentSkill] = {}
|
||||
|
||||
discovered = _discover_skill_directories(skill_paths)
|
||||
logger.info("Discovered %d potential skills", len(discovered))
|
||||
|
||||
for skill_path in discovered:
|
||||
skill = _parse_skill_file(skill_path)
|
||||
if skill is None:
|
||||
continue
|
||||
|
||||
if skill.frontmatter.name in skills:
|
||||
existing = skills[skill.frontmatter.name]
|
||||
logger.warning(
|
||||
"Duplicate skill name '%s': skill from '%s' skipped in favor of existing skill from '%s'",
|
||||
skill.frontmatter.name,
|
||||
skill_path,
|
||||
existing.source_path,
|
||||
)
|
||||
continue
|
||||
|
||||
skills[skill.frontmatter.name] = skill
|
||||
logger.info("Loaded skill: %s", skill.frontmatter.name)
|
||||
|
||||
logger.info("Successfully loaded %d skills", len(skills))
|
||||
return skills
|
||||
|
||||
|
||||
def _read_skill_resource(skill: _FileAgentSkill, resource_name: str) -> str:
|
||||
"""Read a resource file from disk with path traversal and symlink guards.
|
||||
|
||||
Args:
|
||||
skill: The skill that owns the resource.
|
||||
resource_name: Relative path of the resource within the skill directory.
|
||||
|
||||
Returns:
|
||||
The UTF-8 text content of the resource file.
|
||||
|
||||
Raises:
|
||||
ValueError: The resource is not registered, resolves outside the skill
|
||||
directory, or does not exist.
|
||||
"""
|
||||
resource_name = _normalize_resource_path(resource_name)
|
||||
|
||||
# Find the registered resource name with the original casing so the
|
||||
# file path is correct on case-sensitive filesystems.
|
||||
registered_name: str | None = None
|
||||
for r in skill.resource_names:
|
||||
if r.lower() == resource_name.lower():
|
||||
registered_name = r
|
||||
break
|
||||
|
||||
if registered_name is None:
|
||||
raise ValueError(f"Resource '{resource_name}' not found in skill '{skill.frontmatter.name}'.")
|
||||
|
||||
full_path = os.path.normpath(Path(skill.source_path) / registered_name)
|
||||
source_dir = str(Path(skill.source_path).absolute())
|
||||
|
||||
if not _is_path_within_directory(full_path, source_dir):
|
||||
raise ValueError(f"Resource file '{resource_name}' references a path outside the skill directory.")
|
||||
|
||||
if not Path(full_path).is_file():
|
||||
raise ValueError(f"Resource file '{resource_name}' not found in skill '{skill.frontmatter.name}'.")
|
||||
|
||||
if _has_symlink_in_path(full_path, source_dir):
|
||||
raise ValueError(f"Resource file '{resource_name}' is a symlink that resolves outside the skill directory.")
|
||||
|
||||
logger.info("Reading resource '%s' from skill '%s'", resource_name, skill.frontmatter.name)
|
||||
return Path(full_path).read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def _build_skills_instruction_prompt(
|
||||
prompt_template: str | None,
|
||||
skills: dict[str, _FileAgentSkill],
|
||||
) -> str | None:
|
||||
"""Build the system prompt advertising available skills."""
|
||||
template = _DEFAULT_SKILLS_INSTRUCTION_PROMPT
|
||||
|
||||
if prompt_template is not None:
|
||||
# Validate that the custom template contains a valid {0} placeholder
|
||||
try:
|
||||
prompt_template.format("")
|
||||
template = prompt_template
|
||||
except (KeyError, IndexError) as exc:
|
||||
raise ValueError(
|
||||
"The provided skills_instruction_prompt is not a valid format string. "
|
||||
"It must contain a '{0}' placeholder and escape any literal '{' or '}' "
|
||||
"by doubling them ('{{' or '}}')."
|
||||
) from exc
|
||||
|
||||
if not skills:
|
||||
return None
|
||||
|
||||
lines: list[str] = []
|
||||
# Sort by name for deterministic output
|
||||
for skill in sorted(skills.values(), key=lambda s: s.frontmatter.name):
|
||||
lines.append(" <skill>")
|
||||
lines.append(f" <name>{xml_escape(skill.frontmatter.name)}</name>")
|
||||
lines.append(f" <description>{xml_escape(skill.frontmatter.description)}</description>")
|
||||
lines.append(" </skill>")
|
||||
|
||||
return template.format("\n".join(lines))
|
||||
|
||||
# endregion
|
||||
|
||||
# region Public API
|
||||
|
||||
|
||||
class FileAgentSkillsProvider(BaseContextProvider):
|
||||
"""A context provider that discovers and exposes Agent Skills from filesystem directories.
|
||||
|
||||
This provider implements the progressive disclosure pattern from the
|
||||
`Agent Skills specification <https://agentskills.io/>`_:
|
||||
|
||||
1. **Advertise** — skill names and descriptions are injected into the system prompt
|
||||
(~100 tokens per skill).
|
||||
2. **Load** — the full SKILL.md body is returned via the ``load_skill`` tool.
|
||||
3. **Read resources** — supplementary files are read on demand via the
|
||||
``read_skill_resource`` tool.
|
||||
|
||||
Skills are discovered by searching the configured directories for ``SKILL.md`` files.
|
||||
Referenced resources are validated at initialization; invalid skills are excluded and
|
||||
logged.
|
||||
|
||||
**Security:** this provider only reads static content. Skill metadata is XML-escaped
|
||||
before prompt embedding, and resource reads are guarded against path traversal and
|
||||
symlink escape. Only use skills from trusted sources.
|
||||
|
||||
Args:
|
||||
skill_paths: A single path or sequence of paths to search. Each can be an
|
||||
individual skill folder (containing a SKILL.md file) or a parent folder
|
||||
with skill subdirectories.
|
||||
|
||||
Keyword Args:
|
||||
skills_instruction_prompt: A custom system prompt template for advertising
|
||||
skills. Use ``{0}`` as the placeholder for the generated skills list.
|
||||
When ``None``, a default template is used.
|
||||
source_id: Unique identifier for this provider instance.
|
||||
logger: Optional logger instance. When ``None``, uses the module logger.
|
||||
"""
|
||||
|
||||
DEFAULT_SOURCE_ID: ClassVar[str] = "file_agent_skills"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
skill_paths: str | Path | Sequence[str | Path],
|
||||
*,
|
||||
skills_instruction_prompt: str | None = None,
|
||||
source_id: str | None = None,
|
||||
) -> None:
|
||||
"""Initialize the FileAgentSkillsProvider.
|
||||
|
||||
Args:
|
||||
skill_paths: A single path or sequence of paths to search for skills.
|
||||
|
||||
Keyword Args:
|
||||
skills_instruction_prompt: Custom system prompt template with ``{0}`` placeholder.
|
||||
source_id: Unique identifier for this provider instance.
|
||||
"""
|
||||
super().__init__(source_id or self.DEFAULT_SOURCE_ID)
|
||||
|
||||
resolved_paths: Sequence[str] = [str(skill_paths)] if isinstance(skill_paths, (str, Path)) else [str(p) for p in skill_paths]
|
||||
|
||||
self._skills = _discover_and_load_skills(resolved_paths)
|
||||
self._skills_instruction_prompt = _build_skills_instruction_prompt(skills_instruction_prompt, self._skills)
|
||||
self._tools = [
|
||||
FunctionTool(
|
||||
name="load_skill",
|
||||
description="Loads the full instructions for a specific skill.",
|
||||
func=self._load_skill,
|
||||
input_model={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"skill_name": {"type": "string", "description": "The name of the skill to load."},
|
||||
},
|
||||
"required": ["skill_name"],
|
||||
},
|
||||
),
|
||||
FunctionTool(
|
||||
name="read_skill_resource",
|
||||
description="Reads a file associated with a skill, such as references or assets.",
|
||||
func=self._read_skill_resource,
|
||||
input_model={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"skill_name": {"type": "string", "description": "The name of the skill."},
|
||||
"resource_name": {
|
||||
"type": "string",
|
||||
"description": "The relative path of the resource file.",
|
||||
},
|
||||
},
|
||||
"required": ["skill_name", "resource_name"],
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
async def before_run(
|
||||
self,
|
||||
*,
|
||||
agent: SupportsAgentRun,
|
||||
session: AgentSession,
|
||||
context: SessionContext,
|
||||
state: dict[str, Any],
|
||||
) -> None:
|
||||
"""Inject skill instructions and tools into the session context.
|
||||
|
||||
When skills are available, adds the skills instruction prompt and
|
||||
``load_skill`` / ``read_skill_resource`` tools.
|
||||
"""
|
||||
if not self._skills:
|
||||
return
|
||||
|
||||
if self._skills_instruction_prompt:
|
||||
context.extend_instructions(self.source_id, self._skills_instruction_prompt)
|
||||
context.extend_tools(self.source_id, self._tools)
|
||||
|
||||
def _load_skill(self, skill_name: str) -> str:
|
||||
"""Load the full instructions for a specific skill.
|
||||
|
||||
Args:
|
||||
skill_name: The name of the skill to load.
|
||||
|
||||
Returns:
|
||||
The skill body text, or an error message if not found.
|
||||
"""
|
||||
if not skill_name or not skill_name.strip():
|
||||
return "Error: Skill name cannot be empty."
|
||||
|
||||
skill = self._skills.get(skill_name)
|
||||
if skill is None:
|
||||
return f"Error: Skill '{skill_name}' not found."
|
||||
|
||||
logger.info("Loading skill: %s", skill_name)
|
||||
return skill.body
|
||||
|
||||
def _read_skill_resource(self, skill_name: str, resource_name: str) -> str:
|
||||
"""Read a file associated with a skill.
|
||||
|
||||
Args:
|
||||
skill_name: The name of the skill.
|
||||
resource_name: The relative path of the resource file.
|
||||
|
||||
Returns:
|
||||
The resource file content, or an error message if not found.
|
||||
"""
|
||||
if not skill_name or not skill_name.strip():
|
||||
return "Error: Skill name cannot be empty."
|
||||
|
||||
if not resource_name or not resource_name.strip():
|
||||
return "Error: Resource name cannot be empty."
|
||||
|
||||
skill = self._skills.get(skill_name)
|
||||
if skill is None:
|
||||
return f"Error: Skill '{skill_name}' not found."
|
||||
|
||||
try:
|
||||
return _read_skill_resource(skill, resource_name)
|
||||
except Exception:
|
||||
logger.exception("Failed to read resource '%s' from skill '%s'", resource_name, skill_name)
|
||||
return f"Error: Failed to read resource '{resource_name}' from skill '{skill_name}'."
|
||||
|
||||
# endregion
|
||||
@@ -0,0 +1,697 @@
|
||||
# Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
"""Tests for file-based Agent Skills provider."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from agent_framework import FileAgentSkillsProvider, SessionContext
|
||||
from agent_framework._skills import (
|
||||
_build_skills_instruction_prompt,
|
||||
_discover_and_load_skills,
|
||||
_extract_resource_paths,
|
||||
_FileAgentSkill,
|
||||
_has_symlink_in_path,
|
||||
_normalize_resource_path,
|
||||
_read_skill_resource,
|
||||
_SkillFrontmatter,
|
||||
_try_parse_skill_document,
|
||||
)
|
||||
|
||||
|
||||
def _symlinks_supported(tmp: Path) -> bool:
|
||||
"""Return True if the current platform/environment supports symlinks."""
|
||||
test_target = tmp / "_symlink_test_target"
|
||||
test_link = tmp / "_symlink_test_link"
|
||||
try:
|
||||
test_target.write_text("test", encoding="utf-8")
|
||||
test_link.symlink_to(test_target)
|
||||
return True
|
||||
except (OSError, NotImplementedError):
|
||||
return False
|
||||
finally:
|
||||
test_link.unlink(missing_ok=True)
|
||||
test_target.unlink(missing_ok=True)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _write_skill(
|
||||
base: Path,
|
||||
name: str,
|
||||
description: str = "A test skill.",
|
||||
body: str = "# Instructions\nDo the thing.",
|
||||
*,
|
||||
extra_frontmatter: str = "",
|
||||
resources: dict[str, str] | None = None,
|
||||
) -> Path:
|
||||
"""Create a skill directory with SKILL.md and optional resource files."""
|
||||
skill_dir = base / name
|
||||
skill_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
frontmatter = f"---\nname: {name}\ndescription: {description}\n{extra_frontmatter}---\n"
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(frontmatter + body, encoding="utf-8")
|
||||
|
||||
if resources:
|
||||
for rel_path, content in resources.items():
|
||||
res_file = skill_dir / rel_path
|
||||
res_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
res_file.write_text(content, encoding="utf-8")
|
||||
|
||||
return skill_dir
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: module-level helper functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNormalizeResourcePath:
|
||||
"""Tests for _normalize_resource_path."""
|
||||
|
||||
def test_strips_dot_slash_prefix(self) -> None:
|
||||
assert _normalize_resource_path("./refs/doc.md") == "refs/doc.md"
|
||||
|
||||
def test_replaces_backslashes(self) -> None:
|
||||
assert _normalize_resource_path("refs\\doc.md") == "refs/doc.md"
|
||||
|
||||
def test_strips_dot_slash_and_replaces_backslashes(self) -> None:
|
||||
assert _normalize_resource_path(".\\refs\\doc.md") == "refs/doc.md"
|
||||
|
||||
def test_no_change_for_clean_path(self) -> None:
|
||||
assert _normalize_resource_path("refs/doc.md") == "refs/doc.md"
|
||||
|
||||
|
||||
class TestExtractResourcePaths:
|
||||
"""Tests for _extract_resource_paths."""
|
||||
|
||||
def test_extracts_markdown_links(self) -> None:
|
||||
content = "See [doc](refs/FAQ.md) and [template](assets/template.md)."
|
||||
paths = _extract_resource_paths(content)
|
||||
assert paths == ["refs/FAQ.md", "assets/template.md"]
|
||||
|
||||
def test_deduplicates_case_insensitive(self) -> None:
|
||||
content = "See [a](refs/FAQ.md) and [b](refs/faq.md)."
|
||||
paths = _extract_resource_paths(content)
|
||||
assert len(paths) == 1
|
||||
|
||||
def test_normalizes_dot_slash_prefix(self) -> None:
|
||||
content = "See [doc](./refs/FAQ.md)."
|
||||
paths = _extract_resource_paths(content)
|
||||
assert paths == ["refs/FAQ.md"]
|
||||
|
||||
def test_ignores_urls(self) -> None:
|
||||
content = "See [link](https://example.com/doc.md)."
|
||||
paths = _extract_resource_paths(content)
|
||||
assert paths == []
|
||||
|
||||
def test_empty_content(self) -> None:
|
||||
assert _extract_resource_paths("") == []
|
||||
|
||||
def test_extracts_backtick_quoted_paths(self) -> None:
|
||||
content = "Use the template at `assets/template.md` and the script `./scripts/run.py`."
|
||||
paths = _extract_resource_paths(content)
|
||||
assert paths == ["assets/template.md", "scripts/run.py"]
|
||||
|
||||
def test_deduplicates_across_link_and_backtick(self) -> None:
|
||||
content = "See [doc](refs/FAQ.md) and also `refs/FAQ.md`."
|
||||
paths = _extract_resource_paths(content)
|
||||
assert len(paths) == 1
|
||||
|
||||
|
||||
class TestTryParseSkillDocument:
|
||||
"""Tests for _try_parse_skill_document."""
|
||||
|
||||
def test_valid_skill(self) -> None:
|
||||
content = "---\nname: test-skill\ndescription: A test skill.\n---\n# Body\nInstructions here."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is not None
|
||||
frontmatter, body = result
|
||||
assert frontmatter.name == "test-skill"
|
||||
assert frontmatter.description == "A test skill."
|
||||
assert "Instructions here." in body
|
||||
|
||||
def test_quoted_values(self) -> None:
|
||||
content = "---\nname: \"test-skill\"\ndescription: 'A test skill.'\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is not None
|
||||
assert result[0].name == "test-skill"
|
||||
assert result[0].description == "A test skill."
|
||||
|
||||
def test_utf8_bom(self) -> None:
|
||||
content = "\ufeff---\nname: test-skill\ndescription: A test skill.\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is not None
|
||||
assert result[0].name == "test-skill"
|
||||
|
||||
def test_missing_frontmatter(self) -> None:
|
||||
content = "# Just a markdown file\nNo frontmatter here."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_missing_name(self) -> None:
|
||||
content = "---\ndescription: A test skill.\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_missing_description(self) -> None:
|
||||
content = "---\nname: test-skill\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_invalid_name_uppercase(self) -> None:
|
||||
content = "---\nname: Test-Skill\ndescription: A test skill.\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_invalid_name_starts_with_hyphen(self) -> None:
|
||||
content = "---\nname: -test-skill\ndescription: A test skill.\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_invalid_name_ends_with_hyphen(self) -> None:
|
||||
content = "---\nname: test-skill-\ndescription: A test skill.\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_name_too_long(self) -> None:
|
||||
long_name = "a" * 65
|
||||
content = f"---\nname: {long_name}\ndescription: A test skill.\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_description_too_long(self) -> None:
|
||||
long_desc = "a" * 1025
|
||||
content = f"---\nname: test-skill\ndescription: {long_desc}\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is None
|
||||
|
||||
def test_extra_metadata_ignored(self) -> None:
|
||||
content = "---\nname: test-skill\ndescription: A test skill.\nauthor: someone\nversion: 1.0\n---\nBody."
|
||||
result = _try_parse_skill_document(content, "test.md")
|
||||
assert result is not None
|
||||
assert result[0].name == "test-skill"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: skill discovery and loading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDiscoverAndLoadSkills:
|
||||
"""Tests for _discover_and_load_skills."""
|
||||
|
||||
def test_discovers_valid_skill(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill")
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert "my-skill" in skills
|
||||
assert skills["my-skill"].frontmatter.name == "my-skill"
|
||||
|
||||
def test_discovers_nested_skills(self, tmp_path: Path) -> None:
|
||||
skills_dir = tmp_path / "skills"
|
||||
_write_skill(skills_dir, "skill-a")
|
||||
_write_skill(skills_dir, "skill-b")
|
||||
skills = _discover_and_load_skills([str(skills_dir)])
|
||||
assert len(skills) == 2
|
||||
assert "skill-a" in skills
|
||||
assert "skill-b" in skills
|
||||
|
||||
def test_skips_invalid_skill(self, tmp_path: Path) -> None:
|
||||
skill_dir = tmp_path / "bad-skill"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text("No frontmatter here.", encoding="utf-8")
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert len(skills) == 0
|
||||
|
||||
def test_deduplicates_skill_names(self, tmp_path: Path) -> None:
|
||||
dir1 = tmp_path / "dir1"
|
||||
dir2 = tmp_path / "dir2"
|
||||
_write_skill(dir1, "my-skill", body="First")
|
||||
_write_skill(dir2, "my-skill", body="Second")
|
||||
skills = _discover_and_load_skills([str(dir1), str(dir2)])
|
||||
assert len(skills) == 1
|
||||
assert skills["my-skill"].body == "First"
|
||||
|
||||
def test_empty_directory(self, tmp_path: Path) -> None:
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert len(skills) == 0
|
||||
|
||||
def test_nonexistent_directory(self) -> None:
|
||||
skills = _discover_and_load_skills(["/nonexistent/path"])
|
||||
assert len(skills) == 0
|
||||
|
||||
def test_multiple_paths(self, tmp_path: Path) -> None:
|
||||
dir1 = tmp_path / "dir1"
|
||||
dir2 = tmp_path / "dir2"
|
||||
_write_skill(dir1, "skill-a")
|
||||
_write_skill(dir2, "skill-b")
|
||||
skills = _discover_and_load_skills([str(dir1), str(dir2)])
|
||||
assert len(skills) == 2
|
||||
|
||||
def test_depth_limit(self, tmp_path: Path) -> None:
|
||||
# Depth 0: tmp_path itself
|
||||
# Depth 1: tmp_path/level1
|
||||
# Depth 2: tmp_path/level1/level2 (should be found)
|
||||
# Depth 3: tmp_path/level1/level2/level3 (should NOT be found)
|
||||
deep = tmp_path / "level1" / "level2" / "level3"
|
||||
deep.mkdir(parents=True)
|
||||
(deep / "SKILL.md").write_text("---\nname: deep-skill\ndescription: Too deep.\n---\nBody.", encoding="utf-8")
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert "deep-skill" not in skills
|
||||
|
||||
def test_skill_with_resources(self, tmp_path: Path) -> None:
|
||||
_write_skill(
|
||||
tmp_path,
|
||||
"my-skill",
|
||||
body="See [doc](refs/FAQ.md).",
|
||||
resources={"refs/FAQ.md": "FAQ content"},
|
||||
)
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert "my-skill" in skills
|
||||
assert skills["my-skill"].resource_names == ["refs/FAQ.md"]
|
||||
|
||||
def test_excludes_skill_with_missing_resource(self, tmp_path: Path) -> None:
|
||||
_write_skill(
|
||||
tmp_path,
|
||||
"my-skill",
|
||||
body="See [doc](refs/MISSING.md).",
|
||||
)
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert len(skills) == 0
|
||||
|
||||
def test_excludes_skill_with_path_traversal_resource(self, tmp_path: Path) -> None:
|
||||
_write_skill(
|
||||
tmp_path,
|
||||
"my-skill",
|
||||
body="See [doc](../secret.md).",
|
||||
resources={}, # resource points outside
|
||||
)
|
||||
# Create the file outside the skill directory
|
||||
(tmp_path / "secret.md").write_text("secret", encoding="utf-8")
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert len(skills) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: read_skill_resource
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestReadSkillResource:
|
||||
"""Tests for _read_skill_resource."""
|
||||
|
||||
def test_reads_valid_resource(self, tmp_path: Path) -> None:
|
||||
_write_skill(
|
||||
tmp_path,
|
||||
"my-skill",
|
||||
body="See [doc](refs/FAQ.md).",
|
||||
resources={"refs/FAQ.md": "FAQ content here"},
|
||||
)
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
content = _read_skill_resource(skills["my-skill"], "refs/FAQ.md")
|
||||
assert content == "FAQ content here"
|
||||
|
||||
def test_normalizes_dot_slash(self, tmp_path: Path) -> None:
|
||||
_write_skill(
|
||||
tmp_path,
|
||||
"my-skill",
|
||||
body="See [doc](refs/FAQ.md).",
|
||||
resources={"refs/FAQ.md": "FAQ content"},
|
||||
)
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
content = _read_skill_resource(skills["my-skill"], "./refs/FAQ.md")
|
||||
assert content == "FAQ content"
|
||||
|
||||
def test_unregistered_resource_raises(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill")
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
with pytest.raises(ValueError, match="not found in skill"):
|
||||
_read_skill_resource(skills["my-skill"], "nonexistent.md")
|
||||
|
||||
def test_case_insensitive_lookup_uses_registered_casing(self, tmp_path: Path) -> None:
|
||||
_write_skill(
|
||||
tmp_path,
|
||||
"my-skill",
|
||||
body="See [doc](refs/FAQ.md).",
|
||||
resources={"refs/FAQ.md": "FAQ content"},
|
||||
)
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
# Request with different casing; the registered name should be used for the file path
|
||||
content = _read_skill_resource(skills["my-skill"], "REFS/faq.md")
|
||||
assert content == "FAQ content"
|
||||
|
||||
def test_path_traversal_raises(self, tmp_path: Path) -> None:
|
||||
skill = _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("test", "Test skill"),
|
||||
body="Body",
|
||||
source_path=str(tmp_path / "skill"),
|
||||
resource_names=["../secret.md"],
|
||||
)
|
||||
(tmp_path / "secret.md").write_text("secret", encoding="utf-8")
|
||||
with pytest.raises(ValueError, match="outside the skill directory"):
|
||||
_read_skill_resource(skill, "../secret.md")
|
||||
|
||||
def test_similar_prefix_directory_does_not_match(self, tmp_path: Path) -> None:
|
||||
"""A skill directory named 'skill-a-evil' must not access resources from 'skill-a'."""
|
||||
skill = _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("test", "Test skill"),
|
||||
body="Body",
|
||||
source_path=str(tmp_path / "skill-a"),
|
||||
resource_names=["../skill-a-evil/secret.md"],
|
||||
)
|
||||
evil_dir = tmp_path / "skill-a-evil"
|
||||
evil_dir.mkdir()
|
||||
(evil_dir / "secret.md").write_text("evil", encoding="utf-8")
|
||||
with pytest.raises(ValueError, match="outside the skill directory"):
|
||||
_read_skill_resource(skill, "../skill-a-evil/secret.md")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: _build_skills_instruction_prompt
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBuildSkillsInstructionPrompt:
|
||||
"""Tests for _build_skills_instruction_prompt."""
|
||||
|
||||
def test_returns_none_for_empty_skills(self) -> None:
|
||||
assert _build_skills_instruction_prompt(None, {}) is None
|
||||
|
||||
def test_default_prompt_contains_skills(self) -> None:
|
||||
skills = {
|
||||
"my-skill": _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("my-skill", "Does stuff."),
|
||||
body="Body",
|
||||
source_path="/tmp/skill",
|
||||
),
|
||||
}
|
||||
prompt = _build_skills_instruction_prompt(None, skills)
|
||||
assert prompt is not None
|
||||
assert "<name>my-skill</name>" in prompt
|
||||
assert "<description>Does stuff.</description>" in prompt
|
||||
assert "load_skill" in prompt
|
||||
|
||||
def test_skills_sorted_alphabetically(self) -> None:
|
||||
skills = {
|
||||
"zebra": _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("zebra", "Z skill."),
|
||||
body="Body",
|
||||
source_path="/tmp/z",
|
||||
),
|
||||
"alpha": _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("alpha", "A skill."),
|
||||
body="Body",
|
||||
source_path="/tmp/a",
|
||||
),
|
||||
}
|
||||
prompt = _build_skills_instruction_prompt(None, skills)
|
||||
assert prompt is not None
|
||||
alpha_pos = prompt.index("alpha")
|
||||
zebra_pos = prompt.index("zebra")
|
||||
assert alpha_pos < zebra_pos
|
||||
|
||||
def test_xml_escapes_metadata(self) -> None:
|
||||
skills = {
|
||||
"my-skill": _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("my-skill", 'Uses <tags> & "quotes"'),
|
||||
body="Body",
|
||||
source_path="/tmp/skill",
|
||||
),
|
||||
}
|
||||
prompt = _build_skills_instruction_prompt(None, skills)
|
||||
assert prompt is not None
|
||||
assert "<tags>" in prompt
|
||||
assert "&" in prompt
|
||||
|
||||
def test_custom_prompt_template(self) -> None:
|
||||
skills = {
|
||||
"my-skill": _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("my-skill", "Does stuff."),
|
||||
body="Body",
|
||||
source_path="/tmp/skill",
|
||||
),
|
||||
}
|
||||
custom = "Custom header:\n{0}\nCustom footer."
|
||||
prompt = _build_skills_instruction_prompt(custom, skills)
|
||||
assert prompt is not None
|
||||
assert prompt.startswith("Custom header:")
|
||||
assert prompt.endswith("Custom footer.")
|
||||
|
||||
def test_invalid_prompt_template_raises(self) -> None:
|
||||
skills = {
|
||||
"my-skill": _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("my-skill", "Does stuff."),
|
||||
body="Body",
|
||||
source_path="/tmp/skill",
|
||||
),
|
||||
}
|
||||
with pytest.raises(ValueError, match="valid format string"):
|
||||
_build_skills_instruction_prompt("{invalid}", skills)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: FileAgentSkillsProvider
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFileAgentSkillsProvider:
|
||||
"""Tests for the public FileAgentSkillsProvider class."""
|
||||
|
||||
def test_default_source_id(self, tmp_path: Path) -> None:
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
assert provider.source_id == "file_agent_skills"
|
||||
|
||||
def test_custom_source_id(self, tmp_path: Path) -> None:
|
||||
provider = FileAgentSkillsProvider(str(tmp_path), source_id="custom")
|
||||
assert provider.source_id == "custom"
|
||||
|
||||
def test_accepts_single_path_string(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill")
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
assert len(provider._skills) == 1
|
||||
|
||||
def test_accepts_sequence_of_paths(self, tmp_path: Path) -> None:
|
||||
dir1 = tmp_path / "dir1"
|
||||
dir2 = tmp_path / "dir2"
|
||||
_write_skill(dir1, "skill-a")
|
||||
_write_skill(dir2, "skill-b")
|
||||
provider = FileAgentSkillsProvider([str(dir1), str(dir2)])
|
||||
assert len(provider._skills) == 2
|
||||
|
||||
async def test_before_run_with_skills(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill")
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
context = SessionContext(input_messages=[])
|
||||
|
||||
await provider.before_run(
|
||||
agent=AsyncMock(),
|
||||
session=AsyncMock(),
|
||||
context=context,
|
||||
state={},
|
||||
)
|
||||
|
||||
assert len(context.instructions) == 1
|
||||
assert "my-skill" in context.instructions[0]
|
||||
assert len(context.tools) == 2
|
||||
tool_names = {t.name for t in context.tools}
|
||||
assert tool_names == {"load_skill", "read_skill_resource"}
|
||||
|
||||
async def test_before_run_without_skills(self, tmp_path: Path) -> None:
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
context = SessionContext(input_messages=[])
|
||||
|
||||
await provider.before_run(
|
||||
agent=AsyncMock(),
|
||||
session=AsyncMock(),
|
||||
context=context,
|
||||
state={},
|
||||
)
|
||||
|
||||
assert len(context.instructions) == 0
|
||||
assert len(context.tools) == 0
|
||||
|
||||
def test_load_skill_returns_body(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill", body="Skill body content.")
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
result = provider._load_skill("my-skill")
|
||||
assert result == "Skill body content."
|
||||
|
||||
def test_load_skill_unknown_returns_error(self, tmp_path: Path) -> None:
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
result = provider._load_skill("nonexistent")
|
||||
assert result.startswith("Error:")
|
||||
|
||||
def test_load_skill_empty_name_returns_error(self, tmp_path: Path) -> None:
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
result = provider._load_skill("")
|
||||
assert result.startswith("Error:")
|
||||
|
||||
def test_read_skill_resource_returns_content(self, tmp_path: Path) -> None:
|
||||
_write_skill(
|
||||
tmp_path,
|
||||
"my-skill",
|
||||
body="See [doc](refs/FAQ.md).",
|
||||
resources={"refs/FAQ.md": "FAQ content"},
|
||||
)
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
result = provider._read_skill_resource("my-skill", "refs/FAQ.md")
|
||||
assert result == "FAQ content"
|
||||
|
||||
def test_read_skill_resource_unknown_skill_returns_error(self, tmp_path: Path) -> None:
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
result = provider._read_skill_resource("nonexistent", "file.md")
|
||||
assert result.startswith("Error:")
|
||||
|
||||
def test_read_skill_resource_empty_name_returns_error(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill")
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
result = provider._read_skill_resource("my-skill", "")
|
||||
assert result.startswith("Error:")
|
||||
|
||||
def test_read_skill_resource_unknown_resource_returns_error(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill")
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
result = provider._read_skill_resource("my-skill", "nonexistent.md")
|
||||
assert result.startswith("Error:")
|
||||
|
||||
async def test_skills_sorted_in_prompt(self, tmp_path: Path) -> None:
|
||||
skills_dir = tmp_path / "skills"
|
||||
_write_skill(skills_dir, "zebra", description="Z skill.")
|
||||
_write_skill(skills_dir, "alpha", description="A skill.")
|
||||
provider = FileAgentSkillsProvider(str(skills_dir))
|
||||
context = SessionContext(input_messages=[])
|
||||
|
||||
await provider.before_run(
|
||||
agent=AsyncMock(),
|
||||
session=AsyncMock(),
|
||||
context=context,
|
||||
state={},
|
||||
)
|
||||
|
||||
prompt = context.instructions[0]
|
||||
assert prompt.index("alpha") < prompt.index("zebra")
|
||||
|
||||
async def test_xml_escaping_in_prompt(self, tmp_path: Path) -> None:
|
||||
_write_skill(tmp_path, "my-skill", description="Uses <tags> & stuff")
|
||||
provider = FileAgentSkillsProvider(str(tmp_path))
|
||||
context = SessionContext(input_messages=[])
|
||||
|
||||
await provider.before_run(
|
||||
agent=AsyncMock(),
|
||||
session=AsyncMock(),
|
||||
context=context,
|
||||
state={},
|
||||
)
|
||||
|
||||
prompt = context.instructions[0]
|
||||
assert "<tags>" in prompt
|
||||
assert "&" in prompt
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: symlink detection (_has_symlink_in_path and end-to-end guards)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def _requires_symlinks(tmp_path: Path) -> None:
|
||||
"""Skip the test if the platform does not support symlinks."""
|
||||
if not _symlinks_supported(tmp_path):
|
||||
pytest.skip("Symlinks not supported on this platform/environment")
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("_requires_symlinks")
|
||||
class TestSymlinkDetection:
|
||||
"""Tests for _has_symlink_in_path and the symlink guards in validation/read."""
|
||||
|
||||
def test_detects_symlinked_file(self, tmp_path: Path) -> None:
|
||||
"""A symlink to a file outside the directory should be detected."""
|
||||
skill_dir = tmp_path / "skill"
|
||||
skill_dir.mkdir()
|
||||
|
||||
outside_file = tmp_path / "secret.txt"
|
||||
outside_file.write_text("secret", encoding="utf-8")
|
||||
|
||||
symlink_path = skill_dir / "link.txt"
|
||||
symlink_path.symlink_to(outside_file)
|
||||
|
||||
full_path = str(symlink_path)
|
||||
directory_path = str(skill_dir) + os.sep
|
||||
assert _has_symlink_in_path(full_path, directory_path) is True
|
||||
|
||||
def test_detects_symlinked_directory(self, tmp_path: Path) -> None:
|
||||
"""A symlink to a directory outside should be detected for paths through it."""
|
||||
skill_dir = tmp_path / "skill"
|
||||
skill_dir.mkdir()
|
||||
|
||||
outside_dir = tmp_path / "outside"
|
||||
outside_dir.mkdir()
|
||||
(outside_dir / "data.txt").write_text("data", encoding="utf-8")
|
||||
|
||||
symlink_dir = skill_dir / "linked-dir"
|
||||
symlink_dir.symlink_to(outside_dir)
|
||||
|
||||
full_path = str(skill_dir / "linked-dir" / "data.txt")
|
||||
directory_path = str(skill_dir) + os.sep
|
||||
assert _has_symlink_in_path(full_path, directory_path) is True
|
||||
|
||||
def test_returns_false_for_regular_files(self, tmp_path: Path) -> None:
|
||||
"""Regular (non-symlinked) files should not be flagged."""
|
||||
skill_dir = tmp_path / "skill"
|
||||
skill_dir.mkdir()
|
||||
|
||||
regular_file = skill_dir / "doc.txt"
|
||||
regular_file.write_text("content", encoding="utf-8")
|
||||
|
||||
full_path = str(regular_file)
|
||||
directory_path = str(skill_dir) + os.sep
|
||||
assert _has_symlink_in_path(full_path, directory_path) is False
|
||||
|
||||
def test_validate_resources_rejects_symlinked_resource(self, tmp_path: Path) -> None:
|
||||
"""_discover_and_load_skills should exclude a skill whose resource is a symlink."""
|
||||
skill_dir = tmp_path / "my-skill"
|
||||
skill_dir.mkdir()
|
||||
|
||||
outside_file = tmp_path / "secret.md"
|
||||
outside_file.write_text("secret content", encoding="utf-8")
|
||||
|
||||
# Create SKILL.md referencing a resource
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"---\nname: my-skill\ndescription: A test skill.\n---\nSee [doc](refs/leak.md).\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
refs_dir = skill_dir / "refs"
|
||||
refs_dir.mkdir()
|
||||
(refs_dir / "leak.md").symlink_to(outside_file)
|
||||
|
||||
skills = _discover_and_load_skills([str(tmp_path)])
|
||||
assert "my-skill" not in skills
|
||||
|
||||
def test_read_skill_resource_rejects_symlinked_resource(self, tmp_path: Path) -> None:
|
||||
"""_read_skill_resource should raise ValueError for a symlinked resource."""
|
||||
skill_dir = tmp_path / "skill"
|
||||
skill_dir.mkdir()
|
||||
|
||||
outside_file = tmp_path / "secret.md"
|
||||
outside_file.write_text("secret content", encoding="utf-8")
|
||||
|
||||
refs_dir = skill_dir / "refs"
|
||||
refs_dir.mkdir()
|
||||
(refs_dir / "leak.md").symlink_to(outside_file)
|
||||
|
||||
skill = _FileAgentSkill(
|
||||
frontmatter=_SkillFrontmatter("test", "Test skill"),
|
||||
body="See [doc](refs/leak.md).",
|
||||
source_path=str(skill_dir),
|
||||
resource_names=["refs/leak.md"],
|
||||
)
|
||||
with pytest.raises(ValueError, match="symlink"):
|
||||
_read_skill_resource(skill, "refs/leak.md")
|
||||
Reference in New Issue
Block a user