Appearance
第22天:MCP资源管理
学习目标
- 掌握MCP资源的定义方法
- 学会资源访问控制
- 学会权限管理
- 能够实现MCP资源管理
核心内容
1. MCP资源概述
MCP资源的定义:
MCP资源是Tool操作的对象或数据,定义了Tool可以访问的范围。资源可以是物理的(如文件、设备)或逻辑的(如数据库记录、API端点)。
资源的重要性:
- 访问控制:控制Tool对资源的访问
- 权限管理:管理用户对资源的权限
- 状态跟踪:跟踪资源的当前状态
- 版本控制:管理资源的版本变化
- 审计日志:记录资源的访问历史
2. 资源类型
2.1 资源分类
MCP资源分类:
- 文件资源:本地文件、网络文件、云存储文件
- 网络资源:API端点、网页、网络服务
- 数据库资源:数据库表、记录、查询
- 计算资源:CPU、GPU、内存、存储
- 服务资源:外部服务、内部服务
- 配置资源:配置文件、环境变量
- 用户资源:用户信息、用户数据
- 系统资源:系统信息、进程、网络连接
2.2 资源属性
资源核心属性:
- 标识符:唯一标识资源
- 名称:资源的可读名称
- 类型:资源的类型
- 描述:资源的详细描述
- 属性:资源的特定属性
- 状态:资源的当前状态
- 权限:资源的访问权限
- 元数据:资源的附加信息
- 创建时间:资源创建时间
- 更新时间:资源最后更新时间
3. 资源定义
3.1 资源模型设计
资源模型:
python
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
from enum import Enum
from datetime import datetime
class ResourceType(str, Enum):
"""资源类型枚举"""
FILE = "file"
NETWORK = "network"
DATABASE = "database"
COMPUTE = "compute"
SERVICE = "service"
CONFIG = "config"
USER = "user"
SYSTEM = "system"
class ResourceStatus(str, Enum):
"""资源状态枚举"""
ACTIVE = "active"
INACTIVE = "inactive"
CREATING = "creating"
DELETING = "deleting"
ERROR = "error"
class ResourcePermission(BaseModel):
"""资源权限"""
read: bool = False
write: bool = False
execute: bool = False
delete: bool = False
class Resource(BaseModel):
"""资源模型"""
id: str
name: str
type: ResourceType
description: Optional[str] = None
properties: Dict[str, Any] = {}
status: ResourceStatus = ResourceStatus.ACTIVE
permissions: Dict[str, ResourcePermission] = {}
metadata: Dict[str, Any] = {}
created_at: str
updated_at: str
version: int = 1
class ResourceCreate(BaseModel):
"""创建资源的参数"""
name: str
type: ResourceType
description: Optional[str] = None
properties: Dict[str, Any] = {}
permissions: Optional[Dict[str, ResourcePermission]] = None
metadata: Dict[str, Any] = {}
class ResourceUpdate(BaseModel):
"""更新资源的参数"""
name: Optional[str] = None
description: Optional[str] = None
properties: Optional[Dict[str, Any]] = None
status: Optional[ResourceStatus] = None
permissions: Optional[Dict[str, ResourcePermission]] = None
metadata: Optional[Dict[str, Any]] = None
class ResourceVersion(BaseModel):
"""资源版本"""
version: int
resource_id: str
content: Dict[str, Any]
changed_by: Optional[str] = None
changed_at: str
reason: Optional[str] = None3.2 资源管理器设计
资源管理器:
python
from typing import Dict, List, Optional
from datetime import datetime
from app.models.resource import Resource, ResourceCreate, ResourceUpdate, ResourceStatus
class ResourceManager:
"""资源管理器"""
def __init__(self):
self.resources: Dict[str, Resource] = {}
self.resource_versions: Dict[str, List[Dict[str, Any]]] = {}
def create(self, resource_data: ResourceCreate) -> Resource:
"""创建资源"""
resource_id = f"{resource_data.type.value}_{len(self.resources) + 1}"
now = datetime.utcnow().isoformat()
# 初始化权限
permissions = resource_data.permissions or {}
if "default" not in permissions:
from app.models.resource import ResourcePermission
permissions["default"] = ResourcePermission(read=True, write=False, execute=False, delete=False)
resource = Resource(
id=resource_id,
name=resource_data.name,
type=resource_data.type,
description=resource_data.description,
properties=resource_data.properties,
status=ResourceStatus.ACTIVE,
permissions=permissions,
metadata=resource_data.metadata,
created_at=now,
updated_at=now,
version=1
)
self.resources[resource_id] = resource
# 记录版本
self._record_version(resource_id, resource, "Resource created")
return resource
def get(self, resource_id: str) -> Optional[Resource]:
"""获取资源"""
return self.resources.get(resource_id)
def list(self, resource_type: Optional[str] = None) -> List[Resource]:
"""获取资源列表"""
if resource_type:
return [r for r in self.resources.values() if r.type.value == resource_type]
return list(self.resources.values())
def update(self, resource_id: str, resource_data: ResourceUpdate) -> Optional[Resource]:
"""更新资源"""
if resource_id not in self.resources:
return None
resource = self.resources[resource_id]
update_data = resource_data.dict(exclude_unset=True)
# 记录旧版本
old_version = resource.dict()
# 更新资源
for field, value in update_data.items():
if hasattr(resource, field):
setattr(resource, field, value)
# 更新版本和时间
resource.version += 1
resource.updated_at = datetime.utcnow().isoformat()
self.resources[resource_id] = resource
# 记录版本
self._record_version(resource_id, resource, "Resource updated")
return resource
def delete(self, resource_id: str) -> bool:
"""删除资源"""
if resource_id not in self.resources:
return False
# 记录删除操作
resource = self.resources[resource_id]
self._record_version(resource_id, resource, "Resource deleted")
del self.resources[resource_id]
return True
def update_status(self, resource_id: str, status: ResourceStatus) -> Optional[Resource]:
"""更新资源状态"""
if resource_id not in self.resources:
return None
resource = self.resources[resource_id]
resource.status = status
resource.updated_at = datetime.utcnow().isoformat()
self.resources[resource_id] = resource
# 记录版本
self._record_version(resource_id, resource, f"Status changed to {status.value}")
return resource
def _record_version(self, resource_id: str, resource: Resource, reason: str):
"""记录资源版本"""
if resource_id not in self.resource_versions:
self.resource_versions[resource_id] = []
version_record = {
"version": resource.version,
"content": resource.dict(),
"changed_at": datetime.utcnow().isoformat(),
"reason": reason
}
self.resource_versions[resource_id].append(version_record)
# 限制版本历史数量
if len(self.resource_versions[resource_id]) > 100:
self.resource_versions[resource_id] = self.resource_versions[resource_id][-100:]
def get_versions(self, resource_id: str) -> List[Dict[str, Any]]:
"""获取资源版本历史"""
return self.resource_versions.get(resource_id, [])
def get_version(self, resource_id: str, version: int) -> Optional[Dict[str, Any]]:
"""获取特定版本的资源"""
versions = self.resource_versions.get(resource_id, [])
for v in versions:
if v["version"] == version:
return v
return None4. 资源访问控制
4.1 访问控制模型
常见的访问控制模型:
- DAC(自主访问控制):资源所有者控制访问权限
- MAC(强制访问控制):系统根据安全策略控制访问
- RBAC(基于角色的访问控制):根据用户角色控制访问
- ABAC(基于属性的访问控制):根据资源和用户属性控制访问
- PBAC(基于策略的访问控制):根据预定义策略控制访问
4.2 RBAC实现
基于角色的访问控制:
python
from typing import Dict, List, Set
class Role:
"""角色"""
def __init__(self, name: str, description: str):
self.name = name
self.description = description
self.permissions: Set[str] = set()
def add_permission(self, permission: str):
"""添加权限"""
self.permissions.add(permission)
def remove_permission(self, permission: str):
"""移除权限"""
if permission in self.permissions:
self.permissions.remove(permission)
def has_permission(self, permission: str) -> bool:
"""检查是否有特定权限"""
return permission in self.permissions
class User:
"""用户"""
def __init__(self, username: str):
self.username = username
self.roles: Set[str] = set()
def add_role(self, role_name: str):
"""添加角色"""
self.roles.add(role_name)
def remove_role(self, role_name: str):
"""移除角色"""
if role_name in self.roles:
self.roles.remove(role_name)
class RBACManager:
"""RBAC管理器"""
def __init__(self):
self.roles: Dict[str, Role] = {}
self.users: Dict[str, User] = {}
def create_role(self, name: str, description: str) -> Role:
"""创建角色"""
role = Role(name, description)
self.roles[name] = role
return role
def get_role(self, name: str) -> Optional[Role]:
"""获取角色"""
return self.roles.get(name)
def create_user(self, username: str) -> User:
"""创建用户"""
user = User(username)
self.users[username] = user
return user
def get_user(self, username: str) -> Optional[User]:
"""获取用户"""
return self.users.get(username)
def assign_role(self, username: str, role_name: str) -> bool:
"""分配角色给用户"""
if username not in self.users or role_name not in self.roles:
return False
user = self.users[username]
user.add_role(role_name)
return True
def revoke_role(self, username: str, role_name: str) -> bool:
"""从用户撤销角色"""
if username not in self.users:
return False
user = self.users[username]
user.remove_role(role_name)
return True
def check_permission(self, username: str, permission: str) -> bool:
"""检查用户是否有特定权限"""
if username not in self.users:
return False
user = self.users[username]
for role_name in user.roles:
role = self.roles.get(role_name)
if role and role.has_permission(permission):
return True
return False4.3 资源访问控制实现
资源访问控制:
python
from typing import Dict, Any, Optional
from app.models.resource import Resource
from app.security.rbac import RBACManager
class ResourceAccessController:
"""资源访问控制器"""
def __init__(self, rbac_manager: RBACManager):
self.rbac_manager = rbac_manager
self.resource_locks: Dict[str, Dict[str, Any]] = {}
def check_access(self, user: str, resource_id: str, action: str) -> bool:
"""检查用户对资源的访问权限"""
# 1. 检查用户是否存在
if not self.rbac_manager.get_user(user):
return False
# 2. 检查资源是否存在
from app.managers.resource_manager import resource_manager
resource = resource_manager.get(resource_id)
if not resource:
return False
# 3. 检查用户角色权限
permission = f"{resource.type.value}:{action}"
if not self.rbac_manager.check_permission(user, permission):
return False
# 4. 检查资源特定权限
if user in resource.permissions:
resource_perm = resource.permissions[user]
if action == "read" and not resource_perm.read:
return False
elif action == "write" and not resource_perm.write:
return False
elif action == "execute" and not resource_perm.execute:
return False
elif action == "delete" and not resource_perm.delete:
return False
# 5. 检查默认权限
elif "default" in resource.permissions:
default_perm = resource.permissions["default"]
if action == "read" and not default_perm.read:
return False
elif action == "write" and not default_perm.write:
return False
elif action == "execute" and not default_perm.execute:
return False
elif action == "delete" and not default_perm.delete:
return False
# 6. 检查资源状态
from app.models.resource import ResourceStatus
if resource.status != ResourceStatus.ACTIVE:
return False
return True
def lock_resource(self, resource_id: str, user: str, timeout: int = 300) -> bool:
"""锁定资源,防止并发访问"""
if resource_id in self.resource_locks:
existing_lock = self.resource_locks[resource_id]
if existing_lock["user"] != user:
return False
import time
self.resource_locks[resource_id] = {
"user": user,
"locked_at": time.time(),
"timeout": timeout
}
return True
def unlock_resource(self, resource_id: str, user: str) -> bool:
"""解锁资源"""
if resource_id in self.resource_locks:
lock = self.resource_locks[resource_id]
if lock["user"] == user:
del self.resource_locks[resource_id]
return True
return False
def cleanup_locks(self):
"""清理过期的锁"""
import time
current_time = time.time()
expired_locks = []
for resource_id, lock in self.resource_locks.items():
if current_time - lock["locked_at"] > lock["timeout"]:
expired_locks.append(resource_id)
for resource_id in expired_locks:
del self.resource_locks[resource_id]5. 资源权限管理
5.1 权限层级
权限层级:
- 系统级权限:控制对整个系统的访问
- 资源类型级权限:控制对特定类型资源的访问
- 资源实例级权限:控制对特定资源实例的访问
- 操作级权限:控制对资源的特定操作
5.2 权限管理实现
权限管理:
python
from typing import Dict, List, Optional
from app.models.resource import Resource, ResourcePermission
class PermissionManager:
"""权限管理器"""
def __init__(self):
self.permissions: Dict[str, Dict[str, ResourcePermission]] = {}
def set_permission(self, resource_id: str, user: str, permission: ResourcePermission):
"""设置用户对资源的权限"""
if resource_id not in self.permissions:
self.permissions[resource_id] = {}
self.permissions[resource_id][user] = permission
# 更新资源的权限
from app.managers.resource_manager import resource_manager
resource = resource_manager.get(resource_id)
if resource:
resource.permissions[user] = permission
resource_manager.update(resource_id, resource)
def get_permission(self, resource_id: str, user: str) -> Optional[ResourcePermission]:
"""获取用户对资源的权限"""
if resource_id in self.permissions and user in self.permissions[resource_id]:
return self.permissions[resource_id][user]
# 检查默认权限
from app.managers.resource_manager import resource_manager
resource = resource_manager.get(resource_id)
if resource and "default" in resource.permissions:
return resource.permissions["default"]
return None
def remove_permission(self, resource_id: str, user: str) -> bool:
"""移除用户对资源的权限"""
if resource_id in self.permissions and user in self.permissions[resource_id]:
del self.permissions[resource_id][user]
# 更新资源的权限
from app.managers.resource_manager import resource_manager
resource = resource_manager.get(resource_id)
if resource and user in resource.permissions:
del resource.permissions[user]
resource_manager.update(resource_id, resource)
return True
return False
def set_default_permission(self, resource_id: str, permission: ResourcePermission):
"""设置资源的默认权限"""
# 更新资源的默认权限
from app.managers.resource_manager import resource_manager
resource = resource_manager.get(resource_id)
if resource:
resource.permissions["default"] = permission
resource_manager.update(resource_id, resource)
def check_permission(self, resource_id: str, user: str, action: str) -> bool:
"""检查用户对资源的特定操作权限"""
permission = self.get_permission(resource_id, user)
if not permission:
return False
if action == "read":
return permission.read
elif action == "write":
return permission.write
elif action == "execute":
return permission.execute
elif action == "delete":
return permission.delete
else:
return False
def get_users_with_access(self, resource_id: str, action: str) -> List[str]:
"""获取对资源有特定操作权限的用户列表"""
users = []
# 检查资源的权限
from app.managers.resource_manager import resource_manager
resource = resource_manager.get(resource_id)
if resource:
for user, permission in resource.permissions.items():
if user != "default" and self.check_permission(resource_id, user, action):
users.append(user)
return users6. 资源状态管理
6.1 状态定义
资源状态:
- ACTIVE:资源可用
- INACTIVE:资源不可用
- CREATING:资源正在创建
- DELETING:资源正在删除
- ERROR:资源出错
- MAINTENANCE:资源维护中
- UPDATING:资源正在更新
6.2 状态管理实现
状态管理:
python
from typing import Dict, Any, Optional
from app.models.resource import ResourceStatus
class ResourceStateManager:
"""资源状态管理器"""
def __init__(self):
self.state_history: Dict[str, List[Dict[str, Any]]] = {}
def update_state(self, resource_id: str, status: ResourceStatus, reason: str = ""):
"""更新资源状态"""
# 更新资源状态
from app.managers.resource_manager import resource_manager
resource = resource_manager.update_status(resource_id, status)
if resource:
# 记录状态历史
if resource_id not in self.state_history:
self.state_history[resource_id] = []
import datetime
state_record = {
"status": status.value,
"timestamp": datetime.utcnow().isoformat(),
"reason": reason
}
self.state_history[resource_id].append(state_record)
# 限制历史记录数量
if len(self.state_history[resource_id]) > 100:
self.state_history[resource_id] = self.state_history[resource_id][-100:]
return True
return False
def get_state(self, resource_id: str) -> Optional[ResourceStatus]:
"""获取资源当前状态"""
from app.managers.resource_manager import resource_manager
resource = resource_manager.get(resource_id)
if resource:
return resource.status
return None
def get_state_history(self, resource_id: str) -> List[Dict[str, Any]]:
"""获取资源状态历史"""
return self.state_history.get(resource_id, [])
def get_resources_by_state(self, status: ResourceStatus) -> List[str]:
"""获取特定状态的资源列表"""
from app.managers.resource_manager import resource_manager
resources = resource_manager.list()
matching_resources = []
for resource in resources:
if resource.status == status:
matching_resources.append(resource.id)
return matching_resources
def validate_state_transition(self, current_status: ResourceStatus, new_status: ResourceStatus) -> bool:
"""验证状态转换是否合法"""
# 定义合法的状态转换
valid_transitions = {
ResourceStatus.ACTIVE: [ResourceStatus.INACTIVE, ResourceStatus.UPDATING, ResourceStatus.MAINTENANCE, ResourceStatus.ERROR],
ResourceStatus.INACTIVE: [ResourceStatus.ACTIVE, ResourceStatus.DELETING],
ResourceStatus.CREATING: [ResourceStatus.ACTIVE, ResourceStatus.ERROR],
ResourceStatus.DELETING: [],
ResourceStatus.ERROR: [ResourceStatus.ACTIVE, ResourceStatus.INACTIVE, ResourceStatus.DELETING],
ResourceStatus.MAINTENANCE: [ResourceStatus.ACTIVE, ResourceStatus.ERROR],
ResourceStatus.UPDATING: [ResourceStatus.ACTIVE, ResourceStatus.ERROR]
}
return new_status in valid_transitions.get(current_status, [])7. 资源版本控制
7.1 版本管理
版本控制的重要性:
- 回滚能力:在出错时回滚到之前的版本
- 变更追踪:追踪资源的变更历史
- 审计日志:记录谁在何时修改了资源
- 冲突解决:解决并发修改的冲突
7.2 版本控制实现
版本控制:
python
from typing import Dict, List, Any, Optional
class ResourceVersionManager:
"""资源版本管理器"""
def __init__(self):
self.versions: Dict[str, List[Dict[str, Any]]] = {}
def create_version(self, resource_id: str, content: Dict[str, Any], author: str, reason: str):
"""创建资源版本"""
if resource_id not in self.versions:
self.versions[resource_id] = []
import datetime
version = {
"version": len(self.versions[resource_id]) + 1,
"content": content,
"author": author,
"timestamp": datetime.utcnow().isoformat(),
"reason": reason
}
self.versions[resource_id].append(version)
# 限制版本数量
if len(self.versions[resource_id]) > 50:
self.versions[resource_id] = self.versions[resource_id][-50:]
return version
def get_version(self, resource_id: str, version: int) -> Optional[Dict[str, Any]]:
"""获取特定版本"""
if resource_id in self.versions:
for v in self.versions[resource_id]:
if v["version"] == version:
return v
return None
def get_latest_version(self, resource_id: str) -> Optional[Dict[str, Any]]:
"""获取最新版本"""
if resource_id in self.versions and self.versions[resource_id]:
return self.versions[resource_id][-1]
return None
def list_versions(self, resource_id: str) -> List[Dict[str, Any]]:
"""列出所有版本"""
return self.versions.get(resource_id, [])
def rollback(self, resource_id: str, version: int) -> Optional[Dict[str, Any]]:
"""回滚到特定版本"""
target_version = self.get_version(resource_id, version)
if not target_version:
return None
# 从目标版本创建新版本
new_version = self.create_version(
resource_id=resource_id,
content=target_version["content"],
author="system",
reason=f"Rollback to version {version}"
)
# 更新资源
from app.managers.resource_manager import resource_manager
from app.models.resource import Resource
resource_data = target_version["content"]
resource = Resource(**resource_data)
resource.id = resource_id
resource.version = new_version["version"]
resource_manager.update(resource_id, resource)
return new_version
def compare_versions(self, resource_id: str, version1: int, version2: int) -> Dict[str, Any]:
"""比较两个版本的差异"""
v1 = self.get_version(resource_id, version1)
v2 = self.get_version(resource_id, version2)
if not v1 or not v2:
return {"error": "版本不存在"}
# 简单的差异比较
diff = {}
for key, value in v2["content"].items():
if key not in v1["content"] or v1["content"][key] != value:
diff[key] = {
"old": v1["content"].get(key),
"new": value
}
return {
"version1": version1,
"version2": version2,
"changes": diff
}8. 资源监控与审计
8.1 监控指标
资源监控指标:
- 使用情况:资源的使用百分比
- 性能指标:响应时间、吞吐量
- 错误率:错误发生的频率
- 访问频率:资源被访问的次数
- 状态变化:状态转换的频率
- 安全事件:安全相关的事件
8.2 监控实现
资源监控:
python
from typing import Dict, List, Any
import time
class ResourceMonitor:
"""资源监控器"""
def __init__(self):
self.metrics: Dict[str, List[Dict[str, Any]]] = {}
self.access_logs: Dict[str, List[Dict[str, Any]]] = {}
def record_metric(self, resource_id: str, metric_name: str, value: Any):
"""记录资源指标"""
if resource_id not in self.metrics:
self.metrics[resource_id] = []
metric = {
"timestamp": time.time(),
"metric": metric_name,
"value": value
}
self.metrics[resource_id].append(metric)
# 限制指标数量
if len(self.metrics[resource_id]) > 1000:
self.metrics[resource_id] = self.metrics[resource_id][-1000:]
def record_access(self, resource_id: str, user: str, action: str, success: bool, duration: float):
"""记录资源访问"""
if resource_id not in self.access_logs:
self.access_logs[resource_id] = []
access_log = {
"timestamp": time.time(),
"user": user,
"action": action,
"success": success,
"duration": duration
}
self.access_logs[resource_id].append(access_log)
# 限制日志数量
if len(self.access_logs[resource_id]) > 1000:
self.access_logs[resource_id] = self.access_logs[resource_id][-1000:]
def get_metrics(self, resource_id: str, metric_name: str = None, time_range: float = 3600):
"""获取资源指标"""
if resource_id not in self.metrics:
return []
now = time.time()
filtered_metrics = []
for metric in self.metrics[resource_id]:
if (now - metric["timestamp"]) <= time_range:
if not metric_name or metric["metric"] == metric_name:
filtered_metrics.append(metric)
return filtered_metrics
def get_access_logs(self, resource_id: str, user: str = None, action: str = None, time_range: float = 3600):
"""获取资源访问日志"""
if resource_id not in self.access_logs:
return []
now = time.time()
filtered_logs = []
for log in self.access_logs[resource_id]:
if (now - log["timestamp"]) <= time_range:
if (not user or log["user"] == user) and (not action or log["action"] == action):
filtered_logs.append(log)
return filtered_logs
def get_statistics(self, resource_id: str, time_range: float = 3600):
"""获取资源统计信息"""
access_logs = self.get_access_logs(resource_id, time_range=time_range)
if not access_logs:
return {
"total_accesses": 0,
"successful_accesses": 0,
"failed_accesses": 0,
"average_duration": 0,
"most_common_action": None,
"most_active_user": None
}
total_accesses = len(access_logs)
successful_accesses = sum(1 for log in access_logs if log["success"])
failed_accesses = total_accesses - successful_accesses
average_duration = sum(log["duration"] for log in access_logs) / total_accesses
# 最常见的操作
action_counts = {}
for log in access_logs:
action_counts[log["action"]] = action_counts.get(log["action"], 0) + 1
most_common_action = max(action_counts, key=action_counts.get) if action_counts else None
# 最活跃的用户
user_counts = {}
for log in access_logs:
user_counts[log["user"]] = user_counts.get(log["user"], 0) + 1
most_active_user = max(user_counts, key=user_counts.get) if user_counts else None
return {
"total_accesses": total_accesses,
"successful_accesses": successful_accesses,
"failed_accesses": failed_accesses,
"average_duration": average_duration,
"most_common_action": most_common_action,
"most_active_user": most_active_user
}9. 实战:实现资源管理系统
9.1 系统架构
资源管理系统架构:
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ MCP Server │────>│ Resource │────>│ Storage │
│ (FastAPI) │<────│ Manager │<────│ (Database) │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ Access │ │ Version │ │ Monitor │
│ Controller │ │ Manager │ │ System │
└────────────────┘ └────────────────┘ └────────────────┘9.2 完整实现
资源管理系统实现:
python
# main.py
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import json
from typing import Dict, Any, List, Optional
from app.managers.resource_manager import ResourceManager
from app.models.resource import ResourceCreate, ResourceUpdate, ResourceType, ResourceStatus
from app.security.rbac import RBACManager, User, Role
from app.security.permission import PermissionManager
from app.monitoring.resource_monitor import ResourceMonitor
from app.api.dependencies import get_current_user, get_api_key
app = FastAPI(title="MCP Resource Management", version="1.0.0")
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 初始化管理器
resource_manager = ResourceManager()
rbac_manager = RBACManager()
permission_manager = PermissionManager()
resource_monitor = ResourceMonitor()
# 初始化默认角色和用户
def init_defaults():
"""初始化默认角色和用户"""
# 创建默认角色
admin_role = rbac_manager.create_role("admin", "Administrator")
user_role = rbac_manager.create_role("user", "Regular User")
# 添加默认权限
admin_role.add_permission("file:read")
admin_role.add_permission("file:write")
admin_role.add_permission("file:execute")
admin_role.add_permission("file:delete")
user_role.add_permission("file:read")
user_role.add_permission("file:execute")
# 创建默认用户
admin_user = rbac_manager.create_user("admin")
test_user = rbac_manager.create_user("test")
# 分配角色
rbac_manager.assign_role("admin", "admin")
rbac_manager.assign_role("test", "user")
# 初始化
init_defaults()
# 资源管理端点
@app.post("/resources", response_model=Dict[str, Any])
async def create_resource(
resource_data: ResourceCreate,
current_user: str = Depends(get_current_user),
api_key: str = Depends(get_api_key)
):
"""创建资源"""
start_time = time.time()
try:
resource = resource_manager.create(resource_data)
duration = time.time() - start_time
# 记录访问
resource_monitor.record_access(
resource_id=resource.id,
user=current_user,
action="create",
success=True,
duration=duration
)
return resource.dict()
except Exception as e:
duration = time.time() - start_time
resource_monitor.record_access(
resource_id="",
user=current_user,
action="create",
success=False,
duration=duration
)
raise HTTPException(status_code=400, detail=str(e))
@app.get("/resources", response_model=List[Dict[str, Any]])
async def list_resources(
resource_type: Optional[str] = None,
current_user: str = Depends(get_current_user),
api_key: str = Depends(get_api_key)
):
"""列出资源"""
start_time = time.time()
try:
resources = resource_manager.list(resource_type)
duration = time.time() - start_time
# 记录访问
resource_monitor.record_access(
resource_id="",
user=current_user,
action="list",
success=True,
duration=duration
)
return [r.dict() for r in resources]
except Exception as e:
duration = time.time() - start_time
resource_monitor.record_access(
resource_id="",
user=current_user,
action="list",
success=False,
duration=duration
)
raise HTTPException(status_code=400, detail=str(e))
@app.get("/resources/{resource_id}", response_model=Dict[str, Any])
async def get_resource(
resource_id: str,
current_user: str = Depends(get_current_user),
api_key: str = Depends(get_api_key)
):
"""获取资源"""
start_time = time.time()
try:
resource = resource_manager.get(resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
duration = time.time() - start_time
# 记录访问
resource_monitor.record_access(
resource_id=resource_id,
user=current_user,
action="read",
success=True,
duration=duration
)
return resource.dict()
except HTTPException:
raise
except Exception as e:
duration = time.time() - start_time
resource_monitor.record_access(
resource_id=resource_id,
user=current_user,
action="read",
success=False,
duration=duration
)
raise HTTPException(status_code=400, detail=str(e))
@app.put("/resources/{resource_id}", response_model=Dict[str, Any])
async def update_resource(
resource_id: str,
resource_data: ResourceUpdate,
current_user: str = Depends(get_current_user),
api_key: str = Depends(get_api_key)
):
"""更新资源"""
start_time = time.time()
try:
resource = resource_manager.update(resource_id, resource_data)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
duration = time.time() - start_time
# 记录访问
resource_monitor.record_access(
resource_id=resource_id,
user=current_user,
action="update",
success=True,
duration=duration
)
return resource.dict()
except HTTPException:
raise
except Exception as e:
duration = time.time() - start_time
resource_monitor.record_access(
resource_id=resource_id,
user=current_user,
action="update",
success=False,
duration=duration
)
raise HTTPException(status_code=400, detail=str(e))
@app.delete("/resources/{resource_id}", response_model=Dict[str, bool])
async def delete_resource(
resource_id: str,
current_user: str = Depends(get_current_user),
api_key: str = Depends(get_api_key)
):
"""删除资源"""
start_time = time.time()
try:
success = resource_manager.delete(resource_id)
duration = time.time() - start_time
# 记录访问
resource_monitor.record_access(
resource_id=resource_id,
user=current_user,
action="delete",
success=success,
duration=duration
)
if not success:
raise HTTPException(status_code=404, detail="Resource not found")
return {"success": True}
except HTTPException:
raise
except Exception as e:
duration = time.time() - start_time
resource_monitor.record_access(
resource_id=resource_id,
user=current_user,
action="delete",
success=False,
duration=duration
)
raise HTTPException(status_code=400, detail=str(e))
@app.get("/resources/{resource_id}/versions", response_model=List[Dict[str, Any]])
async def list_resource_versions(
resource_id: str,
current_user: str = Depends(get_current_user),
api_key: str = Depends(get_api_key)
):
"""列出资源版本"""
try:
versions = resource_manager.get_versions(resource_id)
return versions
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/resources/{resource_id}/metrics", response_model=Dict[str, Any])
async def get_resource_metrics(
resource_id: str,
metric_name: Optional[str] = None,
time_range: float = 3600,
current_user: str = Depends(get_current_user),
api_key: str = Depends(get_api_key)
):
"""获取资源指标"""
try:
metrics = resource_monitor.get_metrics(resource_id, metric_name, time_range)
statistics = resource_monitor.get_statistics(resource_id, time_range)
return {
"metrics": metrics,
"statistics": statistics
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
if __name__ == "__main__":
import uvicorn
import time
uvicorn.run(app, host="0.0.0.0", port=8000)10. 技术选型建议
10.1 存储方案
资源存储方案:
| 方案 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
| 内存存储 | 性能高,易于实现 | 数据易丢失,容量有限 | 开发测试,临时存储 |
| 文件系统 | 持久化,容量大 | 性能一般,管理复杂 | 中小规模应用 |
| 关系型数据库 | 结构化存储,事务支持 | 性能开销大 | 企业级应用,需要事务 |
| NoSQL数据库 | 灵活存储,性能高 | 缺少事务支持 | 大规模应用,半结构化数据 |
| 云存储 | 弹性伸缩,高可用 | 成本较高,依赖网络 | 云原生应用,大规模存储 |
10.2 安全方案
安全方案:
| 方案 | 适用场景 | 实现难度 |
|---|---|---|
| API Key | 简单认证 | 低 |
| JWT | 无状态认证 | 中 |
| OAuth2 | 第三方认证 | 高 |
| RBAC | 基于角色的权限管理 | 中 |
| ABAC | 基于属性的权限管理 | 高 |
| 多因素认证 | 高安全性场景 | 高 |
11. 常见问题与解决方案
11.1 资源冲突
问题:多个用户同时访问或修改同一资源,导致冲突。
解决方案:
- 实现资源锁定机制
- 使用乐观锁或悲观锁
- 实现冲突检测和解决机制
- 提供资源版本控制和回滚能力
11.2 权限管理复杂
问题:权限管理过于复杂,难以维护。
解决方案:
- 使用RBAC简化权限管理
- 建立清晰的权限层级
- 自动化权限分配
- 定期权限审计
11.3 资源泄露
问题:资源创建后未正确清理,导致资源泄露。
解决方案:
- 实现资源生命周期管理
- 设置资源自动清理机制
- 定期资源审计
- 使用容器化隔离资源
11.4 性能问题
问题:资源管理系统性能下降。
解决方案:
- 优化存储访问
- 实现缓存机制
- 使用异步处理
- 水平扩展系统
12. 学习资源
12.1 官方文档
- FastAPI Documentation:https://fastapi.tiangolo.com/
- Python Documentation:https://docs.python.org/
- Pydantic Documentation:https://pydantic-docs.helpmanual.io/
- SQLAlchemy Documentation:https://docs.sqlalchemy.org/
12.2 在线资源
- RBAC Explained:https://en.wikipedia.org/wiki/Role-based_access_control
- Resource Management Best Practices:https://www.gartner.com/en/information-technology/glossary/it-resource-management
- Access Control Models:https://www.oreilly.com/library/view/identity-and-access/9781492031085/
13. 总结
本课程深入介绍了MCP资源管理的核心内容,包括资源定义、资源类型、访问控制、权限管理、状态管理、版本控制和监控审计等方面。通过本课程的学习,你应该能够:
- 理解MCP资源的概念和重要性
- 设计和实现资源管理系统
- 实现资源的访问控制和权限管理
- 管理资源的状态和版本
- 监控和审计资源的使用
- 解决资源管理中的常见问题
在后续课程中,我们将学习MCP Client开发、Claude Desktop集成和MCP生态系统等内容,进一步完善你的MCP开发技能。
课后作业
实践题:
- 实现文件资源管理系统
- 实现数据库资源管理系统
- 实现资源访问控制和权限管理
- 实现资源监控和审计系统
思考题:
- 如何设计一个可扩展的资源管理系统?
- 如何处理大规模资源的管理?
- 如何实现资源的自动发现和注册?
- 如何优化资源管理系统的性能?

扫描二维码关注"架构师AI杜"公众号,获取更多技术内容和最新动态
