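# Day 33: The Skills Ecosystem and Marketplace
## Learning Objectives
- Understand the Skills ecosystem
- Master the Skill publishing workflow
- Learn how to build a Skill marketplace
- Understand Skill monetization
- Master Skill version management
- Learn Skill promotion strategies
## Core Content
### The Skills Ecosystem
#### Ecosystem Components
The Skills ecosystem consists of the following parts: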
```
┌─────────────────────────────────────────────────────────┐
│                    Skills Ecosystem                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌─────────────┐        ┌─────────────┐                │
│   │    Skill    │───────▶│   Market    │                │
│   │   Authors   │        │  Platform   │                │
│   └─────────────┘        └─────────────┘                │
│          │                      │                       │
│          │                      │                       │
│          ▼                      ▼                       │
│   ┌─────────────┐        ┌─────────────┐                │
│   │    Skill    │◀──────▶│    Skill    │                │
│   │  Registry   │        │    Users    │                │
│   └─────────────┘        └─────────────┘                │
│          │                      │                       │
│          │                      │                       │
│          ▼                      ▼                       │
│   ┌─────────────┐        ┌─────────────┐                │
│   │    Skill    │        │    Skill    │                │
│   │    Tools    │        │  Services   │                │
│   └─────────────┘        └─────────────┘                │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
**Core components**:
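- Skill Authors: the developers who build skills
- Skill Market: the marketplace platform
- Skill Registry: the central registry where skills are recorded
- Skill Users: the people who use skills
- Skill Tools: tooling for skill development
- Skill Services: services built around skills
#### Ecosystem Value
**For developers**:
- Showcase and share skills
- Collect user feedback
- Build a personal brand
- Earn revenue
- Learn and grow
**For users**:
- Discover high-quality skills
- Integrate functionality quickly
- Save development time
- Get technical support
- Take part in building the community
**For the platform**:
- Enrich the ecosystem
- Improve user retention
- Build a technical moat
- Create commercial value
- Drive the technology forward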
### Skill Publishing Workflow
#### Preparation Phase
**1. Complete the Skill documentation**
Make sure skill.md contains complete information:
```yaml
---
name: "your-skill-name"
version: "1.0.0"
author: "Your Name <email@example.com>"
description: "Clear and concise description"
tags: ["tag1", "tag2", "tag3"]
license: "MIT"
python_requires: ">=3.8"
dependencies:
  - package1>=1.0.0
  - package2>=2.0.0
repository: "https://github.com/username/repo"
homepage: "https://your-skill-homepage.com"
---
# Skill Name
Complete documentation...
```
**2. Write test code**
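Make sure the Skill has thorough test coverage:
```python
import unittest

# YourSkill is a placeholder for the class under test; adjust the import
# to match your own package layout.
from your_skill import YourSkill


class TestYourSkill(unittest.TestCase):
    def setUp(self):
        # Create a fresh Skill instance for every test
        self.skill = YourSkill()

    def test_basic_functionality(self):
        pass  # TODO: assert on the result of a typical call

    def test_edge_cases(self):
        pass  # TODO: empty input, very large input, unusual values

    def test_error_handling(self):
        pass  # TODO: assert that invalid input raises the expected error


if __name__ == "__main__":
    unittest.main()
```
**3. Prepare release materials**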
- README.md
- CHANGELOG.md
- LICENSE
- Example code
- Demo video (optional)
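Before moving on to packaging, it can help to sanity-check the metadata declared in skill.md. The following is a minimal sketch, not a definitive validator: it assumes the front matter has already been parsed into a dictionary, and the choice of required fields and the helper name `validate_metadata` are illustrative assumptions rather than part of any Skills specification.

```python
import re
from typing import Any, Dict, List

# Fields taken from the skill.md front matter shown earlier; treating all of
# them as mandatory is an assumption of this sketch.
REQUIRED_FIELDS = ("name", "version", "author", "description", "license")

# Plain MAJOR.MINOR.PATCH only; pre-release and build suffixes are not handled.
SEMVER_PATTERN = re.compile(r"^\d+\.\d+\.\d+$")


def validate_metadata(metadata: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for field in REQUIRED_FIELDS:
        if not metadata.get(field):
            problems.append(f"missing required field: {field}")
    version = metadata.get("version")
    if version and not SEMVER_PATTERN.match(str(version)):
        problems.append(f"version is not MAJOR.MINOR.PATCH: {version}")
    if not isinstance(metadata.get("tags", []), list):
        problems.append("tags must be a list")
    return problems


if __name__ == "__main__":
    example = {
        "name": "your-skill-name",
        "version": "1.0.0",
        "author": "Your Name <email@example.com>",
        "description": "Clear and concise description",
        "license": "MIT",
        "tags": ["tag1", "tag2"],
    }
    print(validate_metadata(example))
```

Returning a list of problems instead of raising on the first one lets a release script report everything that needs fixing in a single pass.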
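#### Packaging Phase
**1. Create the project structure**
```
your-skill/
├── skill.md
├── README.md
├── CHANGELOG.md
├── LICENSE
├── src/
│   └── your_skill/
│       ├── __init__.py
│       └── main.py
├── tests/
│   ├── __init__.py
│   └── test_skill.py
├── examples/
│   └── example.py
└── setup.py
```
The code lives in a package directory under src/ (here `your_skill`, a placeholder name) so that `find_packages(where="src")` in setup.py can discover it.

**2. Write setup.py**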
```python
from setuptools import setup, find_packages

# Reuse the README as the long description shown on PyPI
with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

setup(
    name="your-skill-name",
    version="1.0.0",
    author="Your Name",
    author_email="email@example.com",
    description="Short description",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/username/repo",
    # Packages are discovered under src/
    packages=find_packages(where="src"),
    package_dir={"": "src"},
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
    ],
    python_requires=">=3.8",
    install_requires=[
        "pydantic>=2.0.0",
    ],
    extras_require={
        "dev": ["pytest>=7.0.0", "black>=22.0.0"],
    },
)
```
**3. Build the release package**
```bash
pip install build
python -m build
```
#### Publishing Phase
**1. Publish to PyPI**
```bash
pip install twine
twine upload dist/*
```
**2. Publish to the Skill Market**
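```python
# Assumes a client package for your Skill Market that exposes
# SkillMarketClient; substitute the client your platform actually provides.
from skill_market_client import SkillMarketClient

client = SkillMarketClient()

# Metadata mirrors the front matter in skill.md
skill_info = {
    "name": "your-skill-name",
    "version": "1.0.0",
    "description": "Skill description",
    "tags": ["tag1", "tag2"],
    "repository": "https://github.com/username/repo",
    "license": "MIT",
}

result = client.publish_skill(skill_info)
print(f"Published: {result}")
```
**3. Announce the release**
- Post on social media
- Share in technical communities
- Send an email notification
- Update your personal blog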
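The marketplace download API later in this section returns a `checksum` alongside each download URL, so it is useful to compute one for every built artifact at publish time. Below is a minimal sketch using only the standard library. The choice of SHA-256 and the helper names `sha256_of_file` and `checksums_for_dist` are assumptions for illustration; the source does not say which algorithm the market expects.

```python
import hashlib
from pathlib import Path
from typing import Dict


def sha256_of_file(path: Path, chunk_size: int = 65536) -> str:
    """Hash a file in chunks so large archives are not loaded into memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def checksums_for_dist(dist_dir: str = "dist") -> Dict[str, str]:
    """Map each file name in the build output directory to its SHA-256 hex digest."""
    directory = Path(dist_dir)
    if not directory.is_dir():
        return {}
    return {
        p.name: sha256_of_file(p)
        for p in sorted(directory.iterdir())
        if p.is_file()
    }


if __name__ == "__main__":
    for name, checksum in checksums_for_dist().items():
        print(f"{checksum}  {name}")
```

Storing these values with each published version lets users verify that what they downloaded matches what was uploaded.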
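### Building a Skill Marketplace
#### Marketplace Architecture
**Front-end architecture**:
```javascript
// Front-end technology stack (pick one option per row)
const frontendStack = {
  "framework": "React/Vue",
  "ui_library": "Ant Design/Element UI",
  "state_management": "Redux/Pinia",
  "routing": "React Router/Vue Router",
  "http_client": "Axios",
  "build_tool": "Vite/Webpack"
};
```
**Back-end architecture**:
```python
# Back-end technology stack (pick one option per row)
backend_stack = {
    "framework": "FastAPI/Django",
    "database": "PostgreSQL",
    "cache": "Redis",
    "search": "Elasticsearch",
    "queue": "Celery/RQ",
    "storage": "AWS S3/MinIO",
}
```
#### Core Features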
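**1. Skill search**
```python
from typing import List, Optional

from fastapi import FastAPI, Query

app = FastAPI()

# skill_service is assumed to be an application-level service object
# (for example, a wrapper around the search index); it is not defined here.


@app.get("/api/skills/search")
async def search_skills(
    q: str = Query(..., description="Search query"),
    category: Optional[str] = Query(None),
    tags: Optional[List[str]] = Query(None),
    # `pattern` replaces the deprecated `regex` argument in recent FastAPI releases
    sort: str = Query("relevance", pattern="^(relevance|downloads|rating|updated)$"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
):
    # Delegate filtering, sorting and pagination to the service layer
    results = await skill_service.search(
        query=q,
        category=category,
        tags=tags,
        sort=sort,
        page=page,
        page_size=page_size,
    )
    return results
```
**2. Skill details**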
```python
from fastapi import HTTPException


@app.get("/api/skills/{skill_id}")
async def get_skill(skill_id: str):
    skill = await skill_service.get_skill(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    # Increment the view counter
    await skill_service.increment_views(skill_id)
    return skill
```
**3. Skill download**
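```python
from typing import Optional

from fastapi import HTTPException


@app.get("/api/skills/{skill_id}/download")
async def download_skill(skill_id: str, version: Optional[str] = None):
    skill = await skill_service.get_skill(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    # Look up the requested version (the service decides what None means,
    # typically "latest")
    skill_version = await skill_service.get_version(skill_id, version)
    if not skill_version:
        raise HTTPException(status_code=404, detail="Version not found")
    # Increment the download counter for this version
    await skill_service.increment_downloads(skill_id, skill_version.version)
    # Return the download link and its checksum
    return {
        "download_url": skill_version.download_url,
        "checksum": skill_version.checksum,
    }
```
**4. Skill reviews**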
```python
from fastapi import HTTPException
from pydantic import BaseModel


class SkillReview(BaseModel):
    skill_id: str
    rating: int  # 1-5
    comment: str


# user_id is taken as a plain parameter to keep the example short; a real
# service would derive it from the authenticated session instead.
@app.post("/api/skills/reviews")
async def create_review(review: SkillReview, user_id: str):
    # Validate the rating
    if not 1 <= review.rating <= 5:
        raise HTTPException(status_code=400, detail="Rating must be between 1 and 5")
    # Reject a second review from the same user
    existing = await review_service.get_user_review(user_id, review.skill_id)
    if existing:
        raise HTTPException(status_code=400, detail="Already reviewed")
    # Store the review
    await review_service.create_review(user_id, review)
    # Recompute the skill's aggregate rating
    await skill_service.update_rating(review.skill_id)
    return {"success": True}
```
#### Database Design
**Skills table**:
```sql
CREATE TABLE skills (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL UNIQUE,
    description TEXT,
    author_id INTEGER REFERENCES users(id),
    category_id INTEGER REFERENCES categories(id),
    repository_url VARCHAR(500),
    homepage_url VARCHAR(500),
    license VARCHAR(50),
    downloads INTEGER DEFAULT 0,
    views INTEGER DEFAULT 0,
    rating DECIMAL(3, 2) DEFAULT 0.0,
    review_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- In PostgreSQL the UNIQUE constraint on name already creates an index,
-- so idx_skills_name is optional.
CREATE INDEX idx_skills_name ON skills(name);
CREATE INDEX idx_skills_category ON skills(category_id);
CREATE INDEX idx_skills_rating ON skills(rating DESC);
CREATE INDEX idx_skills_downloads ON skills(downloads DESC);
```
**Skill Versions table**:
```sql
CREATE TABLE skill_versions (
    id SERIAL PRIMARY KEY,
    skill_id INTEGER REFERENCES skills(id) ON DELETE CASCADE,
    version VARCHAR(50) NOT NULL,
    download_url VARCHAR(500),
    checksum VARCHAR(100),
    downloads INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(skill_id, version)
);

CREATE INDEX idx_skill_versions_skill ON skill_versions(skill_id);
```
**Skill Tags table**:
```sql
CREATE TABLE skill_tags (
    id SERIAL PRIMARY KEY,
    skill_id INTEGER REFERENCES skills(id) ON DELETE CASCADE,
    tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
    UNIQUE(skill_id, tag_id)
);

CREATE INDEX idx_skill_tags_skill ON skill_tags(skill_id);
CREATE INDEX idx_skill_tags_tag ON skill_tags(tag_id);
```
### Skill Monetization
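#### Business Models
**1. Free model**
- Completely free to use
- Revenue comes from donations
- Well suited to open-source projects

**2. Paid model**
- One-time purchase
- Subscription
- Usage-based billing

**3. Hybrid model**
- Basic features are free
- Advanced features are paid
- An enterprise edition is sold separately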
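The hybrid model above implies some form of feature gating: each feature is available from a certain plan upward. A minimal sketch of that idea follows. The plan names, the feature names (`basic_run`, `batch_mode`, `sso`) and the helper `can_use` are hypothetical and only illustrate the free / paid / enterprise split.

```python
from enum import IntEnum


class Plan(IntEnum):
    """Plans ordered from least to most capable."""
    FREE = 0
    PREMIUM = 1
    ENTERPRISE = 2


# Hypothetical feature names mapped to the lowest plan that unlocks them
FEATURE_MIN_PLAN = {
    "basic_run": Plan.FREE,
    "batch_mode": Plan.PREMIUM,
    "sso": Plan.ENTERPRISE,
}


def can_use(plan: Plan, feature: str) -> bool:
    """Return True if the given plan includes the feature."""
    required = FEATURE_MIN_PLAN.get(feature)
    if required is None:
        return False  # unknown features are denied by default
    return plan >= required


if __name__ == "__main__":
    print(can_use(Plan.FREE, "basic_run"))
    print(can_use(Plan.FREE, "batch_mode"))
    print(can_use(Plan.ENTERPRISE, "batch_mode"))
```

Ordering the plans as an `IntEnum` keeps the check to a single comparison; a product whose plans do not nest cleanly would need an explicit feature set per plan instead.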
#### Pricing
**Pricing strategies**:
```python
class PricingStrategy:
    """Base class: subclasses implement calculate_price."""

    def __init__(self, base_price: float):
        self.base_price = base_price

    def calculate_price(self, usage: dict) -> float:
        raise NotImplementedError


class FreePricing(PricingStrategy):
    def calculate_price(self, usage: dict) -> float:
        return 0.0


class OneTimePricing(PricingStrategy):
    def calculate_price(self, usage: dict) -> float:
        return self.base_price


class SubscriptionPricing(PricingStrategy):
    def __init__(self, monthly_price: float):
        super().__init__(monthly_price)
        self.monthly_price = monthly_price

    def calculate_price(self, usage: dict) -> float:
        months = usage.get('months', 1)
        return self.monthly_price * months


class UsageBasedPricing(PricingStrategy):
    def __init__(self, unit_price: float):
        super().__init__(unit_price)
        self.unit_price = unit_price

    def calculate_price(self, usage: dict) -> float:
        units = usage.get('units', 0)
        return self.unit_price * units


class TieredPricing(PricingStrategy):
    def __init__(self, tiers: list):
        # Tiers must be sorted by max_units and cover contiguous ranges
        super().__init__(0.0)
        self.tiers = tiers

    def calculate_price(self, usage: dict) -> float:
        remaining = usage.get('units', 0)
        total_price = 0.0
        previous_max = 0
        for tier in self.tiers:
            if remaining <= 0:
                break
            # Capacity is the number of units this tier covers, measured from
            # the previous tier's upper bound (avoids an off-by-one when the
            # first tier starts at 0)
            capacity = tier['max_units'] - previous_max
            tier_units = min(remaining, capacity)
            total_price += tier_units * tier['unit_price']
            remaining -= tier_units
            previous_max = tier['max_units']
        return total_price
```
**Usage example**:
```python
# Free
free_pricing = FreePricing(0)
print(f"Free: ${free_pricing.calculate_price({}):.2f}")

# One-time purchase
one_time = OneTimePricing(99.99)
print(f"One-time: ${one_time.calculate_price({}):.2f}")

# Subscription, billed for 12 months
subscription = SubscriptionPricing(9.99)
print(f"Subscription (12 months): ${subscription.calculate_price({'months': 12}):.2f}")

# Usage-based
usage_based = UsageBasedPricing(0.01)
print(f"Usage-based: ${usage_based.calculate_price({'units': 1000}):.2f}")

# Tiered: the first 1,000 units, the next 9,000 units, then everything above
tiered = TieredPricing([
    {'min_units': 1, 'max_units': 1000, 'unit_price': 0.01},
    {'min_units': 1001, 'max_units': 10000, 'unit_price': 0.005},
    {'min_units': 10001, 'max_units': float('inf'), 'unit_price': 0.001},
])
print(f"Tiered: ${tiered.calculate_price({'units': 5000}):.2f}")
```
#### Payment Integration
**Stripe integration**:
```python
import os

import stripe
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

# Read secrets from the environment rather than hard-coding them.
# The variable names are a convention chosen for this example.
stripe.api_key = os.environ.get("STRIPE_SECRET_KEY", "your_stripe_secret_key")
STRIPE_WEBHOOK_SECRET = os.environ.get("STRIPE_WEBHOOK_SECRET", "your_webhook_secret")


@app.post("/api/payments/create-checkout-session")
async def create_checkout_session(
    skill_id: str,
    pricing_type: str,
    user_id: str,
):
    skill = await skill_service.get_skill(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    # Look up the price for this skill and pricing type
    pricing = await pricing_service.get_pricing(skill_id, pricing_type)
    # Create the Stripe Checkout Session
    session = stripe.checkout.Session.create(
        payment_method_types=['card'],
        line_items=[{
            'price_data': {
                'currency': 'usd',
                'product_data': {
                    'name': skill.name,
                    'description': skill.description,
                },
                # Convert to cents; round() avoids float truncation
                # (int(9.99 * 100) would give 998)
                'unit_amount': round(pricing.price * 100),
            },
            'quantity': 1,
        }],
        mode='payment',
        # Stripe substitutes the literal {CHECKOUT_SESSION_ID} placeholder
        success_url='https://your-site.com/success?session_id={CHECKOUT_SESSION_ID}',
        cancel_url='https://your-site.com/cancel',
        metadata={
            'skill_id': skill_id,
            'user_id': user_id,
            'pricing_type': pricing_type,
        },
    )
    return {"session_id": session.id, "url": session.url}


@app.post("/api/payments/webhook")
async def stripe_webhook(request: Request):
    # The raw body is required for signature verification
    payload = await request.body()
    sig_header = request.headers.get('stripe-signature')
    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, STRIPE_WEBHOOK_SECRET
        )
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    # Handle the event
    if event['type'] == 'checkout.session.completed':
        session = event['data']['object']
        await payment_service.process_payment(session)
    return {"status": "success"}
```
### Skill Version Management
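#### Version Control
**Semantic versioning**:
```
MAJOR.MINOR.PATCH
MAJOR: incompatible API changes
MINOR: backward-compatible new functionality
PATCH: backward-compatible bug fixes
```
**Version management tool**: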
```python
from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from typing import Dict, List


@dataclass
class SkillVersion:
    version: str
    skill_id: str
    changelog: str
    created_at: datetime
    downloads: int = 0


class VersionManager:
    def __init__(self):
        # skill_id -> versions, newest first
        self.versions: Dict[str, List[SkillVersion]] = {}

    def add_version(self, version: SkillVersion):
        if version.skill_id not in self.versions:
            self.versions[version.skill_id] = []
        self.versions[version.skill_id].append(version)
        # Keep the list ordered by creation time, newest first
        self.versions[version.skill_id].sort(
            key=lambda v: v.created_at,
            reverse=True
        )

    def get_latest_version(self, skill_id: str) -> SkillVersion:
        versions = self.versions.get(skill_id, [])
        if not versions:
            raise ValueError(f"No versions found for skill {skill_id}")
        return versions[0]

    def get_version(self, skill_id: str, version: str) -> SkillVersion:
        versions = self.versions.get(skill_id, [])
        for v in versions:
            if v.version == version:
                return v
        raise ValueError(f"Version {version} not found for skill {skill_id}")

    def get_all_versions(self, skill_id: str) -> List[SkillVersion]:
        return self.versions.get(skill_id, [])

    def compare_versions(self, v1: str, v2: str) -> int:
        """Return 1 if v1 > v2, -1 if v1 < v2, and 0 if they are equal."""
        parts1 = [int(x) for x in v1.split('.')]
        parts2 = [int(x) for x in v2.split('.')]
        # Pad the shorter version with zeros so "1.0" < "1.0.1"
        for p1, p2 in zip_longest(parts1, parts2, fillvalue=0):
            if p1 > p2:
                return 1
            elif p1 < p2:
                return -1
        return 0

    def is_newer(self, skill_id: str, version: str) -> bool:
        try:
            latest = self.get_latest_version(skill_id)
            return self.compare_versions(version, latest.version) > 0
        except ValueError:
            # No versions published yet, so any version counts as newer
            return True
```
#### Update Strategy
**Automatic updates**:
```python
from typing import List, Optional

# SkillVersion and VersionManager are the classes defined in the
# version management tool above.


class UpdateManager:
    def __init__(self, version_manager: VersionManager):
        self.version_manager = version_manager

    def check_for_updates(self, skill_id: str, current_version: str) -> Optional[SkillVersion]:
        """Return the latest version if it is newer than current_version."""
        try:
            latest = self.version_manager.get_latest_version(skill_id)
            if self.version_manager.compare_versions(
                latest.version,
                current_version
            ) > 0:
                return latest
            return None
        except ValueError:
            return None

    def get_update_path(
        self,
        skill_id: str,
        from_version: str,
        to_version: str
    ) -> List[SkillVersion]:
        """List the versions after from_version up to and including to_version."""
        versions = self.version_manager.get_all_versions(skill_id)
        path = []
        for v in versions:
            if (self.version_manager.compare_versions(v.version, from_version) > 0 and
                    self.version_manager.compare_versions(v.version, to_version) <= 0):
                path.append(v)
        # Sort numerically; sorting the strings would put "1.10.0" before "1.9.0"
        return sorted(path, key=lambda v: [int(x) for x in v.version.split('.')])

    def is_compatible_update(
        self,
        from_version: str,
        to_version: str
    ) -> bool:
        from_parts = [int(x) for x in from_version.split('.')]
        to_parts = [int(x) for x in to_version.split('.')]
        # A MAJOR version change may be incompatible
        if from_parts[0] != to_parts[0]:
            return False
        return True
```
### Skill Promotion Strategies
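#### Content Marketing
**1. Technical blog posts**
````markdown
# How to Solve Real Problems with Your Skill
## Introduction
Describe the problem the Skill solves.
## Installation
```bash
pip install your-skill
```
## Usage
```python
from your_skill import YourSkill
skill = YourSkill()
result = skill.execute(params)
```
## Best Practices
Offer usage recommendations.
## FAQ
Answer common user questions.
````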
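**2. Video tutorials**
- Getting-started tutorials
- Advanced feature demos
- Real-world case studies
- Troubleshooting guides

**3. Social media**
- Twitter/X: share updates and tips
- LinkedIn: publish professional content
- Reddit: join community discussions
- Weibo: promote to the Chinese-speaking community
#### Community Building
**1. GitHub community**
- Polish the README
- Provide issue templates
- Write a contributing guide
- Respond to issues promptly

**2. Technical forums**
- Stack Overflow: answer related questions
- Zhihu: publish technical articles
- Juejin: share development experience
- CSDN: publish tutorials

**3. Developer events**
- Tech talks
- Open-source contribution events
- Hackathons
- In-person meetups
#### Partnerships
**1. Technical partnerships**
- Integrate with related projects
- Provide API interfaces
- Co-develop features
- Cross-recommend each other

**2. Commercial partnerships**
- Custom development for enterprises
- Technical consulting
- Training
- Maintenance services

**3. Media partnerships**
- Coverage in tech media
- Podcast interviews
- Technical columns
- Case studies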
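## Practice Tasks
### Task 1: Publish a Skill
Publish a Skill you developed earlier to the marketplace.
**Steps**:
1. Complete the Skill documentation
2. Write test code
3. Prepare release materials
4. Build the release package
5. Publish to PyPI
6. Publish to the Skill Market
7. Announce the release

**Deliverables**:
- The published Skill
- Release documentation
- Release announcement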
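### Task 2: Build a Skill Marketplace
Build a simple Skill marketplace platform.
**Steps**:
1. Design the database schema
2. Implement the back-end API
3. Implement the front-end UI
4. Implement search
5. Implement the review system
6. Implement download statistics

**Deliverables**:
- The Skill marketplace platform
- API documentation
- Usage instructions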
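For the review-system step in Task 2, one possible starting point is to keep a running average instead of recomputing it from every stored review. The sketch below mirrors the `rating` and `review_count` columns of the skills table; the class name `RatingStats` and its method are illustrative assumptions, not part of the API shown earlier.

```python
from dataclasses import dataclass


@dataclass
class RatingStats:
    """Aggregate rating state, mirroring skills.rating and skills.review_count."""
    rating: float = 0.0
    review_count: int = 0

    def add_review(self, new_rating: int) -> None:
        """Fold one new 1-5 rating into the running average."""
        if not 1 <= new_rating <= 5:
            raise ValueError("rating must be between 1 and 5")
        # Reconstruct the previous sum, add the new rating, then re-average
        total = self.rating * self.review_count + new_rating
        self.review_count += 1
        self.rating = total / self.review_count


if __name__ == "__main__":
    stats = RatingStats()
    for value in (5, 3, 4):
        stats.add_review(value)
    print(stats)
```

In a real database the same update would need to run inside a transaction, or as a single UPDATE statement, so that concurrent reviews do not overwrite each other.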
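### Task 3: Design a Promotion Strategy
Design a promotion strategy for your Skill.
**Steps**:
1. Analyze the target users
2. Draw up a content plan
3. Choose promotion channels
4. Design promotional campaigns
5. Set a timeline
6. Evaluate the results

**Deliverables**:
- Promotion strategy document
- Content plan
- Promotion timeline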
## Code Examples
### Example 1: Skill Publishing Tool
```python
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Optional

import requests
import yaml


class SkillPublisher:
    """Validate, test, build and publish a Skill project."""

    def __init__(self, skill_path: str):
        self.skill_path = Path(skill_path)
        self.dist_path = self.skill_path / "dist"

    def validate_skill(self) -> bool:
        print("Validating skill...")
        # Check required files
        required_files = [
            "skill.md",
            "README.md",
            "LICENSE",
            "setup.py"
        ]
        for file in required_files:
            if not (self.skill_path / file).exists():
                print(f"Missing required file: {file}")
                return False
        # Check the source directory
        if not (self.skill_path / "src").exists():
            print("Missing src directory")
            return False
        print("Skill validation passed")
        return True

    def run_tests(self) -> bool:
        print("Running tests...")
        try:
            # sys.executable runs pytest with the current interpreter
            result = subprocess.run(
                [sys.executable, "-m", "pytest", "tests/"],
                cwd=self.skill_path,
                capture_output=True,
                text=True
            )
            if result.returncode != 0:
                print("Tests failed:")
                print(result.stdout)
                print(result.stderr)
                return False
            print("All tests passed")
            return True
        except Exception as e:
            print(f"Test execution failed: {e}")
            return False

    def build_package(self) -> bool:
        print("Building package...")
        # Remove old build output
        if self.dist_path.exists():
            shutil.rmtree(self.dist_path)
        # Build the package
        try:
            result = subprocess.run(
                [sys.executable, "-m", "build"],
                cwd=self.skill_path,
                capture_output=True,
                text=True
            )
            if result.returncode != 0:
                print("Build failed:")
                print(result.stdout)
                print(result.stderr)
                return False
            print("Package built successfully")
            return True
        except Exception as e:
            print(f"Build failed: {e}")
            return False

    def publish_to_pypi(self, test_pypi: bool = False) -> bool:
        print(f"Publishing to {'TestPyPI' if test_pypi else 'PyPI'}...")
        # Collect the built packages
        packages = list(self.dist_path.glob("*"))
        if not packages:
            print("No packages found to publish")
            return False
        # Upload
        try:
            repository = "testpypi" if test_pypi else "pypi"
            result = subprocess.run(
                ["twine", "upload", "--repository", repository] + [str(p) for p in packages],
                cwd=self.skill_path,
                capture_output=True,
                text=True
            )
            if result.returncode != 0:
                print("Publish failed:")
                print(result.stdout)
                print(result.stderr)
                return False
            print("Published successfully")
            return True
        except Exception as e:
            print(f"Publish failed: {e}")
            return False

    def publish_to_market(self, market_url: str, api_key: str) -> bool:
        print("Publishing to Skill Market...")
        # Read skill.md
        skill_md = self.skill_path / "skill.md"
        with open(skill_md, 'r', encoding='utf-8') as f:
            content = f.read()
        # Parse the front matter between the two '---' lines
        lines = content.split('\n')
        if lines[0] != '---':
            print("Invalid skill.md format")
            return False
        try:
            end_index = lines.index('---', 1)
        except ValueError:
            print("Invalid skill.md format: front matter is not closed")
            return False
        front_matter_text = '\n'.join(lines[1:end_index])
        skill_info = yaml.safe_load(front_matter_text)
        # Prepare the publish payload
        publish_data = {
            "name": skill_info.get('name'),
            "version": skill_info.get('version'),
            "description": skill_info.get('description'),
            "tags": skill_info.get('tags', []),
            "license": skill_info.get('license'),
            "repository": skill_info.get('repository'),
            "author": skill_info.get('author')
        }
        # Publish
        try:
            response = requests.post(
                f"{market_url}/api/skills/publish",
                json=publish_data,
                headers={"Authorization": f"Bearer {api_key}"},
                timeout=30  # avoid hanging forever on a stalled connection
            )
            if response.status_code != 200:
                print(f"Publish failed: {response.text}")
                return False
            print("Published to Skill Market successfully")
            return True
        except Exception as e:
            print(f"Publish failed: {e}")
            return False

    def publish(
        self,
        pypi: bool = True,
        test_pypi: bool = False,
        market_url: Optional[str] = None,
        market_api_key: Optional[str] = None
    ) -> bool:
        """Run every step in order and stop at the first failure."""
        if not self.validate_skill():
            return False
        if not self.run_tests():
            return False
        if not self.build_package():
            return False
        if pypi:
            if not self.publish_to_pypi(test_pypi):
                return False
        if market_url and market_api_key:
            if not self.publish_to_market(market_url, market_api_key):
                return False
        print("All publish steps completed successfully")
        return True


# Usage example
if __name__ == "__main__":
    publisher = SkillPublisher("./your-skill")
    publisher.publish(
        pypi=True,
        test_pypi=False,
        market_url="https://skills.market",
        market_api_key="your-api-key"
    )
```
### Example 2: Skill Marketplace API
```python
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI(title="Skill Market API")

# CORS configuration. Allowing every origin is convenient for a demo;
# restrict allow_origins to known front-end hosts in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Data models
class Skill(BaseModel):
    id: str
    name: str
    version: str
    description: str
    author: str
    tags: List[str]
    license: str
    repository: str
    downloads: int
    views: int
    rating: float
    review_count: int
    created_at: str
    updated_at: str


class SkillSearchResult(BaseModel):
    skills: List[Skill]
    total: int
    page: int
    page_size: int


# Mock database: skill id -> Skill
skills_db = {}


# API endpoints
@app.get("/api/skills", response_model=SkillSearchResult)
async def list_skills(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    # `pattern` replaces the deprecated `regex` argument in recent FastAPI releases
    sort: str = Query("updated", pattern="^(updated|downloads|rating|name)$")
):
    skills = list(skills_db.values())
    # Sort
    if sort == "updated":
        skills.sort(key=lambda x: x.updated_at, reverse=True)
    elif sort == "downloads":
        skills.sort(key=lambda x: x.downloads, reverse=True)
    elif sort == "rating":
        skills.sort(key=lambda x: x.rating, reverse=True)
    elif sort == "name":
        skills.sort(key=lambda x: x.name)
    # Paginate
    total = len(skills)
    start = (page - 1) * page_size
    end = start + page_size
    paginated_skills = skills[start:end]
    return SkillSearchResult(
        skills=paginated_skills,
        total=total,
        page=page,
        page_size=page_size
    )


# Declared before /api/skills/{skill_id} so "search" is not captured as an id
@app.get("/api/skills/search", response_model=SkillSearchResult)
async def search_skills(
    q: str = Query(..., description="Search query"),
    category: Optional[str] = None,  # accepted but not used by this mock
    # A list parameter needs Query(); otherwise FastAPI reads it from the body
    tags: Optional[List[str]] = Query(None),
    sort: str = Query("relevance", pattern="^(relevance|downloads|rating|updated)$"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100)
):
    skills = list(skills_db.values())
    # Filter: a skill matches on text OR on any requested tag
    filtered_skills = []
    query_lower = q.lower()
    for skill in skills:
        # Text search
        if (query_lower in skill.name.lower() or
                query_lower in skill.description.lower()):
            filtered_skills.append(skill)
            continue
        # Tag search
        if tags and any(tag in skill.tags for tag in tags):
            filtered_skills.append(skill)
            continue
    # Sort ("relevance" keeps insertion order in this mock)
    if sort == "downloads":
        filtered_skills.sort(key=lambda x: x.downloads, reverse=True)
    elif sort == "rating":
        filtered_skills.sort(key=lambda x: x.rating, reverse=True)
    elif sort == "updated":
        filtered_skills.sort(key=lambda x: x.updated_at, reverse=True)
    # Paginate
    total = len(filtered_skills)
    start = (page - 1) * page_size
    end = start + page_size
    paginated_skills = filtered_skills[start:end]
    return SkillSearchResult(
        skills=paginated_skills,
        total=total,
        page=page,
        page_size=page_size
    )


@app.get("/api/skills/{skill_id}", response_model=Skill)
async def get_skill(skill_id: str):
    skill = skills_db.get(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    # Increment the view counter
    skill.views += 1
    return skill


@app.get("/api/skills/{skill_id}/versions")
async def get_skill_versions(skill_id: str):
    skill = skills_db.get(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    # Return the version list (mocked: only the current version)
    return [
        {
            "version": skill.version,
            "download_url": f"https://example.com/skills/{skill_id}/{skill.version}.tar.gz",
            "created_at": skill.updated_at
        }
    ]


@app.post("/api/skills/{skill_id}/download")
async def download_skill(skill_id: str, version: Optional[str] = None):
    skill = skills_db.get(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    # Increment the download counter
    skill.downloads += 1
    # Return download information (mocked: `version` is ignored and the
    # checksum is a placeholder)
    return {
        "download_url": f"https://example.com/skills/{skill_id}/{skill.version}.tar.gz",
        "checksum": "abc123def456"
    }


@app.get("/api/categories")
async def list_categories():
    return [
        {"id": "text", "name": "Text Processing"},
        {"id": "image", "name": "Image Processing"},
        {"id": "data", "name": "Data Analysis"},
        {"id": "web", "name": "Web Scraping"},
        {"id": "ai", "name": "AI/ML"}
    ]


@app.get("/api/tags")
async def list_tags():
    return [
        {"id": "nlp", "name": "NLP"},
        {"id": "cv", "name": "Computer Vision"},
        {"id": "ml", "name": "Machine Learning"},
        {"id": "api", "name": "API"},
        {"id": "utils", "name": "Utilities"}
    ]


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
## Summary
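The Skills ecosystem and marketplace are an important part of the Skills standard: they give developers a platform to showcase and share skills, and give users a channel to discover and use them. In this lesson we covered:
- The Skills ecosystem
- The Skill publishing workflow
- Building a Skill marketplace
- Skill monetization
- Skill version management
- Skill promotion strategies

A healthy Skills ecosystem encourages skills to be shared and improved, which in turn helps advance AI technology.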
