Appearance
第31天:Skills能力发现
学习目标
- 理解Skills能力发现机制
- 掌握技能注册和加载流程
- 学会实现技能发现器
- 能够动态加载和管理Skills
- 掌握技能依赖解析
- 学会构建技能生态系统
核心内容
能力发现机制概述
什么是能力发现
能力发现(Capability Discovery)是指系统自动检测、识别和加载可用Skills的过程。在Skills标准中,能力发现机制允许:
- 自动扫描:自动扫描指定目录中的Skills
- 动态加载:运行时动态加载和卸载Skills
- 依赖解析:自动解析和管理Skills之间的依赖关系
- 版本管理:支持多版本Skills共存
- 热更新:支持Skills的热更新和重新加载
能力发现的重要性
提高开发效率:
- 无需手动注册每个Skill
- 自动发现新添加的Skills
- 减少配置错误
增强可扩展性:
- 轻松添加新Skills
- 支持插件化架构
- 便于第三方集成
简化维护:
- 集中管理Skills
- 统一更新和卸载
- 版本控制更简单
能力发现流程
1. 扫描指定目录
↓
2. 识别skill.md文件
↓
3. 解析Front Matter
↓
4. 验证Skill有效性
↓
5. 解析依赖关系
↓
6. 注册Skill
↓
7. 加载Skill实现
↓
8. 初始化Skill
↓
9. Skill可用
技能注册机制
基础注册器
python
from typing import Dict, List, Optional
from pathlib import Path
import yaml
import importlib.util
class SkillRegistry:
    """Minimal in-memory registry that maps a skill name to its record.

    Every record is a dict with the keys ``'path'``, ``'info'`` and
    ``'loaded'``. Instantiated skills are tracked separately in
    ``skill_instances``.
    """

    def __init__(self):
        self.skills: Dict[str, Dict] = {}
        self.skill_instances: Dict[str, object] = {}

    def register(self, skill_path: str, skill_info: Dict):
        """Store a new skill under ``skill_info['name']``.

        Raises:
            ValueError: If the name is missing/empty or already registered.
        """
        name = skill_info.get('name')
        if not name:
            raise ValueError("Skill name is required")
        if name in self.skills:
            raise ValueError(f"Skill {name} already registered")

        record = {'path': skill_path, 'info': skill_info, 'loaded': False}
        self.skills[name] = record
        print(f"Skill {name} registered successfully")

    def unregister(self, skill_name: str):
        """Drop a skill and any instance held for it.

        Raises:
            ValueError: If the skill is not registered.
        """
        if skill_name not in self.skills:
            raise ValueError(f"Skill {skill_name} not found")

        self.skill_instances.pop(skill_name, None)
        self.skills.pop(skill_name)
        print(f"Skill {skill_name} unregistered successfully")

    def get_skill(self, skill_name: str) -> Optional[Dict]:
        """Return the record for ``skill_name``, or ``None`` if unknown."""
        return self.skills.get(skill_name)

    def list_skills(self) -> List[str]:
        """Return the registered skill names in registration order."""
        return [*self.skills]


# 高级注册器
python
from typing import Dict, List, Optional, Set
from pathlib import Path
import yaml
import importlib.util
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SkillMetadata:
    """Normalized description of one registered skill."""

    name: str
    version: str
    author: str
    description: str
    tags: List[str]
    license: str
    dependencies: List[str]  # names of skills this skill requires
    path: str  # directory the skill was registered from
    registered_at: datetime
    loaded: bool = False  # flipped by the loader, not by the registry


class AdvancedSkillRegistry:
    """Registry that stores typed metadata and tracks skill dependencies.

    ``dependency_graph`` maps each skill name to the set of names it
    depends on; it is used to refuse unregistering a skill that other
    registered skills still require.
    """

    def __init__(self):
        self.skills: Dict[str, SkillMetadata] = {}
        self.skill_instances: Dict[str, object] = {}
        self.dependency_graph: Dict[str, Set[str]] = {}

    @staticmethod
    def _as_str_list(value) -> List[str]:
        """Coerce a front-matter value into a list of strings.

        YAML yields ``None`` for a key with no value and a plain string for
        a scalar; both are accepted so later iteration never fails.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return [str(item) for item in value]

    @staticmethod
    def _as_text(value, default: str) -> str:
        """Return ``value`` as ``str``, or ``default`` when it is ``None``."""
        return default if value is None else str(value)

    def register(self, skill_path: str, skill_info: Dict):
        """Register a skill described by its parsed front matter.

        Args:
            skill_path: Directory that contains the skill.
            skill_info: Parsed front matter; only ``'name'`` is mandatory.

        Raises:
            ValueError: If the name is missing/empty or already registered.
        """
        skill_name = skill_info.get('name')
        if not skill_name:
            raise ValueError("Skill name is required")
        if skill_name in self.skills:
            raise ValueError(f"Skill {skill_name} already registered")

        # Normalize before touching any registry state so that a malformed
        # entry cannot leave a half-registered skill behind.
        metadata = SkillMetadata(
            name=skill_name,
            version=self._as_text(skill_info.get('version'), '1.0.0'),
            author=self._as_text(skill_info.get('author'), 'Unknown'),
            description=self._as_text(skill_info.get('description'), ''),
            tags=self._as_str_list(skill_info.get('tags')),
            license=self._as_text(skill_info.get('license'), 'MIT'),
            dependencies=self._as_str_list(skill_info.get('dependencies')),
            path=skill_path,
            registered_at=datetime.now(),
            loaded=False,
        )
        self.skills[skill_name] = metadata
        self._build_dependency_graph(skill_name, metadata.dependencies)
        print(f"Skill {skill_name} v{metadata.version} registered successfully")

    def unregister(self, skill_name: str):
        """Remove a skill unless another registered skill depends on it.

        Raises:
            ValueError: If the skill is unknown or still has dependents.
        """
        if skill_name not in self.skills:
            raise ValueError(f"Skill {skill_name} not found")

        dependents = self._get_dependents(skill_name)
        if dependents:
            raise ValueError(
                f"Cannot unregister {skill_name}: "
                f"Depended by: {', '.join(dependents)}"
            )

        self.skill_instances.pop(skill_name, None)
        del self.skills[skill_name]
        self._remove_from_dependency_graph(skill_name)
        print(f"Skill {skill_name} unregistered successfully")

    def get_skill(self, skill_name: str) -> Optional[SkillMetadata]:
        """Return the metadata for ``skill_name``, or ``None`` if unknown."""
        return self.skills.get(skill_name)

    def list_skills(self, loaded_only: bool = False) -> List[str]:
        """Return registered names, optionally only those marked loaded."""
        if loaded_only:
            return [name for name, meta in self.skills.items() if meta.loaded]
        return list(self.skills)

    def search_skills(self, query: str) -> List[str]:
        """Return names whose name, description or tags contain ``query``.

        Matching is a case-insensitive substring test.
        """
        query = query.lower()
        return [
            name
            for name, meta in self.skills.items()
            if query in name.lower()
            or query in meta.description.lower()
            or any(query in tag.lower() for tag in meta.tags)
        ]

    def _build_dependency_graph(self, skill_name: str, dependencies: List[str]):
        """Record the direct dependencies of ``skill_name``."""
        self.dependency_graph[skill_name] = set(dependencies)

    def _remove_from_dependency_graph(self, skill_name: str):
        """Delete ``skill_name`` as a node and from every other node's edges."""
        self.dependency_graph.pop(skill_name, None)
        for deps in self.dependency_graph.values():
            deps.discard(skill_name)

    def _get_dependents(self, skill_name: str) -> List[str]:
        """Return the names of skills that directly depend on ``skill_name``."""
        return [
            name
            for name, deps in self.dependency_graph.items()
            if skill_name in deps
        ]


# 技能发现器
基础发现器
python
from typing import List, Dict
from pathlib import Path
import yaml
class SkillDiscovery:
    """Scan a directory tree for ``skill.md`` files and register them."""

    def __init__(self, registry: "SkillRegistry"):
        self.registry = registry

    def discover(self, search_path: str) -> List[str]:
        """Register every skill found below ``search_path``.

        A skill that cannot be parsed or registered is reported on stdout
        and skipped, so one broken skill does not abort the scan.

        Returns:
            Names of the skills that were registered by this call.

        Raises:
            ValueError: If ``search_path`` does not exist.
        """
        search_dir = Path(search_path)
        if not search_dir.exists():
            raise ValueError(f"Search path {search_path} does not exist")

        discovered_skills = []
        # sorted() makes the registration order independent of the
        # filesystem's directory-listing order.
        for skill_md in sorted(search_dir.glob("**/skill.md")):
            try:
                skill_info = self._parse_skill_md(skill_md)
                self.registry.register(str(skill_md.parent), skill_info)
                discovered_skills.append(skill_info['name'])
            except Exception as e:
                print(f"Failed to load skill from {skill_md}: {e}")
        return discovered_skills

    def _parse_skill_md(self, skill_md_path: Path) -> Dict:
        """Read a ``skill.md`` file and return its front matter as a dict.

        Raises:
            ValueError: If the front matter is missing, unterminated, or
                does not parse to a mapping.
        """
        # utf-8-sig transparently drops a leading byte-order mark, which
        # would otherwise make the first line differ from '---'.
        with open(skill_md_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()

        skill_info = yaml.safe_load(self._extract_front_matter(content))
        # safe_load returns None for an empty block and a scalar/list for
        # anything that is not "key: value" pairs.
        if not isinstance(skill_info, dict):
            raise ValueError(
                "Invalid skill.md format: front matter must be a mapping"
            )
        return skill_info

    @staticmethod
    def _extract_front_matter(content: str) -> str:
        """Return the text between the opening and closing ``---`` lines.

        Trailing whitespace on the delimiter lines is tolerated.

        Raises:
            ValueError: If the text does not start with ``---`` or the
                closing ``---`` is missing.
        """
        lines = content.splitlines()
        if not lines or lines[0].rstrip() != '---':
            raise ValueError("Invalid skill.md format")
        for index, line in enumerate(lines[1:], start=1):
            if line.rstrip() == '---':
                return '\n'.join(lines[1:index])
        raise ValueError("Invalid skill.md format: missing closing '---'")


# 高级发现器
python
from typing import List, Dict, Set
from pathlib import Path
import yaml
import json
import hashlib
from datetime import datetime
class AdvancedSkillDiscovery:
    """Discover skills from directories or a config file, with a parse cache.

    The cache is keyed by the resolved path of each ``skill.md`` and is
    considered valid while the file's modification time is unchanged.
    """

    def __init__(self, registry: "AdvancedSkillRegistry"):
        self.registry = registry
        self.discovery_cache: Dict[str, Dict] = {}

    def discover(
        self,
        search_path: str,
        recursive: bool = True,
        use_cache: bool = True
    ) -> List[str]:
        """Register every skill found in ``search_path``.

        Args:
            search_path: Directory to scan.
            recursive: Scan sub-directories too; otherwise only look for a
                ``skill.md`` directly inside ``search_path``.
            use_cache: Reuse previously parsed front matter when the file's
                modification time has not changed.

        Returns:
            Names of the skills registered by this call. Skills that fail
            to parse or register are reported on stdout and skipped.

        Raises:
            ValueError: If ``search_path`` does not exist.
        """
        search_dir = Path(search_path)
        if not search_dir.exists():
            raise ValueError(f"Search path {search_path} does not exist")

        pattern = "**/skill.md" if recursive else "skill.md"
        discovered_skills = []
        # sorted() keeps registration order stable across filesystems.
        for skill_md in sorted(search_dir.glob(pattern)):
            try:
                if use_cache and self._is_cached(skill_md):
                    skill_info = self._get_cached(skill_md)
                else:
                    skill_info = self._parse_skill_md(skill_md)
                    self._cache_skill(skill_md, skill_info)
                self.registry.register(str(skill_md.parent), skill_info)
                discovered_skills.append(skill_info['name'])
            except Exception as e:
                print(f"Failed to load skill from {skill_md}: {e}")
        return discovered_skills

    def discover_from_config(self, config_path: str) -> List[str]:
        """Register the skills listed under ``skills`` in a YAML config.

        Each entry is expected to be a mapping with a ``path`` key and an
        optional ``enabled`` key; entries without a path are ignored.

        Returns:
            Names of the skills registered by this call.

        Raises:
            ValueError: If the config file does not exist or its top level
                is not a mapping.
        """
        config_file = Path(config_path)
        if not config_file.exists():
            raise ValueError(f"Config file {config_path} does not exist")

        with open(config_file, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)

        # An empty file parses to None; treat it as "no skills configured".
        if config is None:
            return []
        if not isinstance(config, dict):
            raise ValueError(f"Config file {config_path} must contain a mapping")

        discovered_skills = []
        # ``skills:`` with no value parses to None, hence the ``or []``.
        for skill_config in config.get('skills') or []:
            if not isinstance(skill_config, dict):
                continue
            skill_path = skill_config.get('path')
            if not skill_path:
                continue
            try:
                skill_md = Path(skill_path) / "skill.md"
                skill_info = self._parse_skill_md(skill_md)
                # NOTE(review): 'enabled' is copied into skill_info, but
                # nothing in this class acts on it — confirm whether
                # disabled skills are meant to be skipped.
                if 'enabled' in skill_config:
                    skill_info['enabled'] = skill_config['enabled']
                self.registry.register(skill_path, skill_info)
                discovered_skills.append(skill_info['name'])
            except Exception as e:
                print(f"Failed to load skill from {skill_path}: {e}")
        return discovered_skills

    def _parse_skill_md(self, skill_md_path: Path) -> Dict:
        """Read a ``skill.md`` file and return its front matter as a dict.

        Raises:
            ValueError: If the front matter is missing, unterminated, or
                does not parse to a mapping.
        """
        # utf-8-sig drops a leading byte-order mark so the first line can
        # still be recognised as '---'.
        with open(skill_md_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()

        skill_info = yaml.safe_load(self._extract_front_matter(content))
        if not isinstance(skill_info, dict):
            raise ValueError(
                "Invalid skill.md format: front matter must be a mapping"
            )
        return skill_info

    @staticmethod
    def _extract_front_matter(content: str) -> str:
        """Return the text between the opening and closing ``---`` lines.

        Raises:
            ValueError: If the text does not start with ``---`` or the
                closing ``---`` is missing.
        """
        lines = content.splitlines()
        if not lines or lines[0].rstrip() != '---':
            raise ValueError("Invalid skill.md format")
        for index, line in enumerate(lines[1:], start=1):
            if line.rstrip() == '---':
                return '\n'.join(lines[1:index])
        raise ValueError("Invalid skill.md format: missing closing '---'")

    def _is_cached(self, skill_md_path: Path) -> bool:
        """Return True when a cache entry exists and the mtime still matches."""
        cached_info = self.discovery_cache.get(self._get_cache_key(skill_md_path))
        if cached_info is None or not skill_md_path.exists():
            return False
        return cached_info.get('mtime') == skill_md_path.stat().st_mtime

    def _get_cached(self, skill_md_path: Path) -> Dict:
        """Return the cached front matter; the entry must already exist."""
        return self.discovery_cache[self._get_cache_key(skill_md_path)]['data']

    def _cache_skill(self, skill_md_path: Path, skill_info: Dict):
        """Store ``skill_info`` together with the file's current mtime."""
        self.discovery_cache[self._get_cache_key(skill_md_path)] = {
            'mtime': skill_md_path.stat().st_mtime,
            'data': skill_info
        }

    def _get_cache_key(self, skill_md_path: Path) -> str:
        """Use the resolved absolute path as the cache key."""
        return str(skill_md_path.resolve())

    def clear_cache(self):
        """Forget every cached parse result."""
        self.discovery_cache.clear()


# 动态加载机制
技能加载器
python
from typing import Dict, Any, Optional
from pathlib import Path
import importlib.util
import sys
class SkillLoader:
    """Import skill implementations from disk and track their instances.

    Instances are stored in ``registry.skill_instances`` and the ``loaded``
    flag on the skill's metadata is kept in sync.
    """

    def __init__(self, registry: "AdvancedSkillRegistry"):
        self.registry = registry
        # Names whose load() is currently in progress; used to turn a
        # dependency cycle into an error instead of unbounded recursion.
        self._loading = set()

    def load(self, skill_name: str) -> object:
        """Load ``skill_name`` (and its unloaded dependencies first).

        Returns:
            The skill instance; the existing one if it was already loaded.

        Raises:
            ValueError: If the skill or one of its dependencies is not
                registered, the dependencies form a cycle, or no
                implementation file/class can be found.
        """
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")

        if skill_meta.loaded:
            print(f"Skill {skill_name} is already loaded")
            return self.registry.skill_instances.get(skill_name)

        if skill_name in self._loading:
            raise ValueError(
                f"Circular dependency detected involving {skill_name}"
            )

        self._loading.add(skill_name)
        try:
            self._check_dependencies(skill_meta)
            skill_instance = self._load_skill_implementation(skill_meta)
        finally:
            self._loading.discard(skill_name)

        self.registry.skill_instances[skill_name] = skill_instance
        skill_meta.loaded = True
        print(f"Skill {skill_name} loaded successfully")
        return skill_instance

    def unload(self, skill_name: str):
        """Drop the instance of ``skill_name`` and mark it as not loaded.

        Raises:
            ValueError: If the skill is not registered, or a skill that
                depends on it is currently loaded.
        """
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")

        if not skill_meta.loaded:
            print(f"Skill {skill_name} is not loaded")
            return

        # Only dependents that are actually loaded can break; a dependent
        # that is merely registered must not pin this skill in memory.
        loaded_dependents = []
        for name in self.registry._get_dependents(skill_name):
            dependent_meta = self.registry.get_skill(name)
            if dependent_meta is not None and dependent_meta.loaded:
                loaded_dependents.append(name)
        if loaded_dependents:
            raise ValueError(
                f"Cannot unload {skill_name}: "
                f"Depended by: {', '.join(loaded_dependents)}"
            )

        self.registry.skill_instances.pop(skill_name, None)
        skill_meta.loaded = False
        print(f"Skill {skill_name} unloaded successfully")

    def reload(self, skill_name: str) -> object:
        """Unload ``skill_name`` if needed, then load it again.

        Raises:
            ValueError: Under the same conditions as ``unload`` and ``load``.
        """
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")

        if skill_meta.loaded:
            self.unload(skill_name)
        return self.load(skill_name)

    def _check_dependencies(self, skill_meta: "SkillMetadata"):
        """Ensure every dependency is registered, loading missing ones."""
        for dep in skill_meta.dependencies:
            dep_meta = self.registry.get_skill(dep)
            if not dep_meta:
                raise ValueError(
                    f"Dependency {dep} not found for skill {skill_meta.name}"
                )
            if not dep_meta.loaded:
                print(f"Loading dependency {dep} for {skill_meta.name}")
                self.load(dep)

    def _load_skill_implementation(self, skill_meta: "SkillMetadata") -> object:
        """Import the skill's ``main.py`` and instantiate its skill class.

        ``src/main.py`` is preferred over ``main.py`` in the skill
        directory. The class name is derived from the skill name by
        ``_get_skill_class_name`` and is called with no arguments.
        """
        skill_path = Path(skill_meta.path)

        main_py = skill_path / "src" / "main.py"
        if not main_py.exists():
            main_py = skill_path / "main.py"
        if not main_py.exists():
            raise ValueError(
                f"No implementation file found for skill {skill_meta.name}"
            )

        module_name = f"{skill_meta.name}_module"
        spec = importlib.util.spec_from_file_location(module_name, main_py)
        if spec is None or spec.loader is None:
            raise ValueError(f"Failed to load module for skill {skill_meta.name}")

        module = importlib.util.module_from_spec(spec)
        previous = sys.modules.get(module_name)
        # Registered before execution so the module can be found by name
        # while its own top-level code runs.
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # Do not leave a half-initialised module behind.
            if previous is None:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous
            raise

        skill_class_name = self._get_skill_class_name(skill_meta.name)
        skill_class = getattr(module, skill_class_name, None)
        if skill_class is None:
            raise ValueError(
                f"Skill class {skill_class_name} not found in module"
            )
        return skill_class()

    def _get_skill_class_name(self, skill_name: str) -> str:
        """Map a hyphenated skill name to a class name.

        Each ``-``-separated part is passed through ``str.capitalize`` and
        the parts are concatenated, e.g. ``text-analyzer`` -> ``TextAnalyzer``.
        """
        return ''.join(word.capitalize() for word in skill_name.split('-'))


# 批量加载器
python
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchSkillLoader:
    """Load or unload several skills through one ``SkillLoader``."""

    def __init__(self, loader: SkillLoader):
        self.loader = loader

    def load_all(self, skill_names: List[str], parallel: bool = False) -> Dict[str, Any]:
        """Load every named skill and collect the outcome per name.

        Args:
            skill_names: Skills to load.
            parallel: Submit the loads to a ``ThreadPoolExecutor`` instead
                of running them one after another.
                NOTE(review): this calls ``loader.load`` from several
                threads at once — confirm the loader tolerates that.

        Returns:
            Mapping of skill name to its instance, or ``None`` when loading
            that skill raised (the error is printed).
        """
        outcome = {}

        def record(name, fetch):
            # Store the result of ``fetch()``, or None if it raises.
            try:
                outcome[name] = fetch()
            except Exception as e:
                print(f"Failed to load {name}: {e}")
                outcome[name] = None

        if not parallel:
            for name in skill_names:
                record(name, lambda: self.loader.load(name))
            return outcome

        with ThreadPoolExecutor() as pool:
            pending = {}
            for name in skill_names:
                pending[pool.submit(self.loader.load, name)] = name
            for finished in as_completed(pending):
                record(pending[finished], finished.result)
        return outcome

    def unload_all(self, skill_names: List[str]) -> Dict[str, bool]:
        """Unload every named skill.

        Returns:
            Mapping of skill name to ``True`` when ``loader.unload``
            returned normally, ``False`` when it raised (the error is
            printed).
        """
        report = {}
        for name in skill_names:
            try:
                self.loader.unload(name)
            except Exception as e:
                print(f"Failed to unload {name}: {e}")
                report[name] = False
            else:
                report[name] = True
        return report


# 依赖解析
依赖解析器
python
from typing import List, Dict, Set
from collections import defaultdict, deque
class DependencyResolver:
    """Answer dependency questions about the skills in a registry."""

    def __init__(self, registry: "AdvancedSkillRegistry"):
        self.registry = registry

    def resolve_load_order(self, skill_names: List[str]) -> List[str]:
        """Return the requested skills and their dependencies, deps first.

        The order is produced by a depth-first traversal, so every skill
        appears after all of the skills it depends on.

        Raises:
            ValueError: If a skill is not registered or a dependency cycle
                is reachable from ``skill_names``.
        """
        load_order = []
        visited = set()
        visiting = set()  # skills on the current DFS path

        def visit(skill_name: str):
            if skill_name in visited:
                return
            if skill_name in visiting:
                raise ValueError(f"Circular dependency detected involving {skill_name}")

            skill_meta = self.registry.get_skill(skill_name)
            if not skill_meta:
                raise ValueError(f"Skill {skill_name} not found")

            visiting.add(skill_name)
            for dep in skill_meta.dependencies:
                visit(dep)
            visiting.remove(skill_name)

            visited.add(skill_name)
            load_order.append(skill_name)

        for name in skill_names:
            visit(name)
        return load_order

    def check_dependencies(self, skill_name: str) -> Dict[str, bool]:
        """Report, per direct dependency, whether it is registered and loaded.

        Raises:
            ValueError: If ``skill_name`` is not registered.
        """
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")

        result = {}
        for dep in skill_meta.dependencies:
            dep_meta = self.registry.get_skill(dep)
            result[dep] = dep_meta is not None and dep_meta.loaded
        return result

    def get_dependency_tree(self, skill_name: str) -> Dict:
        """Return the nested dependency tree rooted at ``skill_name``.

        Each node has ``name`` and ``found``; registered skills also carry
        ``version``, ``loaded`` and ``dependencies``. A skill that already
        appears among its own ancestors is emitted once more as a leaf
        with ``'circular': True`` so that cyclic graphs terminate.

        Raises:
            ValueError: If ``skill_name`` is not registered.
        """
        if not self.registry.get_skill(skill_name):
            raise ValueError(f"Skill {skill_name} not found")

        def build_tree(name: str, ancestors: tuple) -> Dict:
            meta = self.registry.get_skill(name)
            if not meta:
                return {'name': name, 'found': False}

            tree = {
                'name': name,
                'version': meta.version,
                'found': True,
                'loaded': meta.loaded,
                'dependencies': []
            }
            if name in ancestors:
                # Stop here: expanding again would recurse forever.
                tree['circular'] = True
                return tree

            chain = ancestors + (name,)
            for dep in meta.dependencies:
                tree['dependencies'].append(build_tree(dep, chain))
            return tree

        return build_tree(skill_name, ())

    def detect_circular_dependencies(self) -> List[List[str]]:
        """Return the dependency cycles found by a DFS over all skills.

        Each cycle is a list of names that starts and ends with the same
        skill, e.g. ``['a', 'b', 'a']``.
        """
        cycles = []
        visited = set()
        rec_stack = set()  # same content as ``path`` but O(1) membership
        path = []

        def dfs(skill_name: str):
            if skill_name in rec_stack:
                cycle_start = path.index(skill_name)
                cycles.append(path[cycle_start:] + [skill_name])
                return
            if skill_name in visited:
                return

            visited.add(skill_name)
            rec_stack.add(skill_name)
            path.append(skill_name)

            skill_meta = self.registry.get_skill(skill_name)
            if skill_meta:
                for dep in skill_meta.dependencies:
                    dfs(dep)

            path.pop()
            rec_stack.remove(skill_name)

        for skill_name in self.registry.list_skills():
            dfs(skill_name)
        return cycles


# 技能生态系统
技能市场客户端
python
import requests
from typing import List, Dict, Optional
from pathlib import Path
import zipfile
import shutil
class SkillMarketClient:
    """Thin HTTP client for a skill marketplace REST API.

    Args:
        base_url: Root URL of the API, without a trailing slash.
        timeout: Seconds to wait for each HTTP request before giving up.
            Without a timeout ``requests`` can block indefinitely.
    """

    def __init__(self, base_url: str = "https://skills.market/api", timeout: float = 30.0):
        self.base_url = base_url
        self.timeout = timeout

    def _get_json(self, endpoint: str, params: Optional[Dict] = None):
        """GET ``base_url/endpoint`` and return the decoded JSON body.

        Raises whatever ``requests`` raises for connection problems and
        for non-2xx responses (via ``raise_for_status``).
        """
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            params=params,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _require_plain_name(value: str, label: str) -> None:
        """Reject values that would escape the target directory in a path."""
        if not value or value in ('.', '..') or Path(value).name != value:
            raise ValueError(f"Invalid {label}: {value!r}")

    def search(self, query: str, limit: int = 10) -> List[Dict]:
        """Return the ``results`` list of a marketplace search."""
        return self._get_json("search", {'q': query, 'limit': limit})['results']

    def get_skill_info(self, skill_name: str) -> Dict:
        """Return the marketplace record for ``skill_name``."""
        return self._get_json(f"skills/{skill_name}")

    def download(self, skill_name: str, version: str, dest_path: str) -> str:
        """Download a skill archive and unpack it into ``dest_path/skill_name``.

        Returns:
            The directory the archive was extracted into.

        Raises:
            ValueError: If ``skill_name`` or ``version`` contains path
                separators or is ``.``/``..`` (both are used to build
                filesystem paths).
        """
        self._require_plain_name(skill_name, "skill name")
        self._require_plain_name(version, "version")

        dest_dir = Path(dest_path) / skill_name
        zip_path = dest_dir / f"{skill_name}-{version}.zip"

        try:
            # The context manager releases the streamed connection even
            # when the body is not read to the end.
            with requests.get(
                f"{self.base_url}/skills/{skill_name}/download",
                params={'version': version},
                stream=True,
                timeout=self.timeout
            ) as response:
                response.raise_for_status()
                dest_dir.mkdir(parents=True, exist_ok=True)
                with open(zip_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(dest_dir)
        finally:
            # Never leave a partial or already-extracted archive behind.
            if zip_path.exists():
                zip_path.unlink()

        return str(dest_dir)

    def install(
        self,
        skill_name: str,
        version: str,
        install_path: str,
        registry: "AdvancedSkillRegistry"
    ) -> Dict:
        """Download a skill and register it with ``registry``.

        Never raises: any failure is reported in the returned dict as
        ``{'success': False, 'skill_name': ..., 'error': ...}``.
        """
        try:
            skill_path = self.download(skill_name, version, install_path)
            discovery = AdvancedSkillDiscovery(registry)
            discovered = discovery.discover(skill_path, recursive=False)
            return {
                'success': True,
                'skill_name': skill_name,
                'version': version,
                'path': skill_path,
                'discovered': discovered
            }
        except Exception as e:
            return {
                'success': False,
                'skill_name': skill_name,
                'error': str(e)
            }

    def list_categories(self) -> List[str]:
        """Return the marketplace's category names."""
        return self._get_json("categories")['categories']

    def get_trending(self, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` trending skills."""
        return self._get_json("trending", {'limit': limit})['skills']


# 技能管理器
python
from typing import Dict, List, Optional
class SkillManager:
    """Facade that wires discovery, loading, dependency checks and the market."""

    def __init__(self, registry: AdvancedSkillRegistry):
        self.registry = registry
        self.discovery = AdvancedSkillDiscovery(registry)
        self.loader = SkillLoader(registry)
        self.batch_loader = BatchSkillLoader(self.loader)
        self.dependency_resolver = DependencyResolver(registry)
        self.market_client = SkillMarketClient()

    def discover_and_load(
        self,
        search_path: str,
        load_all: bool = False
    ) -> Dict[str, Any]:
        """Discover skills under ``search_path`` and optionally load them.

        Returns:
            A dict with the lists ``'discovered'``, ``'loaded'`` and
            ``'failed'``. The last two stay empty unless ``load_all`` is set.
        """
        found = self.discovery.discover(search_path)
        loaded_names = []
        failed_names = []

        if load_all:
            ordered = self.dependency_resolver.resolve_load_order(found)
            instances = self.batch_loader.load_all(ordered)
            for name, instance in instances.items():
                # The batch loader reports a failed load as None.
                bucket = failed_names if instance is None else loaded_names
                bucket.append(name)

        return {
            'discovered': found,
            'loaded': loaded_names,
            'failed': failed_names
        }

    def install_from_market(
        self,
        skill_name: str,
        version: str,
        install_path: str
    ) -> Dict:
        """Install a marketplace skill into ``install_path`` and register it."""
        client = self.market_client
        return client.install(skill_name, version, install_path, self.registry)

    def search_market(self, query: str) -> List[Dict]:
        """Search the marketplace for ``query``."""
        return self.market_client.search(query)

    def get_skill_status(self, skill_name: str) -> Dict:
        """Summarise one skill: metadata, load state and dependency health.

        Returns ``{'found': False}`` when the skill is not registered.
        """
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            return {'found': False}

        resolver = self.dependency_resolver
        dependencies_ok = resolver.check_dependencies(skill_name)
        dependency_tree = resolver.get_dependency_tree(skill_name)

        status = {'found': True}
        status['name'] = skill_meta.name
        status['version'] = skill_meta.version
        status['loaded'] = skill_meta.loaded
        status['dependencies'] = skill_meta.dependencies
        status['dependencies_ok'] = dependencies_ok
        status['dependency_tree'] = dependency_tree
        return status

    def get_all_skills_status(self) -> List[Dict]:
        """Return ``get_skill_status`` for every registered skill."""
        statuses = []
        for name in self.registry.list_skills():
            statuses.append(self.get_skill_status(name))
        return statuses


# 实践任务
任务1:实现技能发现器
实现一个完整的技能发现器。
步骤:
- 创建SkillRegistry类
- 实现SkillDiscovery类
- 实现SkillLoader类
- 实现DependencyResolver类
- 测试发现和加载功能
输出:
- 完整的技能发现器实现
- 测试代码
- 使用示例
任务2:构建技能生态系统
构建一个简单的技能生态系统。
步骤:
- 实现SkillMarketClient
- 实现SkillManager
- 创建技能市场API(可选)
- 实现技能安装和卸载
- 测试生态系统功能
输出:
- 技能生态系统实现
- API文档
- 使用示例
任务3:优化性能
优化技能发现和加载的性能。
步骤:
- 实现缓存机制
- 优化扫描算法
- 实现并行加载
- 添加性能监控
- 性能测试和对比
输出:
- 优化后的代码
- 性能测试报告
- 优化建议
代码示例
示例1:完整的技能管理系统
python
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional
class SkillManagementSystem:
    """High-level entry point that manages the skills in one directory.

    Args:
        skills_dir: Directory that holds the installed skills; it is
            created (including missing parents) if it does not exist.
    """

    def __init__(self, skills_dir: str = "./skills"):
        self.skills_dir = Path(skills_dir)
        # parents=True so a nested path such as "data/skills" also works.
        self.skills_dir.mkdir(parents=True, exist_ok=True)

        self.registry = AdvancedSkillRegistry()
        self.discovery = AdvancedSkillDiscovery(self.registry)
        self.loader = SkillLoader(self.registry)
        self.batch_loader = BatchSkillLoader(self.loader)
        self.dependency_resolver = DependencyResolver(self.registry)
        self.manager = SkillManager(self.registry)

    def initialize(self):
        """Discover and load every skill in ``skills_dir``, printing a summary."""
        print("Initializing Skill Management System...")

        # The directory may have been removed since construction.
        if not self.skills_dir.exists():
            print(f"Creating skills directory: {self.skills_dir}")
            self.skills_dir.mkdir(parents=True, exist_ok=True)

        print("Discovering skills...")
        result = self.discover_and_load(load_all=True)

        print(f"\nDiscovered {len(result['discovered'])} skills")
        print(f"Loaded {len(result['loaded'])} skills")
        if result['failed']:
            print(f"Failed to load {len(result['failed'])} skills:")
            for name in result['failed']:
                print(f"  - {name}")

    def discover_and_load(self, load_all: bool = False) -> Dict[str, List[str]]:
        """Discover skills in ``skills_dir``; load them too if ``load_all``."""
        return self.manager.discover_and_load(str(self.skills_dir), load_all)

    def list_skills(self, show_details: bool = False) -> List[Dict]:
        """Return one dict per registered skill.

        With ``show_details`` each dict is the full status from
        ``SkillManager.get_skill_status``; otherwise just ``{'name': ...}``.
        """
        skills = self.registry.list_skills()
        if show_details:
            return [self.manager.get_skill_status(name) for name in skills]
        return [{'name': name} for name in skills]

    def load_skill(self, skill_name: str) -> bool:
        """Load a skill; return ``False`` (and print the error) on failure."""
        try:
            self.loader.load(skill_name)
            return True
        except Exception as e:
            print(f"Failed to load {skill_name}: {e}")
            return False

    def unload_skill(self, skill_name: str) -> bool:
        """Unload a skill; return ``False`` (and print the error) on failure."""
        try:
            self.loader.unload(skill_name)
            return True
        except Exception as e:
            print(f"Failed to unload {skill_name}: {e}")
            return False

    def reload_skill(self, skill_name: str) -> bool:
        """Reload a skill; return ``False`` (and print the error) on failure."""
        try:
            self.loader.reload(skill_name)
            return True
        except Exception as e:
            print(f"Failed to reload {skill_name}: {e}")
            return False

    def install_skill(self, skill_name: str, version: str) -> bool:
        """Install a marketplace skill into ``skills_dir``.

        Returns:
            ``True`` on success; ``False`` (after printing the reason)
            otherwise.
        """
        try:
            result = self.manager.install_from_market(
                skill_name,
                version,
                str(self.skills_dir)
            )
        except Exception as e:
            print(f"Failed to install {skill_name}: {e}")
            return False

        if result['success']:
            print(f"Successfully installed {skill_name} v{version}")
            return True
        print(f"Failed to install {skill_name}: {result.get('error')}")
        return False

    def search_skills(self, query: str) -> List[Dict]:
        """Search the marketplace (not the local registry) for ``query``."""
        return self.manager.search_market(query)

    def get_skill_info(self, skill_name: str) -> Optional[Dict]:
        """Return the status dict of a registered skill, or ``None``.

        The manager reports an unknown skill as ``{'found': False}``, which
        is truthy; it is mapped to ``None`` here so that callers can rely
        on a plain ``if info:`` check, as the return annotation promises.
        """
        status = self.manager.get_skill_status(skill_name)
        if not status or not status.get('found'):
            return None
        return status

    def check_dependencies(self, skill_name: str) -> Dict[str, bool]:
        """Report whether each direct dependency is registered and loaded."""
        return self.dependency_resolver.check_dependencies(skill_name)

    def get_dependency_tree(self, skill_name: str) -> Dict:
        """Return the nested dependency tree of ``skill_name``."""
        return self.dependency_resolver.get_dependency_tree(skill_name)

    def detect_cycles(self) -> List[List[str]]:
        """Return the dependency cycles among the registered skills."""
        return self.dependency_resolver.detect_circular_dependencies()
# Usage example
if __name__ == "__main__":
    sms = SkillManagementSystem("./skills")

    # Discover everything under ./skills and load it.
    sms.initialize()

    # List every registered skill with its load state.
    print("\n=== All Skills ===")
    for skill in sms.list_skills(show_details=True):
        print(f"- {skill['name']} v{skill['version']} (Loaded: {skill['loaded']})")

    # Search the marketplace. This needs network access, so a failure is
    # reported instead of aborting the rest of the demo.
    print("\n=== Search Skills ===")
    try:
        results = sms.search_skills("text")
    except Exception as e:
        print(f"Market search failed: {e}")
        results = []
    for result in results[:5]:
        print(f"- {result['name']}: {result['description']}")

    # Show details for one skill. An unknown skill may come back as None or
    # as {'found': False}; neither carries the keys printed below.
    print("\n=== Skill Info ===")
    skill_info = sms.get_skill_info("text-analyzer")
    if skill_info and skill_info.get('found'):
        print(f"Name: {skill_info['name']}")
        print(f"Version: {skill_info['version']}")
        print(f"Loaded: {skill_info['loaded']}")
        print(f"Dependencies: {skill_info['dependencies']}")


# 示例2:技能发现和加载测试
python
import unittest
import tempfile
import shutil
from pathlib import Path
class TestSkillDiscovery(unittest.TestCase):
    """End-to-end tests for discovery, loading and dependency resolution.

    Every test works on a fresh temporary ``skills`` directory that
    contains two dependency-free skills, ``skill1`` and ``skill2``.
    """

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp()
        self.skills_dir = Path(self.temp_dir) / "skills"
        self.skills_dir.mkdir()

        self.create_test_skill("skill1")
        self.create_test_skill("skill2")

        self.registry = AdvancedSkillRegistry()
        self.discovery = AdvancedSkillDiscovery(self.registry)
        self.loader = SkillLoader(self.registry)

    def tearDown(self):
        shutil.rmtree(self.temp_dir)

    def create_test_skill(self, skill_name: str, dependencies=None,
                          description: str = "Test skill"):
        """Write a minimal skill (``skill.md`` + ``src/main.py``) to disk.

        Args:
            skill_name: Hyphen-separated skill name; also determines the
                implementation class name.
            dependencies: Names of skills this skill depends on.
            description: Value of the ``description`` front-matter field.
        """
        skill_dir = self.skills_dir / skill_name
        skill_dir.mkdir()

        deps_yaml = "[" + ", ".join(f'"{dep}"' for dep in dependencies or []) + "]"
        skill_md_lines = [
            "---",
            f'name: "{skill_name}"',
            'version: "1.0.0"',
            'author: "Test Author"',
            f'description: "{description}"',
            'tags: ["test"]',
            'license: "MIT"',
            f"dependencies: {deps_yaml}",
            "---",
            f"# {skill_name}",
            f"{description}.",
            "",
        ]
        (skill_dir / "skill.md").write_text("\n".join(skill_md_lines), encoding="utf-8")

        src_dir = skill_dir / "src"
        src_dir.mkdir()

        # Must match SkillLoader's name-to-class mapping.
        class_name = ''.join(word.capitalize() for word in skill_name.split('-'))
        # The generated module needs real indentation to be importable.
        main_py_lines = [
            f"class {class_name}:",
            "    def __init__(self):",
            f'        self.name = "{skill_name}"',
            "",
            "    def execute(self, params):",
            '        return {"success": True, "data": "test"}',
            "",
        ]
        (src_dir / "main.py").write_text("\n".join(main_py_lines), encoding="utf-8")

    def test_discovery(self):
        discovered = self.discovery.discover(str(self.skills_dir))

        self.assertEqual(len(discovered), 2)
        self.assertIn("skill1", discovered)
        self.assertIn("skill2", discovered)

    def test_loading(self):
        self.discovery.discover(str(self.skills_dir))

        skill1 = self.loader.load("skill1")
        self.assertIsNotNone(skill1)
        self.assertEqual(skill1.name, "skill1")

        skill2 = self.loader.load("skill2")
        self.assertIsNotNone(skill2)
        self.assertEqual(skill2.name, "skill2")

    def test_unloading(self):
        self.discovery.discover(str(self.skills_dir))
        self.loader.load("skill1")
        self.loader.unload("skill1")

        skill1_meta = self.registry.get_skill("skill1")
        self.assertFalse(skill1_meta.loaded)

    def test_reloading(self):
        self.discovery.discover(str(self.skills_dir))
        self.loader.load("skill1")

        reloaded = self.loader.reload("skill1")
        self.assertIsNotNone(reloaded)
        self.assertEqual(reloaded.name, "skill1")

    def test_dependency_resolution(self):
        # A third skill that depends on the two default ones.
        self.create_test_skill(
            "skill3",
            dependencies=["skill1", "skill2"],
            description="Test skill with dependencies",
        )
        self.discovery.discover(str(self.skills_dir))

        # SkillLoader has no resolver of its own; build one on the same
        # registry the discovery step populated.
        resolver = DependencyResolver(self.registry)
        load_order = resolver.resolve_load_order(["skill3"])

        self.assertIn("skill1", load_order)
        self.assertIn("skill2", load_order)
        self.assertIn("skill3", load_order)
        self.assertLess(load_order.index("skill1"), load_order.index("skill3"))
        self.assertLess(load_order.index("skill2"), load_order.index("skill3"))


if __name__ == "__main__":
    unittest.main()


# 总结
Skills能力发现是Skills生态系统的核心功能,它允许系统自动发现、加载和管理Skills。本节我们学习了:
- 能力发现机制概述
- 技能注册机制
- 技能发现器
- 动态加载机制
- 依赖解析
- 技能生态系统
下一步,我们将学习Skills最佳实践,掌握如何编写高质量的Skills。
