Skip to content

第31天:Skills能力发现

学习目标

  • 理解Skills能力发现机制
  • 掌握技能注册和加载流程
  • 学会实现技能发现器
  • 能够动态加载和管理Skills
  • 掌握技能依赖解析
  • 学会构建技能生态系统

核心内容

能力发现机制概述

什么是能力发现

能力发现(Capability Discovery)是指系统自动检测、识别和加载可用Skills的过程。在Skills标准中,能力发现机制允许:

  1. 自动扫描:自动扫描指定目录中的Skills
  2. 动态加载:运行时动态加载和卸载Skills
  3. 依赖解析:自动解析和管理Skills之间的依赖关系
  4. 版本管理:支持多版本Skills共存
  5. 热更新:支持Skills的热更新和重新加载

能力发现的重要性

提高开发效率

  • 无需手动注册每个Skill
  • 自动发现新添加的Skills
  • 减少配置错误

增强可扩展性

  • 轻松添加新Skills
  • 支持插件化架构
  • 便于第三方集成

简化维护

  • 集中管理Skills
  • 统一更新和卸载
  • 版本控制更简单

能力发现流程

1. 扫描指定目录

2. 识别skill.md文件

3. 解析Front Matter

4. 验证Skill有效性

5. 解析依赖关系

6. 注册Skill

7. 加载Skill实现

8. 初始化Skill

9. Skill可用

技能注册机制

基础注册器

python
from typing import Dict, List, Optional
from pathlib import Path
import yaml
import importlib.util

class SkillRegistry:
    def __init__(self):
        self.skills: Dict[str, Dict] = {}
        self.skill_instances: Dict[str, object] = {}
    
    def register(self, skill_path: str, skill_info: Dict):
        skill_name = skill_info.get('name')
        if not skill_name:
            raise ValueError("Skill name is required")
        
        if skill_name in self.skills:
            raise ValueError(f"Skill {skill_name} already registered")
        
        self.skills[skill_name] = {
            'path': skill_path,
            'info': skill_info,
            'loaded': False
        }
        
        print(f"Skill {skill_name} registered successfully")
    
    def unregister(self, skill_name: str):
        if skill_name not in self.skills:
            raise ValueError(f"Skill {skill_name} not found")
        
        if skill_name in self.skill_instances:
            del self.skill_instances[skill_name]
        
        del self.skills[skill_name]
        print(f"Skill {skill_name} unregistered successfully")
    
    def get_skill(self, skill_name: str) -> Optional[Dict]:
        return self.skills.get(skill_name)
    
    def list_skills(self) -> List[str]:
        return list(self.skills.keys())

高级注册器

python
from typing import Dict, List, Optional, Set
from pathlib import Path
import yaml
import importlib.util
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SkillMetadata:
    name: str
    version: str
    author: str
    description: str
    tags: List[str]
    license: str
    dependencies: List[str]
    path: str
    registered_at: datetime
    loaded: bool = False

class AdvancedSkillRegistry:
    def __init__(self):
        self.skills: Dict[str, SkillMetadata] = {}
        self.skill_instances: Dict[str, object] = {}
        self.dependency_graph: Dict[str, Set[str]] = {}
    
    def register(self, skill_path: str, skill_info: Dict):
        skill_name = skill_info.get('name')
        if not skill_name:
            raise ValueError("Skill name is required")
        
        if skill_name in self.skills:
            raise ValueError(f"Skill {skill_name} already registered")
        
        metadata = SkillMetadata(
            name=skill_name,
            version=skill_info.get('version', '1.0.0'),
            author=skill_info.get('author', 'Unknown'),
            description=skill_info.get('description', ''),
            tags=skill_info.get('tags', []),
            license=skill_info.get('license', 'MIT'),
            dependencies=skill_info.get('dependencies', []),
            path=skill_path,
            registered_at=datetime.now(),
            loaded=False
        )
        
        self.skills[skill_name] = metadata
        self._build_dependency_graph(skill_name, metadata.dependencies)
        
        print(f"Skill {skill_name} v{metadata.version} registered successfully")
    
    def unregister(self, skill_name: str):
        if skill_name not in self.skills:
            raise ValueError(f"Skill {skill_name} not found")
        
        # 检查是否有其他Skills依赖此Skill
        dependents = self._get_dependents(skill_name)
        if dependents:
            raise ValueError(
                f"Cannot unregister {skill_name}: "
                f"Depended by: {', '.join(dependents)}"
            )
        
        if skill_name in self.skill_instances:
            del self.skill_instances[skill_name]
        
        del self.skills[skill_name]
        self._remove_from_dependency_graph(skill_name)
        
        print(f"Skill {skill_name} unregistered successfully")
    
    def get_skill(self, skill_name: str) -> Optional[SkillMetadata]:
        return self.skills.get(skill_name)
    
    def list_skills(self, loaded_only: bool = False) -> List[str]:
        if loaded_only:
            return [name for name, meta in self.skills.items() if meta.loaded]
        return list(self.skills.keys())
    
    def search_skills(self, query: str) -> List[str]:
        query = query.lower()
        results = []
        
        for name, meta in self.skills.items():
            if (query in name.lower() or 
                query in meta.description.lower() or
                any(query in tag.lower() for tag in meta.tags)):
                results.append(name)
        
        return results
    
    def _build_dependency_graph(self, skill_name: str, dependencies: List[str]):
        self.dependency_graph[skill_name] = set(dependencies)
    
    def _remove_from_dependency_graph(self, skill_name: str):
        if skill_name in self.dependency_graph:
            del self.dependency_graph[skill_name]
        
        for deps in self.dependency_graph.values():
            deps.discard(skill_name)
    
    def _get_dependents(self, skill_name: str) -> List[str]:
        dependents = []
        for name, deps in self.dependency_graph.items():
            if skill_name in deps:
                dependents.append(name)
        return dependents

技能发现器

基础发现器

python
from typing import List, Dict
from pathlib import Path
import yaml

class SkillDiscovery:
    def __init__(self, registry: SkillRegistry):
        self.registry = registry
    
    def discover(self, search_path: str) -> List[str]:
        search_dir = Path(search_path)
        if not search_dir.exists():
            raise ValueError(f"Search path {search_path} does not exist")
        
        discovered_skills = []
        
        for skill_md in search_dir.glob("**/skill.md"):
            try:
                skill_info = self._parse_skill_md(skill_md)
                self.registry.register(str(skill_md.parent), skill_info)
                discovered_skills.append(skill_info['name'])
            except Exception as e:
                print(f"Failed to load skill from {skill_md}: {e}")
        
        return discovered_skills
    
    def _parse_skill_md(self, skill_md_path: Path) -> Dict:
        with open(skill_md_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        lines = content.split('\n')
        if lines[0] != '---':
            raise ValueError("Invalid skill.md format")
        
        end_index = lines[1:].index('---') + 1
        front_matter_text = '\n'.join(lines[1:end_index])
        
        skill_info = yaml.safe_load(front_matter_text)
        return skill_info

高级发现器

python
from typing import List, Dict, Set
from pathlib import Path
import yaml
import json
import hashlib
from datetime import datetime

class AdvancedSkillDiscovery:
    def __init__(self, registry: AdvancedSkillRegistry):
        self.registry = registry
        self.discovery_cache: Dict[str, Dict] = {}
    
    def discover(
        self, 
        search_path: str, 
        recursive: bool = True,
        use_cache: bool = True
    ) -> List[str]:
        search_dir = Path(search_path)
        if not search_dir.exists():
            raise ValueError(f"Search path {search_path} does not exist")
        
        discovered_skills = []
        
        if recursive:
            skill_md_files = list(search_dir.glob("**/skill.md"))
        else:
            skill_md_files = list(search_dir.glob("skill.md"))
        
        for skill_md in skill_md_files:
            try:
                if use_cache and self._is_cached(skill_md):
                    skill_info = self._get_cached(skill_md)
                else:
                    skill_info = self._parse_skill_md(skill_md)
                    self._cache_skill(skill_md, skill_info)
                
                self.registry.register(str(skill_md.parent), skill_info)
                discovered_skills.append(skill_info['name'])
            except Exception as e:
                print(f"Failed to load skill from {skill_md}: {e}")
        
        return discovered_skills
    
    def discover_from_config(self, config_path: str) -> List[str]:
        config_file = Path(config_path)
        if not config_file.exists():
            raise ValueError(f"Config file {config_path} does not exist")
        
        with open(config_file, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        
        discovered_skills = []
        
        for skill_config in config.get('skills', []):
            skill_path = skill_config.get('path')
            if skill_path:
                try:
                    skill_md = Path(skill_path) / "skill.md"
                    skill_info = self._parse_skill_md(skill_md)
                    
                    if 'enabled' in skill_config:
                        skill_info['enabled'] = skill_config['enabled']
                    
                    self.registry.register(skill_path, skill_info)
                    discovered_skills.append(skill_info['name'])
                except Exception as e:
                    print(f"Failed to load skill from {skill_path}: {e}")
        
        return discovered_skills
    
    def _parse_skill_md(self, skill_md_path: Path) -> Dict:
        with open(skill_md_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        lines = content.split('\n')
        if lines[0] != '---':
            raise ValueError("Invalid skill.md format")
        
        end_index = lines[1:].index('---') + 1
        front_matter_text = '\n'.join(lines[1:end_index])
        
        skill_info = yaml.safe_load(front_matter_text)
        return skill_info
    
    def _is_cached(self, skill_md_path: Path) -> bool:
        cache_key = self._get_cache_key(skill_md_path)
        if cache_key not in self.discovery_cache:
            return False
        
        cached_info = self.discovery_cache[cache_key]
        if not skill_md_path.exists():
            return False
        
        current_mtime = skill_md_path.stat().st_mtime
        return cached_info.get('mtime') == current_mtime
    
    def _get_cached(self, skill_md_path: Path) -> Dict:
        cache_key = self._get_cache_key(skill_md_path)
        return self.discovery_cache[cache_key]['data']
    
    def _cache_skill(self, skill_md_path: Path, skill_info: Dict):
        cache_key = self._get_cache_key(skill_md_path)
        self.discovery_cache[cache_key] = {
            'mtime': skill_md_path.stat().st_mtime,
            'data': skill_info
        }
    
    def _get_cache_key(self, skill_md_path: Path) -> str:
        return str(skill_md_path.resolve())
    
    def clear_cache(self):
        self.discovery_cache.clear()

动态加载机制

技能加载器

python
from typing import Dict, Any, Optional
from pathlib import Path
import importlib.util
import sys

class SkillLoader:
    def __init__(self, registry: AdvancedSkillRegistry):
        self.registry = registry
    
    def load(self, skill_name: str) -> object:
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")
        
        if skill_meta.loaded:
            print(f"Skill {skill_name} is already loaded")
            return self.registry.skill_instances.get(skill_name)
        
        # 检查依赖
        self._check_dependencies(skill_meta)
        
        # 加载Skill实现
        skill_instance = self._load_skill_implementation(skill_meta)
        
        # 注册实例
        self.registry.skill_instances[skill_name] = skill_instance
        skill_meta.loaded = True
        
        print(f"Skill {skill_name} loaded successfully")
        return skill_instance
    
    def unload(self, skill_name: str):
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")
        
        if not skill_meta.loaded:
            print(f"Skill {skill_name} is not loaded")
            return
        
        # 检查是否有其他Skills依赖此Skill
        dependents = self.registry._get_dependents(skill_name)
        if dependents:
            raise ValueError(
                f"Cannot unload {skill_name}: "
                f"Depended by: {', '.join(dependents)}"
            )
        
        # 移除实例
        if skill_name in self.registry.skill_instances:
            del self.registry.skill_instances[skill_name]
        
        skill_meta.loaded = False
        print(f"Skill {skill_name} unloaded successfully")
    
    def reload(self, skill_name: str) -> object:
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")
        
        if skill_meta.loaded:
            self.unload(skill_name)
        
        return self.load(skill_name)
    
    def _check_dependencies(self, skill_meta: SkillMetadata):
        for dep in skill_meta.dependencies:
            dep_meta = self.registry.get_skill(dep)
            if not dep_meta:
                raise ValueError(
                    f"Dependency {dep} not found for skill {skill_meta.name}"
                )
            
            if not dep_meta.loaded:
                print(f"Loading dependency {dep} for {skill_meta.name}")
                self.load(dep)
    
    def _load_skill_implementation(self, skill_meta: SkillMetadata) -> object:
        skill_path = Path(skill_meta.path)
        
        # 尝试加载main.py
        main_py = skill_path / "src" / "main.py"
        if not main_py.exists():
            main_py = skill_path / "main.py"
        
        if not main_py.exists():
            raise ValueError(
                f"No implementation file found for skill {skill_meta.name}"
            )
        
        # 动态导入模块
        spec = importlib.util.spec_from_file_location(
            f"{skill_meta.name}_module",
            main_py
        )
        
        if spec is None or spec.loader is None:
            raise ValueError(f"Failed to load module for skill {skill_meta.name}")
        
        module = importlib.util.module_from_spec(spec)
        sys.modules[f"{skill_meta.name}_module"] = module
        spec.loader.exec_module(module)
        
        # 获取Skill类
        skill_class_name = self._get_skill_class_name(skill_meta.name)
        skill_class = getattr(module, skill_class_name, None)
        
        if skill_class is None:
            raise ValueError(
                f"Skill class {skill_class_name} not found in module"
            )
        
        # 实例化Skill
        skill_instance = skill_class()
        return skill_instance
    
    def _get_skill_class_name(self, skill_name: str) -> str:
        return ''.join(word.capitalize() for word in skill_name.split('-'))

批量加载器

python
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchSkillLoader:
    def __init__(self, loader: SkillLoader):
        self.loader = loader
    
    def load_all(self, skill_names: List[str], parallel: bool = False) -> Dict[str, Any]:
        results = {}
        
        if parallel:
            with ThreadPoolExecutor() as executor:
                future_to_name = {
                    executor.submit(self.loader.load, name): name
                    for name in skill_names
                }
                
                for future in as_completed(future_to_name):
                    name = future_to_name[future]
                    try:
                        results[name] = future.result()
                    except Exception as e:
                        print(f"Failed to load {name}: {e}")
                        results[name] = None
        else:
            for name in skill_names:
                try:
                    results[name] = self.loader.load(name)
                except Exception as e:
                    print(f"Failed to load {name}: {e}")
                    results[name] = None
        
        return results
    
    def unload_all(self, skill_names: List[str]) -> Dict[str, bool]:
        results = {}
        
        for name in skill_names:
            try:
                self.loader.unload(name)
                results[name] = True
            except Exception as e:
                print(f"Failed to unload {name}: {e}")
                results[name] = False
        
        return results

依赖解析

依赖解析器

python
from typing import List, Dict, Set
from collections import defaultdict, deque

class DependencyResolver:
    def __init__(self, registry: AdvancedSkillRegistry):
        self.registry = registry
    
    def resolve_load_order(self, skill_names: List[str]) -> List[str]:
        load_order = []
        visited = set()
        visiting = set()
        
        def visit(skill_name: str):
            if skill_name in visited:
                return
            
            if skill_name in visiting:
                raise ValueError(f"Circular dependency detected involving {skill_name}")
            
            visiting.add(skill_name)
            
            skill_meta = self.registry.get_skill(skill_name)
            if not skill_meta:
                raise ValueError(f"Skill {skill_name} not found")
            
            for dep in skill_meta.dependencies:
                visit(dep)
            
            visiting.remove(skill_name)
            visited.add(skill_name)
            load_order.append(skill_name)
        
        for name in skill_names:
            visit(name)
        
        return load_order
    
    def check_dependencies(self, skill_name: str) -> Dict[str, bool]:
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")
        
        result = {}
        
        for dep in skill_meta.dependencies:
            dep_meta = self.registry.get_skill(dep)
            result[dep] = dep_meta is not None and dep_meta.loaded
        
        return result
    
    def get_dependency_tree(self, skill_name: str) -> Dict:
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            raise ValueError(f"Skill {skill_name} not found")
        
        def build_tree(name: str) -> Dict:
            skill_meta = self.registry.get_skill(name)
            if not skill_meta:
                return {'name': name, 'found': False}
            
            tree = {
                'name': name,
                'version': skill_meta.version,
                'found': True,
                'loaded': skill_meta.loaded,
                'dependencies': []
            }
            
            for dep in skill_meta.dependencies:
                tree['dependencies'].append(build_tree(dep))
            
            return tree
        
        return build_tree(skill_name)
    
    def detect_circular_dependencies(self) -> List[List[str]]:
        cycles = []
        visited = set()
        rec_stack = set()
        path = []
        
        def dfs(skill_name: str):
            if skill_name in rec_stack:
                cycle_start = path.index(skill_name)
                cycles.append(path[cycle_start:] + [skill_name])
                return
            
            if skill_name in visited:
                return
            
            visited.add(skill_name)
            rec_stack.add(skill_name)
            path.append(skill_name)
            
            skill_meta = self.registry.get_skill(skill_name)
            if skill_meta:
                for dep in skill_meta.dependencies:
                    dfs(dep)
            
            path.pop()
            rec_stack.remove(skill_name)
        
        for skill_name in self.registry.list_skills():
            dfs(skill_name)
        
        return cycles

技能生态系统

技能市场客户端

python
import requests
from typing import List, Dict, Optional
from pathlib import Path
import zipfile
import shutil

class SkillMarketClient:
    def __init__(self, base_url: str = "https://skills.market/api"):
        self.base_url = base_url
    
    def search(self, query: str, limit: int = 10) -> List[Dict]:
        response = requests.get(
            f"{self.base_url}/search",
            params={'q': query, 'limit': limit}
        )
        response.raise_for_status()
        return response.json()['results']
    
    def get_skill_info(self, skill_name: str) -> Dict:
        response = requests.get(f"{self.base_url}/skills/{skill_name}")
        response.raise_for_status()
        return response.json()
    
    def download(self, skill_name: str, version: str, dest_path: str) -> str:
        response = requests.get(
            f"{self.base_url}/skills/{skill_name}/download",
            params={'version': version},
            stream=True
        )
        response.raise_for_status()
        
        dest_dir = Path(dest_path) / skill_name
        dest_dir.mkdir(parents=True, exist_ok=True)
        
        zip_path = dest_dir / f"{skill_name}-{version}.zip"
        
        with open(zip_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(dest_dir)
        
        zip_path.unlink()
        
        return str(dest_dir)
    
    def install(
        self, 
        skill_name: str, 
        version: str, 
        install_path: str,
        registry: AdvancedSkillRegistry
    ) -> Dict:
        try:
            skill_path = self.download(skill_name, version, install_path)
            
            discovery = AdvancedSkillDiscovery(registry)
            discovered = discovery.discover(skill_path, recursive=False)
            
            return {
                'success': True,
                'skill_name': skill_name,
                'version': version,
                'path': skill_path,
                'discovered': discovered
            }
        except Exception as e:
            return {
                'success': False,
                'skill_name': skill_name,
                'error': str(e)
            }
    
    def list_categories(self) -> List[str]:
        response = requests.get(f"{self.base_url}/categories")
        response.raise_for_status()
        return response.json()['categories']
    
    def get_trending(self, limit: int = 10) -> List[Dict]:
        response = requests.get(
            f"{self.base_url}/trending",
            params={'limit': limit}
        )
        response.raise_for_status()
        return response.json()['skills']

技能管理器

python
from typing import Dict, List, Optional

class SkillManager:
    def __init__(self, registry: AdvancedSkillRegistry):
        self.registry = registry
        self.discovery = AdvancedSkillDiscovery(registry)
        self.loader = SkillLoader(registry)
        self.batch_loader = BatchSkillLoader(self.loader)
        self.dependency_resolver = DependencyResolver(registry)
        self.market_client = SkillMarketClient()
    
    def discover_and_load(
        self, 
        search_path: str, 
        load_all: bool = False
    ) -> Dict[str, Any]:
        discovered = self.discovery.discover(search_path)
        
        result = {
            'discovered': discovered,
            'loaded': [],
            'failed': []
        }
        
        if load_all:
            load_order = self.dependency_resolver.resolve_load_order(discovered)
            loaded = self.batch_loader.load_all(load_order)
            
            for name, instance in loaded.items():
                if instance is not None:
                    result['loaded'].append(name)
                else:
                    result['failed'].append(name)
        
        return result
    
    def install_from_market(
        self, 
        skill_name: str, 
        version: str, 
        install_path: str
    ) -> Dict:
        return self.market_client.install(skill_name, version, install_path, self.registry)
    
    def search_market(self, query: str) -> List[Dict]:
        return self.market_client.search(query)
    
    def get_skill_status(self, skill_name: str) -> Dict:
        skill_meta = self.registry.get_skill(skill_name)
        if not skill_meta:
            return {'found': False}
        
        dependencies_ok = self.dependency_resolver.check_dependencies(skill_name)
        dependency_tree = self.dependency_resolver.get_dependency_tree(skill_name)
        
        return {
            'found': True,
            'name': skill_meta.name,
            'version': skill_meta.version,
            'loaded': skill_meta.loaded,
            'dependencies': skill_meta.dependencies,
            'dependencies_ok': dependencies_ok,
            'dependency_tree': dependency_tree
        }
    
    def get_all_skills_status(self) -> List[Dict]:
        return [
            self.get_skill_status(name)
            for name in self.registry.list_skills()
        ]

实践任务

任务1:实现技能发现器

实现一个完整的技能发现器。

步骤

  1. 创建SkillRegistry类
  2. 实现SkillDiscovery类
  3. 实现SkillLoader类
  4. 实现DependencyResolver类
  5. 测试发现和加载功能

输出

  • 完整的技能发现器实现
  • 测试代码
  • 使用示例

任务2:构建技能生态系统

构建一个简单的技能生态系统。

步骤

  1. 实现SkillMarketClient
  2. 实现SkillManager
  3. 创建技能市场API(可选)
  4. 实现技能安装和卸载
  5. 测试生态系统功能

输出

  • 技能生态系统实现
  • API文档
  • 使用示例

任务3:优化性能

优化技能发现和加载的性能。

步骤

  1. 实现缓存机制
  2. 优化扫描算法
  3. 实现并行加载
  4. 添加性能监控
  5. 性能测试和对比

输出

  • 优化后的代码
  • 性能测试报告
  • 优化建议

代码示例

示例1:完整的技能管理系统

python
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional

class SkillManagementSystem:
    def __init__(self, skills_dir: str = "./skills"):
        self.skills_dir = Path(skills_dir)
        self.skills_dir.mkdir(exist_ok=True)
        
        self.registry = AdvancedSkillRegistry()
        self.discovery = AdvancedSkillDiscovery(self.registry)
        self.loader = SkillLoader(self.registry)
        self.batch_loader = BatchSkillLoader(self.loader)
        self.dependency_resolver = DependencyResolver(self.registry)
        self.manager = SkillManager(self.registry)
    
    def initialize(self):
        print("Initializing Skill Management System...")
        
        if not self.skills_dir.exists():
            print(f"Creating skills directory: {self.skills_dir}")
            self.skills_dir.mkdir(parents=True, exist_ok=True)
        
        print("Discovering skills...")
        result = self.discover_and_load(load_all=True)
        
        print(f"\nDiscovered {len(result['discovered'])} skills")
        print(f"Loaded {len(result['loaded'])} skills")
        
        if result['failed']:
            print(f"Failed to load {len(result['failed'])} skills:")
            for name in result['failed']:
                print(f"  - {name}")
    
    def discover_and_load(self, load_all: bool = False) -> Dict[str, List[str]]:
        return self.manager.discover_and_load(str(self.skills_dir), load_all)
    
    def list_skills(self, show_details: bool = False) -> List[Dict]:
        skills = self.registry.list_skills()
        
        if show_details:
            return [self.manager.get_skill_status(name) for name in skills]
        
        return [{'name': name} for name in skills]
    
    def load_skill(self, skill_name: str) -> bool:
        try:
            self.loader.load(skill_name)
            return True
        except Exception as e:
            print(f"Failed to load {skill_name}: {e}")
            return False
    
    def unload_skill(self, skill_name: str) -> bool:
        try:
            self.loader.unload(skill_name)
            return True
        except Exception as e:
            print(f"Failed to unload {skill_name}: {e}")
            return False
    
    def reload_skill(self, skill_name: str) -> bool:
        try:
            self.loader.reload(skill_name)
            return True
        except Exception as e:
            print(f"Failed to reload {skill_name}: {e}")
            return False
    
    def install_skill(self, skill_name: str, version: str) -> bool:
        try:
            result = self.manager.install_from_market(
                skill_name, 
                version, 
                str(self.skills_dir)
            )
            
            if result['success']:
                print(f"Successfully installed {skill_name} v{version}")
                return True
            else:
                print(f"Failed to install {skill_name}: {result.get('error')}")
                return False
        except Exception as e:
            print(f"Failed to install {skill_name}: {e}")
            return False
    
    def search_skills(self, query: str) -> List[Dict]:
        return self.manager.search_market(query)
    
    def get_skill_info(self, skill_name: str) -> Optional[Dict]:
        return self.manager.get_skill_status(skill_name)
    
    def check_dependencies(self, skill_name: str) -> Dict[str, bool]:
        return self.dependency_resolver.check_dependencies(skill_name)
    
    def get_dependency_tree(self, skill_name: str) -> Dict:
        return self.dependency_resolver.get_dependency_tree(skill_name)
    
    def detect_cycles(self) -> List[List[str]]:
        return self.dependency_resolver.detect_circular_dependencies()

# 使用示例
if __name__ == "__main__":
    sms = SkillManagementSystem("./skills")
    
    # 初始化系统
    sms.initialize()
    
    # 列出所有技能
    print("\n=== All Skills ===")
    for skill in sms.list_skills(show_details=True):
        print(f"- {skill['name']} v{skill['version']} (Loaded: {skill['loaded']})")
    
    # 搜索技能
    print("\n=== Search Skills ===")
    results = sms.search_skills("text")
    for result in results[:5]:
        print(f"- {result['name']}: {result['description']}")
    
    # 获取技能信息
    print("\n=== Skill Info ===")
    skill_info = sms.get_skill_info("text-analyzer")
    if skill_info:
        print(f"Name: {skill_info['name']}")
        print(f"Version: {skill_info['version']}")
        print(f"Loaded: {skill_info['loaded']}")
        print(f"Dependencies: {skill_info['dependencies']}")

示例2:技能发现和加载测试

python
import unittest
import tempfile
import shutil
from pathlib import Path

class TestSkillDiscovery(unittest.TestCase):
    def setUp(self):
        self.temp_dir = tempfile.mkdtemp()
        self.skills_dir = Path(self.temp_dir) / "skills"
        self.skills_dir.mkdir()
        
        self.create_test_skill("skill1")
        self.create_test_skill("skill2")
        
        self.registry = AdvancedSkillRegistry()
        self.discovery = AdvancedSkillDiscovery(self.registry)
        self.loader = SkillLoader(self.registry)
    
    def tearDown(self):
        shutil.rmtree(self.temp_dir)
    
    def create_test_skill(self, skill_name: str):
        skill_dir = self.skills_dir / skill_name
        skill_dir.mkdir()
        
        skill_md = skill_dir / "skill.md"
        skill_md.write_text(f"""---
name: "{skill_name}"
version: "1.0.0"
author: "Test Author"
description: "Test skill"
tags: ["test"]
license: "MIT"
dependencies: []
---
# {skill_name}

Test skill description.
""")
        
        src_dir = skill_dir / "src"
        src_dir.mkdir()
        
        main_py = src_dir / "main.py"
        class_name = ''.join(word.capitalize() for word in skill_name.split('-'))
        main_py.write_text(f"""class {class_name}:
    def __init__(self):
        self.name = "{skill_name}"
    
    def execute(self, params):
        return {{"success": True, "data": "test"}}
""")
    
    def test_discovery(self):
        discovered = self.discovery.discover(str(self.skills_dir))
        
        self.assertEqual(len(discovered), 2)
        self.assertIn("skill1", discovered)
        self.assertIn("skill2", discovered)
    
    def test_loading(self):
        self.discovery.discover(str(self.skills_dir))
        
        skill1 = self.loader.load("skill1")
        self.assertIsNotNone(skill1)
        self.assertEqual(skill1.name, "skill1")
        
        skill2 = self.loader.load("skill2")
        self.assertIsNotNone(skill2)
        self.assertEqual(skill2.name, "skill2")
    
    def test_unloading(self):
        self.discovery.discover(str(self.skills_dir))
        self.loader.load("skill1")
        
        self.loader.unload("skill1")
        
        skill1_meta = self.registry.get_skill("skill1")
        self.assertFalse(skill1_meta.loaded)
    
    def test_reloading(self):
        self.discovery.discover(str(self.skills_dir))
        skill1 = self.loader.load("skill1")
        
        reloaded = self.loader.reload("skill1")
        
        self.assertIsNotNone(reloaded)
        self.assertEqual(reloaded.name, "skill1")
    
    def test_dependency_resolution(self):
        # 创建有依赖的技能
        skill3_dir = self.skills_dir / "skill3"
        skill3_dir.mkdir()
        
        skill3_md = skill3_dir / "skill.md"
        skill3_md.write_text("""---
name: "skill3"
version: "1.0.0"
author: "Test Author"
description: "Test skill with dependencies"
tags: ["test"]
license: "MIT"
dependencies:
  - skill1
  - skill2
---
# Skill3

Test skill with dependencies.
""")
        
        src_dir = skill3_dir / "src"
        src_dir.mkdir()
        
        main_py = src_dir / "main.py"
        main_py.write_text("""class Skill3:
    def __init__(self):
        self.name = "skill3"
    
    def execute(self, params):
        return {"success": True, "data": "test"}
""")
        
        self.discovery.discover(str(self.skills_dir))
        
        load_order = self.loader.dependency_resolver.resolve_load_order(["skill3"])
        
        self.assertIn("skill1", load_order)
        self.assertIn("skill2", load_order)
        self.assertIn("skill3", load_order)
        self.assertLess(load_order.index("skill1"), load_order.index("skill3"))
        self.assertLess(load_order.index("skill2"), load_order.index("skill3"))

if __name__ == "__main__":
    unittest.main()

总结

Skills能力发现是Skills生态系统的核心功能,它允许系统自动发现、加载和管理Skills。本节我们学习了:

  1. 能力发现机制概述
  2. 技能注册机制
  3. 技能发现器
  4. 动态加载机制
  5. 依赖解析
  6. 技能生态系统

下一步,我们将学习Skills最佳实践,掌握如何编写高质量的Skills。

参考资源