Day 33: The Skills Ecosystem and Marketplace

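Learning objectives

  • Understand the Skills ecosystem
  • Master the Skill publishing workflow
  • Learn how to build a Skill marketplace
  • Understand Skill monetization
  • Master Skill version management
  • Learn Skill promotion strategies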

Core content

The Skills ecosystem

Ecosystem components

The Skills ecosystem consists of the following parts:

┌─────────────────────────────────────────────────────┐
│                  Skills Ecosystem                   │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌─────────────┐      ┌─────────────┐               │
│  │    Skill    │─────▶│   Market    │               │
│  │   Authors   │      │  Platform   │               │
│  └─────────────┘      └─────────────┘               │
│         │                    │                      │
│         │                    │                      │
│         ▼                    ▼                      │
│  ┌─────────────┐      ┌─────────────┐               │
│  │    Skill    │◀────▶│    Skill    │               │
│  │  Registry   │      │    Users    │               │
│  └─────────────┘      └─────────────┘               │
│         │                    │                      │
│         │                    │                      │
│         ▼                    ▼                      │
│  ┌─────────────┐      ┌─────────────┐               │
│  │    Skill    │      │    Skill    │               │
│  │    Tools    │      │  Services   │               │
│  └─────────────┘      └─────────────┘               │
│                                                     │
└─────────────────────────────────────────────────────┘

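Core components

  1. Skill Authors: developers who build skills
  2. Skill Market: the marketplace platform
  3. Skill Registry: the registry of published skills
  4. Skill Users: people who consume skills
  5. Skill Tools: tooling for skill development
  6. Skill Services: services built around skills

Value of the ecosystem

For developers

  • Showcase and share skills
  • Receive user feedback
  • Build a personal brand
  • Earn revenue
  • Learn and grow

For users

  • Discover high-quality skills
  • Integrate functionality quickly
  • Save development time
  • Get technical support
  • Take part in building the community

For the platform

  • Enrich the ecosystem
  • Increase user retention
  • Build a technical moat
  • Create commercial value
  • Drive technical progress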

Skill publishing workflow

Preparation

1. Complete the Skill documentation

Make sure skill.md contains complete information:

yaml
---
name: "your-skill-name"
version: "1.0.0"
author: "Your Name <email@example.com>"
description: "Clear and concise description"
tags: ["tag1", "tag2", "tag3"]
license: "MIT"
python_requires: ">=3.8"
dependencies:
  - package1>=1.0.0
  - package2>=2.0.0
repository: "https://github.com/username/repo"
homepage: "https://your-skill-homepage.com"
---

# Skill Name

Complete documentation...
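
Before publishing, it can help to check the front matter automatically. The following is a minimal sketch, not part of any official tooling: it assumes the metadata has already been parsed into a dict, and the choice of which fields are mandatory (`REQUIRED_FIELDS`) is an assumption made for illustration, as is the helper name `validate_metadata`.

```python
import re

# Hypothetical rule set: the field names come from the front matter above,
# but which of them are mandatory is an assumption made for this sketch.
REQUIRED_FIELDS = ("name", "version", "author", "description", "license")
SEMVER_RE = re.compile(r"^\d+\.\d+\.\d+$")


def validate_metadata(meta: dict) -> list:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for field in REQUIRED_FIELDS:
        if not meta.get(field):
            problems.append(f"missing field: {field}")
    version = meta.get("version")
    if version and not SEMVER_RE.match(str(version)):
        problems.append(f"version is not MAJOR.MINOR.PATCH: {version}")
    return problems
```

Returning a list of problems rather than raising on the first one lets an author fix everything in a single pass.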

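2. Write tests

Make sure the Skill has thorough test coverage:

python
import unittest

from your_skill import YourSkill  # replace with your Skill's actual module and class

class TestYourSkill(unittest.TestCase):
    def setUp(self):
        self.skill = YourSkill()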
    
    def test_basic_functionality(self):
        pass
    
    def test_edge_cases(self):
        pass
    
    def test_error_handling(self):
        pass

if __name__ == "__main__":
    unittest.main()
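
The skeleton above leaves the test bodies empty. As an illustration of what they might contain, here is a sketch built around a hypothetical `WordCountSkill` (a stand-in invented for this example, not a real Skill); the three tests mirror the basic, edge-case, and error-handling categories.

```python
import unittest


class WordCountSkill:
    """Hypothetical stand-in for YourSkill: counts words in a string."""

    def execute(self, text):
        if not isinstance(text, str):
            raise TypeError("text must be a string")
        return len(text.split())


class TestWordCountSkill(unittest.TestCase):
    def setUp(self):
        self.skill = WordCountSkill()

    def test_basic_functionality(self):
        self.assertEqual(self.skill.execute("hello skill market"), 3)

    def test_edge_cases(self):
        # Empty input and irregular whitespace
        self.assertEqual(self.skill.execute(""), 0)
        self.assertEqual(self.skill.execute("   spaced   out  "), 2)

    def test_error_handling(self):
        # Non-string input should be rejected explicitly
        with self.assertRaises(TypeError):
            self.skill.execute(None)
```

Saved in a file under tests/, these can be run with `python -m unittest`.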

3. Prepare release materials

  • README.md
  • CHANGELOG.md
  • LICENSE
  • Example code
  • Demo video (optional)

Packaging

1. Create the project structure

your-skill/
├── skill.md
├── README.md
├── CHANGELOG.md
├── LICENSE
├── src/
│   └── your_skill/
│       ├── __init__.py
│       └── main.py
├── tests/
│   ├── __init__.py
│   └── test_skill.py
├── examples/
│   └── example.py
└── setup.py

2. Write setup.py

python
from setuptools import setup, find_packages

with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

setup(
    name="your-skill-name",
    version="1.0.0",
    author="Your Name",
    author_email="email@example.com",
    description="Short description",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/username/repo",
    packages=find_packages(where="src"),
    package_dir={"": "src"},
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
    ],
    python_requires=">=3.8",
    install_requires=[
        "pydantic>=2.0.0",
    ],
    extras_require={
        "dev": ["pytest>=7.0.0", "black>=22.0.0"],
    },
)

3. Build the distribution packages

bash
pip install build
python -m build

Publishing

1. Publish to PyPI

bash
pip install twine
twine upload dist/*

2. Publish to the Skill Market

python
from skill_market_client import SkillMarketClient

client = SkillMarketClient()

skill_info = {
    "name": "your-skill-name",
    "version": "1.0.0",
    "description": "Skill description",
    "tags": ["tag1", "tag2"],
    "repository": "https://github.com/username/repo",
    "license": "MIT"
}

result = client.publish_skill(skill_info)
print(f"Published: {result}")

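3. Announce the release

  • Post on social media
  • Share in technical communities
  • Send email notifications
  • Update your personal blog

Building a Skill marketplace

Marketplace architecture

Frontend architecture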

javascript
// Frontend technology stack
{
  "framework": "React/Vue",
  "ui_library": "Ant Design/Element UI",
  "state_management": "Redux/Pinia",
  "routing": "React Router/Vue Router",
  "http_client": "Axios",
  "build_tool": "Vite/Webpack"
}

Backend architecture

python
# Backend technology stack
{
  "framework": "FastAPI/Django",
  "database": "PostgreSQL",
  "cache": "Redis",
  "search": "Elasticsearch",
  "queue": "Celery/RQ",
  "storage": "AWS S3/MinIO"
}

Core features

1. Skill search

python
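from fastapi import FastAPI, HTTPException, Query
from typing import List, Optional

app = FastAPI()
# skill_service and review_service are assumed to be application-level service
# objects defined elsewhere; they are not part of FastAPI.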

@app.get("/api/skills/search")
async def search_skills(
    q: str = Query(..., description="Search query"),
    category: Optional[str] = Query(None),
    tags: Optional[List[str]] = Query(None),
    sort: str = Query("relevance", pattern="^(relevance|downloads|rating|updated)$"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100)
):
    results = await skill_service.search(
        query=q,
        category=category,
        tags=tags,
        sort=sort,
        page=page,
        page_size=page_size
    )
    return results
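
The endpoint above delegates to `skill_service.search`, whose ranking is not shown. As a rough illustration of what sorting by "relevance" could mean, the sketch below scores each skill by where the query terms match. The weights (3 for name, 2 for tag, 1 for description) and the function names are arbitrary assumptions; a production marketplace would more likely rely on a search engine such as the Elasticsearch listed in the backend stack.

```python
def relevance_score(query: str, name: str, description: str, tags: list) -> int:
    """Score one skill against a query; the weights are arbitrary assumptions."""
    terms = query.lower().split()
    name_l, desc_l = name.lower(), description.lower()
    tags_l = [t.lower() for t in tags]
    score = 0
    for term in terms:
        if term in name_l:
            score += 3  # a name match is treated as the strongest signal
        if term in tags_l:
            score += 2  # exact tag match
        if term in desc_l:
            score += 1  # description match is the weakest signal
    return score


def rank(query: str, skills: list) -> list:
    """Return the skills with a non-zero score, best match first."""
    scored = [
        (relevance_score(query, s["name"], s["description"], s["tags"]), s)
        for s in skills
    ]
    scored = [pair for pair in scored if pair[0] > 0]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [s for _, s in scored]
```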

2. Skill details

python
@app.get("/api/skills/{skill_id}")
async def get_skill(skill_id: str):
    skill = await skill_service.get_skill(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    
    # Increment the view count
    await skill_service.increment_views(skill_id)
    
    return skill

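3. Skill download

python
@app.get("/api/skills/{skill_id}/download")
async def download_skill(skill_id: str, version: Optional[str] = None):
    skill = await skill_service.get_skill(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")

    # Look up the requested version (assumed: the service returns the latest
    # version when version is None, and None when nothing matches)
    skill_version = await skill_service.get_version(skill_id, version)
    if not skill_version:
        raise HTTPException(status_code=404, detail="Version not found")

    # Increment the download count
    await skill_service.increment_downloads(skill_id, skill_version.version)

    # Return the download link
    return {
        "download_url": skill_version.download_url,
        "checksum": skill_version.checksum
    }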
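
The response includes a checksum so that a client can verify the file it downloaded. The sketch below shows one way a client might do that. It assumes the checksum is a hex-encoded SHA-256 digest, which the original does not specify, and the function names are illustrative.

```python
import hashlib


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Hash a file in chunks so large packages need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_download(path: str, expected_checksum: str) -> bool:
    """Compare the file's digest with the checksum returned by the API."""
    return sha256_of_file(path) == expected_checksum.lower()
```

A client would call `verify_download` after saving the file and discard the file if it returns False.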

4. Skill reviews

python
from pydantic import BaseModel

class SkillReview(BaseModel):
    skill_id: str
    rating: int  # 1-5
    comment: str

@app.post("/api/skills/reviews")
async def create_review(review: SkillReview, user_id: str):
    # Validate the rating
    if not 1 <= review.rating <= 5:
        raise HTTPException(status_code=400, detail="Rating must be between 1 and 5")

    # Check whether this user has already reviewed the skill
    existing = await review_service.get_user_review(user_id, review.skill_id)
    if existing:
        raise HTTPException(status_code=400, detail="Already reviewed")

    # Create the review
    await review_service.create_review(user_id, review)

    # Recompute the skill's rating
    await skill_service.update_rating(review.skill_id)

    return {"success": True}
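
The body of `skill_service.update_rating` is not shown. One plausible approach, sketched here as an assumption, is to fold each new review into a running average using the stored average and review count. The function name `updated_rating` is hypothetical.

```python
def updated_rating(current_avg: float, review_count: int, new_rating: int):
    """Fold one new rating into a running average.

    Returns (new_average, new_review_count). The average is rounded to two
    decimals, which matches a DECIMAL(3, 2) column.
    """
    if not 1 <= new_rating <= 5:
        raise ValueError("rating must be between 1 and 5")
    new_count = review_count + 1
    new_avg = (current_avg * review_count + new_rating) / new_count
    return round(new_avg, 2), new_count
```

Because the stored average is rounded, small errors can accumulate over many reviews; recomputing the average from the reviews table periodically would avoid that drift.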

Database design

Skills table

sql
CREATE TABLE skills (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL UNIQUE,
    description TEXT,
    author_id INTEGER REFERENCES users(id),
    category_id INTEGER REFERENCES categories(id),
    repository_url VARCHAR(500),
    homepage_url VARCHAR(500),
    license VARCHAR(50),
    downloads INTEGER DEFAULT 0,
    views INTEGER DEFAULT 0,
    rating DECIMAL(3, 2) DEFAULT 0.0,
    review_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_skills_name ON skills(name);
CREATE INDEX idx_skills_category ON skills(category_id);
CREATE INDEX idx_skills_rating ON skills(rating DESC);
CREATE INDEX idx_skills_downloads ON skills(downloads DESC);

Skill Versions table

sql
CREATE TABLE skill_versions (
    id SERIAL PRIMARY KEY,
    skill_id INTEGER REFERENCES skills(id) ON DELETE CASCADE,
    version VARCHAR(50) NOT NULL,
    download_url VARCHAR(500),
    checksum VARCHAR(100),
    downloads INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(skill_id, version)
);

CREATE INDEX idx_skill_versions_skill ON skill_versions(skill_id);

Skill Tags table

sql
CREATE TABLE skill_tags (
    id SERIAL PRIMARY KEY,
    skill_id INTEGER REFERENCES skills(id) ON DELETE CASCADE,
    tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
    UNIQUE(skill_id, tag_id)
);

CREATE INDEX idx_skill_tags_skill ON skill_tags(skill_id);
CREATE INDEX idx_skill_tags_tag ON skill_tags(tag_id);
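
To show how the skill_tags join table is meant to be queried, here is a small self-contained sketch using Python's built-in sqlite3 module. The schema is a simplified SQLite adaptation of the PostgreSQL tables above, and the `tags` table layout (which the original references but does not define) and the sample rows are assumptions for illustration.

```python
import sqlite3

# In-memory database with a trimmed-down version of the schema above
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE skills (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL UNIQUE,
    downloads INTEGER DEFAULT 0
);
CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE);
CREATE TABLE skill_tags (
    skill_id INTEGER REFERENCES skills(id),
    tag_id INTEGER REFERENCES tags(id),
    UNIQUE (skill_id, tag_id)
);
INSERT INTO skills (id, name, downloads) VALUES
    (1, 'pdf-tools', 120), (2, 'ocr-helper', 300), (3, 'csv-clean', 50);
INSERT INTO tags (id, name) VALUES (1, 'pdf'), (2, 'data');
INSERT INTO skill_tags (skill_id, tag_id) VALUES (1, 1), (2, 1), (3, 2);
""")


def skills_with_tag(conn, tag_name):
    """Return the names of skills carrying a tag, most downloaded first."""
    rows = conn.execute(
        """
        SELECT s.name
        FROM skills AS s
        JOIN skill_tags AS st ON st.skill_id = s.id
        JOIN tags AS t ON t.id = st.tag_id
        WHERE t.name = ?
        ORDER BY s.downloads DESC
        """,
        (tag_name,),
    ).fetchall()
    return [row[0] for row in rows]
```

The join goes through skill_tags in both directions, which is why the schema indexes both skill_id and tag_id.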

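Skill monetization

Business models

1. Free

  • Completely free to use
  • Revenue comes from donations
  • Suited to open-source projects

2. Paid

  • One-time purchase
  • Subscription
  • Usage-based billing

3. Hybrid

  • Core features are free
  • Advanced features are paid
  • Enterprise edition is paid

Pricing

Pricing strategies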

python
class PricingStrategy:
    """Base class for pricing strategies; subclasses implement calculate_price."""

    def __init__(self, base_price: float = 0.0):
        self.base_price = base_price

    def calculate_price(self, usage: dict) -> float:
        raise NotImplementedError

class FreePricing(PricingStrategy):
    def calculate_price(self, usage: dict) -> float:
        return 0.0

class OneTimePricing(PricingStrategy):
    def calculate_price(self, usage: dict) -> float:
        return self.base_price

class SubscriptionPricing(PricingStrategy):
    def __init__(self, monthly_price: float):
        super().__init__(monthly_price)
        self.monthly_price = monthly_price

    def calculate_price(self, usage: dict) -> float:
        months = usage.get('months', 1)
        return self.monthly_price * months

class UsageBasedPricing(PricingStrategy):
    def __init__(self, unit_price: float):
        super().__init__(unit_price)
        self.unit_price = unit_price

    def calculate_price(self, usage: dict) -> float:
        units = usage.get('units', 0)
        return self.unit_price * units

class TieredPricing(PricingStrategy):
    def __init__(self, tiers: list):
        super().__init__()
        # Tiers must be ordered by min_units. Both bounds are inclusive,
        # so the first tier should start at 1, not 0.
        self.tiers = tiers

    def calculate_price(self, usage: dict) -> float:
        units = usage.get('units', 0)
        total_price = 0.0

        for tier in self.tiers:
            if units <= 0:
                break

            # Bill at most this tier's capacity, then carry the rest forward
            tier_units = min(units, tier['max_units'] - tier['min_units'] + 1)
            total_price += tier_units * tier['unit_price']
            units -= tier_units

        return total_price

Usage example

python
# Free
free_pricing = FreePricing(0)
print(f"Free: ${free_pricing.calculate_price({}):.2f}")

# One-time purchase
one_time = OneTimePricing(99.99)
print(f"One-time: ${one_time.calculate_price({}):.2f}")

# Subscription, billed for 12 months
subscription = SubscriptionPricing(9.99)
print(f"Subscription (12 months): ${subscription.calculate_price({'months': 12}):.2f}")

# Usage-based
usage_based = UsageBasedPricing(0.01)
print(f"Usage-based: ${usage_based.calculate_price({'units': 1000}):.2f}")

# Tiered (bounds are inclusive, so the first tier starts at unit 1)
tiered = TieredPricing([
    {'min_units': 1, 'max_units': 1000, 'unit_price': 0.01},
    {'min_units': 1001, 'max_units': 10000, 'unit_price': 0.005},
    {'min_units': 10001, 'max_units': float('inf'), 'unit_price': 0.001}
])
# 1000 x 0.01 + 4000 x 0.005 = 30.00
print(f"Tiered: ${tiered.calculate_price({'units': 5000}):.2f}")
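
Tiered pricing is the easiest of these strategies to get wrong, so a per-tier breakdown can be useful on invoices and in tests. The sketch below is an alternative formulation, not the class above. It describes each tier by its upper bound only and uses `Decimal` to avoid float rounding in monetary amounts. The tier values repeat the example, and the names `TIERS`, `tier_breakdown`, and `total_price` are illustrative.

```python
from decimal import Decimal

# Each tier is (last unit covered by the tier, price per unit); None = no cap.
TIERS = [
    (1000, Decimal("0.01")),
    (10000, Decimal("0.005")),
    (None, Decimal("0.001")),
]


def tier_breakdown(units: int, tiers=TIERS):
    """Return [(units_in_tier, unit_price, subtotal), ...] for the given usage."""
    lines = []
    previous_cap = 0
    for cap, unit_price in tiers:
        if units <= previous_cap:
            break  # usage is exhausted before reaching this tier
        upper = units if cap is None else min(units, cap)
        in_tier = upper - previous_cap
        lines.append((in_tier, unit_price, in_tier * unit_price))
        if cap is None:
            break
        previous_cap = cap
    return lines


def total_price(units: int, tiers=TIERS) -> Decimal:
    """Sum the subtotals of all tiers touched by the usage."""
    return sum((sub for _, _, sub in tier_breakdown(units, tiers)), Decimal("0"))
```

For 5000 units this bills 1000 units in the first tier and 4000 in the second.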

Payment integration

Stripe integration

python
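import os

import stripe
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()
# Read the secret key from the environment instead of hard-coding it
# (the variable name STRIPE_SECRET_KEY is this example's choice)
stripe.api_key = os.environ["STRIPE_SECRET_KEY"]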

@app.post("/api/payments/create-checkout-session")
async def create_checkout_session(
    skill_id: str,
    pricing_type: str,
    user_id: str
):
    skill = await skill_service.get_skill(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    
    # Look up the price
    pricing = await pricing_service.get_pricing(skill_id, pricing_type)

    # Create the Stripe Checkout Session
    session = stripe.checkout.Session.create(
        payment_method_types=['card'],
        line_items=[{
            'price_data': {
                'currency': 'usd',
                'product_data': {
                    'name': skill.name,
                    'description': skill.description,
                },
                # Convert to cents; round() avoids float truncation
                # (int(19.99 * 100) would give 1998)
                'unit_amount': round(pricing.price * 100),
            },
            'quantity': 1,
        }],
        mode='payment',
        success_url=f'https://your-site.com/success?session_id={{CHECKOUT_SESSION_ID}}',
        cancel_url=f'https://your-site.com/cancel',
        metadata={
            'skill_id': skill_id,
            'user_id': user_id,
            'pricing_type': pricing_type
        }
    )
    
    return {"session_id": session.id, "url": session.url}

@app.post("/api/payments/webhook")
async def stripe_webhook(request: Request):
    payload = await request.body()
    sig_header = request.headers.get('stripe-signature')
    
    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, "your_webhook_secret"
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError as e:
        raise HTTPException(status_code=400, detail="Invalid signature")
    
    # Handle the event
    if event['type'] == 'checkout.session.completed':
        session = event['data']['object']
        await payment_service.process_payment(session)
    
    return {"status": "success"}
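
Webhook endpoints can receive the same event more than once, so `payment_service.process_payment` should not grant a purchase twice. The sketch below shows the idea of deduplicating by event ID. The class name and the in-memory set are assumptions for illustration; a real deployment would need a shared, persistent store such as a database table with a unique constraint on the event ID.

```python
class WebhookDeduplicator:
    """Remember processed event IDs so a redelivered event is handled once."""

    def __init__(self):
        self._seen = set()

    def handle(self, event: dict, process) -> bool:
        """Run process(event) unless this event ID was already handled.

        Returns True if the event was processed, False if it was a duplicate.
        """
        event_id = event["id"]
        if event_id in self._seen:
            return False  # duplicate delivery: skip
        process(event)
        self._seen.add(event_id)  # mark only after processing succeeded
        return True
```

Marking the event as seen only after `process` returns means a failed attempt can be retried on the next delivery.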

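Skill version management

Version control

Semantic versioning

MAJOR.MINOR.PATCH

MAJOR: incompatible API changes
MINOR: new functionality added in a backward-compatible way
PATCH: backward-compatible bug fixes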
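
The three rules above translate directly into a version-bump helper. This is a minimal sketch that assumes purely numeric MAJOR.MINOR.PATCH strings (no pre-release or build suffixes); the function name `bump_version` is illustrative.

```python
def bump_version(version: str, part: str) -> str:
    """Return the next version after a change of the given kind.

    part is "major", "minor", or "patch". Lower-order components reset to 0.
    """
    major, minor, patch = (int(x) for x in version.split("."))
    if part == "major":
        return f"{major + 1}.0.0"
    if part == "minor":
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part}")
```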

Version management tool

python
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SkillVersion:
    version: str
    skill_id: str
    changelog: str
    created_at: datetime
    downloads: int = 0

class VersionManager:
    def __init__(self):
        self.versions: Dict[str, List[SkillVersion]] = {}
    
    def add_version(self, version: SkillVersion):
        if version.skill_id not in self.versions:
            self.versions[version.skill_id] = []
        
        self.versions[version.skill_id].append(version)
        self.versions[version.skill_id].sort(
            key=lambda v: v.created_at,
            reverse=True
        )
    
    def get_latest_version(self, skill_id: str) -> SkillVersion:
        versions = self.versions.get(skill_id, [])
        if not versions:
            raise ValueError(f"No versions found for skill {skill_id}")
        return versions[0]
    
    def get_version(self, skill_id: str, version: str) -> SkillVersion:
        versions = self.versions.get(skill_id, [])
        for v in versions:
            if v.version == version:
                return v
        raise ValueError(f"Version {version} not found for skill {skill_id}")
    
    def get_all_versions(self, skill_id: str) -> List[SkillVersion]:
        return self.versions.get(skill_id, [])
    
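    def compare_versions(self, v1: str, v2: str) -> int:
        # Assumes purely numeric versions such as "1.2.3" (no pre-release suffix)
        parts1 = [int(x) for x in v1.split('.')]
        parts2 = [int(x) for x in v2.split('.')]

        # Pad the shorter version with zeros so "1.0" and "1.0.1" compare correctly
        length = max(len(parts1), len(parts2))
        parts1 += [0] * (length - len(parts1))
        parts2 += [0] * (length - len(parts2))

        for p1, p2 in zip(parts1, parts2):
            if p1 > p2:
                return 1
            elif p1 < p2:
                return -1

        return 0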
    
    def is_newer(self, skill_id: str, version: str) -> bool:
        try:
            latest = self.get_latest_version(skill_id)
            return self.compare_versions(version, latest.version) > 0
        except ValueError:
            return True

Update strategy

Automatic updates

python
from typing import List, Optional

class UpdateManager:
    def __init__(self, version_manager: VersionManager):
        self.version_manager = version_manager
    
    def check_for_updates(self, skill_id: str, current_version: str) -> Optional[SkillVersion]:
        try:
            latest = self.version_manager.get_latest_version(skill_id)
            
            if self.version_manager.compare_versions(
                latest.version,
                current_version
            ) > 0:
                return latest
            
            return None
        except ValueError:
            return None
    
    def get_update_path(
        self,
        skill_id: str,
        from_version: str,
        to_version: str
    ) -> List[SkillVersion]:
        versions = self.version_manager.get_all_versions(skill_id)
        
        path = []
        for v in versions:
            if (self.version_manager.compare_versions(v.version, from_version) > 0 and
                self.version_manager.compare_versions(v.version, to_version) <= 0):
                path.append(v)
        
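        # Sort numerically; sorting the raw strings would put "1.10.0" before "1.9.0"
        return sorted(path, key=lambda v: tuple(int(x) for x in v.version.split('.')))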
    
    def is_compatible_update(
        self,
        from_version: str,
        to_version: str
    ) -> bool:
        from_parts = [int(x) for x in from_version.split('.')]
        to_parts = [int(x) for x in to_version.split('.')]
        
        # A MAJOR version change may be incompatible
        if from_parts[0] != to_parts[0]:
            return False
        
        return True
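
Combining the compatibility rule with version comparison gives a simple auto-update policy: pick the highest available version that shares the current MAJOR number. The sketch below is standalone and does not use the classes above. It assumes numeric MAJOR.MINOR.PATCH strings, and the names `parse` and `latest_compatible` are illustrative.

```python
def parse(version: str) -> tuple:
    """Turn "1.10.0" into (1, 10, 0) so versions compare numerically."""
    return tuple(int(x) for x in version.split("."))


def latest_compatible(current: str, available: list):
    """Return the newest version with the same MAJOR as current, or None."""
    cur = parse(current)
    candidates = [
        v for v in available
        if parse(v)[0] == cur[0] and parse(v) > cur
    ]
    return max(candidates, key=parse) if candidates else None
```

With available versions 1.2.0, 1.9.0, 1.10.0 and 2.0.0, a client on 1.2.0 would be offered 1.10.0, while 2.0.0 would require an explicit opt-in.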

Skill promotion strategy

Content marketing

1. Technical blog

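markdown
# How to Solve Real Problems with Your Skill

## Introduction
Describe the problem the Skill solves.

## Installation
```bash
pip install your-skill
```

## Usage example
```python
from your_skill import YourSkill

skill = YourSkill()
result = skill.execute(params)
```

## Best practices
Offer usage recommendations.

## FAQ
Answer common user questions.
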
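2. Video tutorials

  • Basic usage tutorials
  • Advanced feature demos
  • Real-world case studies
  • Troubleshooting guides

3. Social media

  • Twitter/X: share updates and tips
  • LinkedIn: publish professional content
  • Reddit: join community discussions
  • Weibo: promote to the Chinese-speaking community

Community building

1. GitHub community

  • Polish the README
  • Provide issue templates
  • Write a contributing guide
  • Respond to issues promptly

2. Technical forums

  • Stack Overflow: answer related questions
  • Zhihu: publish technical articles
  • Juejin: share development experience
  • CSDN: publish tutorials

3. Developer events

  • Tech talks
  • Open-source contribution events
  • Hackathons
  • In-person meetups

Partnerships

1. Technical partnerships

  • Integrate with related projects
  • Provide API interfaces
  • Co-develop features
  • Recommend each other's projects

2. Commercial partnerships

  • Custom enterprise development
  • Technical consulting
  • Training
  • Maintenance services

3. Media partnerships

  • Coverage in the tech press
  • Podcast interviews
  • Technical columns
  • Case studies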
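Hands-on tasks

Task 1: Publish a Skill

Publish the Skill you developed earlier to the marketplace.

Steps:

  1. Complete the Skill documentation
  2. Write tests
  3. Prepare release materials
  4. Build the distribution packages
  5. Publish to PyPI
  6. Publish to the Skill Market
  7. Announce the release

Deliverables:

  • The published Skill
  • Release documentation
  • Release announcement

Task 2: Build a Skill marketplace

Build a simple Skill marketplace platform.

Steps:

  1. Design the database schema
  2. Implement the backend API
  3. Implement the frontend
  4. Implement search
  5. Implement the review system
  6. Implement download statistics

Deliverables:

  • The Skill marketplace platform
  • API documentation
  • User guide

Task 3: Plan a promotion strategy

Plan a promotion strategy for your Skill.

Steps:

  1. Analyze the target users
  2. Draw up a content plan
  3. Choose promotion channels
  4. Design promotion campaigns
  5. Set a timeline
  6. Evaluate the results

Deliverables:

  • Promotion strategy document
  • Content plan
  • Promotion timeline

Code examples

Example 1: Skill publishing tool

python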
import os
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List
import requests

class SkillPublisher:
    def __init__(self, skill_path: str):
        self.skill_path = Path(skill_path)
        self.dist_path = self.skill_path / "dist"
    
    def validate_skill(self) -> bool:
        print("Validating skill...")
        
        # Check for required files
        required_files = [
            "skill.md",
            "README.md",
            "LICENSE",
            "setup.py"
        ]
        
        for file in required_files:
            if not (self.skill_path / file).exists():
                print(f"Missing required file: {file}")
                return False
        
        # Check for the source directory
        if not (self.skill_path / "src").exists():
            print("Missing src directory")
            return False
        
        print("Skill validation passed")
        return True
    
    def run_tests(self) -> bool:
        print("Running tests...")
        
        try:
            result = subprocess.run(
                ["python", "-m", "pytest", "tests/"],
                cwd=self.skill_path,
                capture_output=True,
                text=True
            )
            
            if result.returncode != 0:
                print("Tests failed:")
                print(result.stdout)
                print(result.stderr)
                return False
            
            print("All tests passed")
            return True
        except Exception as e:
            print(f"Test execution failed: {e}")
            return False
    
    def build_package(self) -> bool:
        print("Building package...")
        
        # Remove any previous build output
        if self.dist_path.exists():
            shutil.rmtree(self.dist_path)

        # Build the package
        try:
            result = subprocess.run(
                ["python", "-m", "build"],
                cwd=self.skill_path,
                capture_output=True,
                text=True
            )
            
            if result.returncode != 0:
                print("Build failed:")
                print(result.stdout)
                print(result.stderr)
                return False
            
            print("Package built successfully")
            return True
        except Exception as e:
            print(f"Build failed: {e}")
            return False
    
    def publish_to_pypi(self, test_pypi: bool = False) -> bool:
        print(f"Publishing to {'TestPyPI' if test_pypi else 'PyPI'}...")

        # Collect the built distributions
        packages = list(self.dist_path.glob("*"))
        if not packages:
            print("No packages found to publish")
            return False

        # Upload
        try:
            repository = "testpypi" if test_pypi else "pypi"
            result = subprocess.run(
                ["twine", "upload", "--repository", repository] + [str(p) for p in packages],
                cwd=self.skill_path,
                capture_output=True,
                text=True
            )
            
            if result.returncode != 0:
                print("Publish failed:")
                print(result.stdout)
                print(result.stderr)
                return False
            
            print("Published successfully")
            return True
        except Exception as e:
            print(f"Publish failed: {e}")
            return False
    
    def publish_to_market(self, market_url: str, api_key: str) -> bool:
        print("Publishing to Skill Market...")
        
        # Read skill.md
        skill_md = self.skill_path / "skill.md"
        with open(skill_md, 'r', encoding='utf-8') as f:
            content = f.read()

        # Parse the front matter (requires PyYAML)
        import yaml
        lines = content.split('\n')
        # Both the opening and the closing '---' delimiter must be present
        if lines[0].strip() != '---' or '---' not in lines[1:]:
            print("Invalid skill.md format")
            return False

        end_index = lines[1:].index('---') + 1
        front_matter_text = '\n'.join(lines[1:end_index])
        skill_info = yaml.safe_load(front_matter_text)
        
        # Prepare the publish payload
        publish_data = {
            "name": skill_info.get('name'),
            "version": skill_info.get('version'),
            "description": skill_info.get('description'),
            "tags": skill_info.get('tags', []),
            "license": skill_info.get('license'),
            "repository": skill_info.get('repository'),
            "author": skill_info.get('author')
        }
        
        # Publish
        try:
            response = requests.post(
                f"{market_url}/api/skills/publish",
                json=publish_data,
                headers={"Authorization": f"Bearer {api_key}"},
                timeout=30
            )

            # Treat any non-error status (below 400) as success, not only 200
            if not response.ok:
                print(f"Publish failed: {response.text}")
                return False
            
            print("Published to Skill Market successfully")
            return True
        except Exception as e:
            print(f"Publish failed: {e}")
            return False
    
    def publish(
        self,
        pypi: bool = True,
        test_pypi: bool = False,
        market_url: str = None,
        market_api_key: str = None
    ) -> bool:
        if not self.validate_skill():
            return False
        
        if not self.run_tests():
            return False
        
        if not self.build_package():
            return False
        
        if pypi:
            if not self.publish_to_pypi(test_pypi):
                return False
        
        if market_url and market_api_key:
            if not self.publish_to_market(market_url, market_api_key):
                return False
        
        print("All publish steps completed successfully")
        return True

# Usage example
if __name__ == "__main__":
    publisher = SkillPublisher("./your-skill")
    publisher.publish(
        pypi=True,
        test_pypi=False,
        market_url="https://skills.market",
        market_api_key="your-api-key"
    )

Example 2: Skill marketplace API

python
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Optional
from pydantic import BaseModel
import uvicorn

app = FastAPI(title="Skill Market API")

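# CORS configuration
# Browsers reject credentialed requests when the allowed origin is "*";
# list explicit origins in allow_origins if you need allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)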

# Data models
class Skill(BaseModel):
    id: str
    name: str
    version: str
    description: str
    author: str
    tags: List[str]
    license: str
    repository: str
    downloads: int
    views: int
    rating: float
    review_count: int
    created_at: str
    updated_at: str

class SkillSearchResult(BaseModel):
    skills: List[Skill]
    total: int
    page: int
    page_size: int

# Mock database
skills_db = {}

# API endpoints
@app.get("/api/skills", response_model=SkillSearchResult)
async def list_skills(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    sort: str = Query("updated", pattern="^(updated|downloads|rating|name)$")
):
    skills = list(skills_db.values())

    # Sort
    if sort == "updated":
        skills.sort(key=lambda x: x.updated_at, reverse=True)
    elif sort == "downloads":
        skills.sort(key=lambda x: x.downloads, reverse=True)
    elif sort == "rating":
        skills.sort(key=lambda x: x.rating, reverse=True)
    elif sort == "name":
        skills.sort(key=lambda x: x.name)
    
    # Paginate
    total = len(skills)
    start = (page - 1) * page_size
    end = start + page_size
    paginated_skills = skills[start:end]
    
    return SkillSearchResult(
        skills=paginated_skills,
        total=total,
        page=page,
        page_size=page_size
    )

@app.get("/api/skills/search", response_model=SkillSearchResult)
async def search_skills(
    q: str = Query(..., description="Search query"),
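    category: Optional[str] = None,
    # A list-typed query parameter must be declared with Query(); otherwise
    # FastAPI treats it as a request body
    tags: Optional[List[str]] = Query(None),
    sort: str = Query("relevance", pattern="^(relevance|downloads|rating|updated)$"),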
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100)
):
    skills = list(skills_db.values())
    
    # Filter (note: category is accepted but not applied in this mock)
    filtered_skills = []
    query_lower = q.lower()

    for skill in skills:
        # Text match
        if (query_lower in skill.name.lower() or
            query_lower in skill.description.lower()):
            filtered_skills.append(skill)
            continue

        # Tag match
        if tags and any(tag in skill.tags for tag in tags):
            filtered_skills.append(skill)
            continue

    # Sort ("relevance" keeps the match order)
    if sort == "downloads":
        filtered_skills.sort(key=lambda x: x.downloads, reverse=True)
    elif sort == "rating":
        filtered_skills.sort(key=lambda x: x.rating, reverse=True)
    elif sort == "updated":
        filtered_skills.sort(key=lambda x: x.updated_at, reverse=True)
    
    # Paginate
    total = len(filtered_skills)
    start = (page - 1) * page_size
    end = start + page_size
    paginated_skills = filtered_skills[start:end]
    
    return SkillSearchResult(
        skills=paginated_skills,
        total=total,
        page=page,
        page_size=page_size
    )

@app.get("/api/skills/{skill_id}", response_model=Skill)
async def get_skill(skill_id: str):
    skill = skills_db.get(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    
    # Increment the view count
    skill.views += 1
    
    return skill

@app.get("/api/skills/{skill_id}/versions")
async def get_skill_versions(skill_id: str):
    skill = skills_db.get(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    
    # Return the version list (mock)
    return [
        {
            "version": skill.version,
            "download_url": f"https://example.com/skills/{skill_id}/{skill.version}.tar.gz",
            "created_at": skill.updated_at
        }
    ]

@app.post("/api/skills/{skill_id}/download")
async def download_skill(skill_id: str, version: Optional[str] = None):
    skill = skills_db.get(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    
    # Increment the download count
    skill.downloads += 1

    # Return the download info (the checksum here is a placeholder)
    return {
        "download_url": f"https://example.com/skills/{skill_id}/{skill.version}.tar.gz",
        "checksum": "abc123def456"
    }

@app.get("/api/categories")
async def list_categories():
    return [
        {"id": "text", "name": "Text Processing"},
        {"id": "image", "name": "Image Processing"},
        {"id": "data", "name": "Data Analysis"},
        {"id": "web", "name": "Web Scraping"},
        {"id": "ai", "name": "AI/ML"}
    ]

@app.get("/api/tags")
async def list_tags():
    return [
        {"id": "nlp", "name": "NLP"},
        {"id": "cv", "name": "Computer Vision"},
        {"id": "ml", "name": "Machine Learning"},
        {"id": "api", "name": "API"},
        {"id": "utils", "name": "Utilities"}
    ]

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

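Summary

The Skills ecosystem and marketplace are an important part of the Skills standard: they give developers a platform to showcase and share skills, and give users a channel to discover and use them. This section covered:

  1. The Skills ecosystem
  2. The Skill publishing workflow
  3. Building a Skill marketplace
  4. Skill monetization
  5. Skill version management
  6. Skill promotion strategies

A healthy Skills ecosystem encourages skills to be shared and improved, which in turn helps AI technology advance.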

References