Skip to content

第22天:MCP资源管理

学习目标

  • 掌握MCP资源的定义方法
  • 学会资源访问控制
  • 学会权限管理
  • 能够实现MCP资源管理

核心内容

1. MCP资源概述

MCP资源的定义

MCP资源是Tool操作的对象或数据,定义了Tool可以访问的范围。资源可以是物理的(如文件、设备)或逻辑的(如数据库记录、API端点)。

资源的重要性

  1. 访问控制:控制Tool对资源的访问
  2. 权限管理:管理用户对资源的权限
  3. 状态跟踪:跟踪资源的当前状态
  4. 版本控制:管理资源的版本变化
  5. 审计日志:记录资源的访问历史

2. 资源类型

2.1 资源分类

MCP资源分类

  1. 文件资源:本地文件、网络文件、云存储文件
  2. 网络资源:API端点、网页、网络服务
  3. 数据库资源:数据库表、记录、查询
  4. 计算资源:CPU、GPU、内存、存储
  5. 服务资源:外部服务、内部服务
  6. 配置资源:配置文件、环境变量
  7. 用户资源:用户信息、用户数据
  8. 系统资源:系统信息、进程、网络连接

2.2 资源属性

资源核心属性

  1. 标识符:唯一标识资源
  2. 名称:资源的可读名称
  3. 类型:资源的类型
  4. 描述:资源的详细描述
  5. 属性:资源的特定属性
  6. 状态:资源的当前状态
  7. 权限:资源的访问权限
  8. 元数据:资源的附加信息
  9. 创建时间:资源创建时间
  10. 更新时间:资源最后更新时间

3. 资源定义

3.1 资源模型设计

资源模型

python
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
from enum import Enum
from datetime import datetime

class ResourceType(str, Enum):
    """资源类型枚举"""
    FILE = "file"
    NETWORK = "network"
    DATABASE = "database"
    COMPUTE = "compute"
    SERVICE = "service"
    CONFIG = "config"
    USER = "user"
    SYSTEM = "system"

class ResourceStatus(str, Enum):
    """资源状态枚举"""
    ACTIVE = "active"
    INACTIVE = "inactive"
    CREATING = "creating"
    DELETING = "deleting"
    ERROR = "error"

class ResourcePermission(BaseModel):
    """资源权限"""
    read: bool = False
    write: bool = False
    execute: bool = False
    delete: bool = False

class Resource(BaseModel):
    """资源模型"""
    id: str
    name: str
    type: ResourceType
    description: Optional[str] = None
    properties: Dict[str, Any] = {}
    status: ResourceStatus = ResourceStatus.ACTIVE
    permissions: Dict[str, ResourcePermission] = {}
    metadata: Dict[str, Any] = {}
    created_at: str
    updated_at: str
    version: int = 1

class ResourceCreate(BaseModel):
    """创建资源的参数"""
    name: str
    type: ResourceType
    description: Optional[str] = None
    properties: Dict[str, Any] = {}
    permissions: Optional[Dict[str, ResourcePermission]] = None
    metadata: Dict[str, Any] = {}

class ResourceUpdate(BaseModel):
    """更新资源的参数"""
    name: Optional[str] = None
    description: Optional[str] = None
    properties: Optional[Dict[str, Any]] = None
    status: Optional[ResourceStatus] = None
    permissions: Optional[Dict[str, ResourcePermission]] = None
    metadata: Optional[Dict[str, Any]] = None

class ResourceVersion(BaseModel):
    """资源版本"""
    version: int
    resource_id: str
    content: Dict[str, Any]
    changed_by: Optional[str] = None
    changed_at: str
    reason: Optional[str] = None

3.2 资源管理器设计

资源管理器

python
from typing import Dict, List, Optional
from datetime import datetime
from app.models.resource import Resource, ResourceCreate, ResourceUpdate, ResourceStatus

class ResourceManager:
    """资源管理器"""
    def __init__(self):
        self.resources: Dict[str, Resource] = {}
        self.resource_versions: Dict[str, List[Dict[str, Any]]] = {}
    
    def create(self, resource_data: ResourceCreate) -> Resource:
        """创建资源"""
        resource_id = f"{resource_data.type.value}_{len(self.resources) + 1}"
        now = datetime.utcnow().isoformat()
        
        # 初始化权限
        permissions = resource_data.permissions or {}
        if "default" not in permissions:
            from app.models.resource import ResourcePermission
            permissions["default"] = ResourcePermission(read=True, write=False, execute=False, delete=False)
        
        resource = Resource(
            id=resource_id,
            name=resource_data.name,
            type=resource_data.type,
            description=resource_data.description,
            properties=resource_data.properties,
            status=ResourceStatus.ACTIVE,
            permissions=permissions,
            metadata=resource_data.metadata,
            created_at=now,
            updated_at=now,
            version=1
        )
        
        self.resources[resource_id] = resource
        
        # 记录版本
        self._record_version(resource_id, resource, "Resource created")
        
        return resource
    
    def get(self, resource_id: str) -> Optional[Resource]:
        """获取资源"""
        return self.resources.get(resource_id)
    
    def list(self, resource_type: Optional[str] = None) -> List[Resource]:
        """获取资源列表"""
        if resource_type:
            return [r for r in self.resources.values() if r.type.value == resource_type]
        return list(self.resources.values())
    
    def update(self, resource_id: str, resource_data: ResourceUpdate) -> Optional[Resource]:
        """更新资源"""
        if resource_id not in self.resources:
            return None
        
        resource = self.resources[resource_id]
        update_data = resource_data.dict(exclude_unset=True)
        
        # 记录旧版本
        old_version = resource.dict()
        
        # 更新资源
        for field, value in update_data.items():
            if hasattr(resource, field):
                setattr(resource, field, value)
        
        # 更新版本和时间
        resource.version += 1
        resource.updated_at = datetime.utcnow().isoformat()
        
        self.resources[resource_id] = resource
        
        # 记录版本
        self._record_version(resource_id, resource, "Resource updated")
        
        return resource
    
    def delete(self, resource_id: str) -> bool:
        """删除资源"""
        if resource_id not in self.resources:
            return False
        
        # 记录删除操作
        resource = self.resources[resource_id]
        self._record_version(resource_id, resource, "Resource deleted")
        
        del self.resources[resource_id]
        return True
    
    def update_status(self, resource_id: str, status: ResourceStatus) -> Optional[Resource]:
        """更新资源状态"""
        if resource_id not in self.resources:
            return None
        
        resource = self.resources[resource_id]
        resource.status = status
        resource.updated_at = datetime.utcnow().isoformat()
        
        self.resources[resource_id] = resource
        
        # 记录版本
        self._record_version(resource_id, resource, f"Status changed to {status.value}")
        
        return resource
    
    def _record_version(self, resource_id: str, resource: Resource, reason: str):
        """记录资源版本"""
        if resource_id not in self.resource_versions:
            self.resource_versions[resource_id] = []
        
        version_record = {
            "version": resource.version,
            "content": resource.dict(),
            "changed_at": datetime.utcnow().isoformat(),
            "reason": reason
        }
        
        self.resource_versions[resource_id].append(version_record)
        
        # 限制版本历史数量
        if len(self.resource_versions[resource_id]) > 100:
            self.resource_versions[resource_id] = self.resource_versions[resource_id][-100:]
    
    def get_versions(self, resource_id: str) -> List[Dict[str, Any]]:
        """获取资源版本历史"""
        return self.resource_versions.get(resource_id, [])
    
    def get_version(self, resource_id: str, version: int) -> Optional[Dict[str, Any]]:
        """获取特定版本的资源"""
        versions = self.resource_versions.get(resource_id, [])
        for v in versions:
            if v["version"] == version:
                return v
        return None

4. 资源访问控制

4.1 访问控制模型

常见的访问控制模型

  1. DAC(自主访问控制):资源所有者控制访问权限
  2. MAC(强制访问控制):系统根据安全策略控制访问
  3. RBAC(基于角色的访问控制):根据用户角色控制访问
  4. ABAC(基于属性的访问控制):根据资源和用户属性控制访问
  5. PBAC(基于策略的访问控制):根据预定义策略控制访问

4.2 RBAC实现

基于角色的访问控制

python
from typing import Dict, List, Set

class Role:
    """角色"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.permissions: Set[str] = set()
    
    def add_permission(self, permission: str):
        """添加权限"""
        self.permissions.add(permission)
    
    def remove_permission(self, permission: str):
        """移除权限"""
        if permission in self.permissions:
            self.permissions.remove(permission)
    
    def has_permission(self, permission: str) -> bool:
        """检查是否有特定权限"""
        return permission in self.permissions

class User:
    """用户"""
    def __init__(self, username: str):
        self.username = username
        self.roles: Set[str] = set()
    
    def add_role(self, role_name: str):
        """添加角色"""
        self.roles.add(role_name)
    
    def remove_role(self, role_name: str):
        """移除角色"""
        if role_name in self.roles:
            self.roles.remove(role_name)

class RBACManager:
    """RBAC管理器"""
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.users: Dict[str, User] = {}
    
    def create_role(self, name: str, description: str) -> Role:
        """创建角色"""
        role = Role(name, description)
        self.roles[name] = role
        return role
    
    def get_role(self, name: str) -> Optional[Role]:
        """获取角色"""
        return self.roles.get(name)
    
    def create_user(self, username: str) -> User:
        """创建用户"""
        user = User(username)
        self.users[username] = user
        return user
    
    def get_user(self, username: str) -> Optional[User]:
        """获取用户"""
        return self.users.get(username)
    
    def assign_role(self, username: str, role_name: str) -> bool:
        """分配角色给用户"""
        if username not in self.users or role_name not in self.roles:
            return False
        
        user = self.users[username]
        user.add_role(role_name)
        return True
    
    def revoke_role(self, username: str, role_name: str) -> bool:
        """从用户撤销角色"""
        if username not in self.users:
            return False
        
        user = self.users[username]
        user.remove_role(role_name)
        return True
    
    def check_permission(self, username: str, permission: str) -> bool:
        """检查用户是否有特定权限"""
        if username not in self.users:
            return False
        
        user = self.users[username]
        for role_name in user.roles:
            role = self.roles.get(role_name)
            if role and role.has_permission(permission):
                return True
        
        return False

4.3 资源访问控制实现

资源访问控制

python
from typing import Dict, Any, Optional
from app.models.resource import Resource
from app.security.rbac import RBACManager

class ResourceAccessController:
    """资源访问控制器"""
    def __init__(self, rbac_manager: RBACManager):
        self.rbac_manager = rbac_manager
        self.resource_locks: Dict[str, Dict[str, Any]] = {}
    
    def check_access(self, user: str, resource_id: str, action: str) -> bool:
        """检查用户对资源的访问权限"""
        # 1. 检查用户是否存在
        if not self.rbac_manager.get_user(user):
            return False
        
        # 2. 检查资源是否存在
        from app.managers.resource_manager import resource_manager
        resource = resource_manager.get(resource_id)
        if not resource:
            return False
        
        # 3. 检查用户角色权限
        permission = f"{resource.type.value}:{action}"
        if not self.rbac_manager.check_permission(user, permission):
            return False
        
        # 4. 检查资源特定权限
        if user in resource.permissions:
            resource_perm = resource.permissions[user]
            if action == "read" and not resource_perm.read:
                return False
            elif action == "write" and not resource_perm.write:
                return False
            elif action == "execute" and not resource_perm.execute:
                return False
            elif action == "delete" and not resource_perm.delete:
                return False
        
        # 5. 检查默认权限
        elif "default" in resource.permissions:
            default_perm = resource.permissions["default"]
            if action == "read" and not default_perm.read:
                return False
            elif action == "write" and not default_perm.write:
                return False
            elif action == "execute" and not default_perm.execute:
                return False
            elif action == "delete" and not default_perm.delete:
                return False
        
        # 6. 检查资源状态
        from app.models.resource import ResourceStatus
        if resource.status != ResourceStatus.ACTIVE:
            return False
        
        return True
    
    def lock_resource(self, resource_id: str, user: str, timeout: int = 300) -> bool:
        """锁定资源,防止并发访问"""
        if resource_id in self.resource_locks:
            existing_lock = self.resource_locks[resource_id]
            if existing_lock["user"] != user:
                return False
        
        import time
        self.resource_locks[resource_id] = {
            "user": user,
            "locked_at": time.time(),
            "timeout": timeout
        }
        return True
    
    def unlock_resource(self, resource_id: str, user: str) -> bool:
        """解锁资源"""
        if resource_id in self.resource_locks:
            lock = self.resource_locks[resource_id]
            if lock["user"] == user:
                del self.resource_locks[resource_id]
                return True
        return False
    
    def cleanup_locks(self):
        """清理过期的锁"""
        import time
        current_time = time.time()
        expired_locks = []
        
        for resource_id, lock in self.resource_locks.items():
            if current_time - lock["locked_at"] > lock["timeout"]:
                expired_locks.append(resource_id)
        
        for resource_id in expired_locks:
            del self.resource_locks[resource_id]

5. 资源权限管理

5.1 权限层级

权限层级

  1. 系统级权限:控制对整个系统的访问
  2. 资源类型级权限:控制对特定类型资源的访问
  3. 资源实例级权限:控制对特定资源实例的访问
  4. 操作级权限:控制对资源的特定操作

5.2 权限管理实现

权限管理

python
from typing import Dict, List, Optional
from app.models.resource import Resource, ResourcePermission

class PermissionManager:
    """权限管理器"""
    def __init__(self):
        self.permissions: Dict[str, Dict[str, ResourcePermission]] = {}
    
    def set_permission(self, resource_id: str, user: str, permission: ResourcePermission):
        """设置用户对资源的权限"""
        if resource_id not in self.permissions:
            self.permissions[resource_id] = {}
        
        self.permissions[resource_id][user] = permission
        
        # 更新资源的权限
        from app.managers.resource_manager import resource_manager
        resource = resource_manager.get(resource_id)
        if resource:
            resource.permissions[user] = permission
            resource_manager.update(resource_id, resource)
    
    def get_permission(self, resource_id: str, user: str) -> Optional[ResourcePermission]:
        """获取用户对资源的权限"""
        if resource_id in self.permissions and user in self.permissions[resource_id]:
            return self.permissions[resource_id][user]
        
        # 检查默认权限
        from app.managers.resource_manager import resource_manager
        resource = resource_manager.get(resource_id)
        if resource and "default" in resource.permissions:
            return resource.permissions["default"]
        
        return None
    
    def remove_permission(self, resource_id: str, user: str) -> bool:
        """移除用户对资源的权限"""
        if resource_id in self.permissions and user in self.permissions[resource_id]:
            del self.permissions[resource_id][user]
            
            # 更新资源的权限
            from app.managers.resource_manager import resource_manager
            resource = resource_manager.get(resource_id)
            if resource and user in resource.permissions:
                del resource.permissions[user]
                resource_manager.update(resource_id, resource)
            
            return True
        
        return False
    
    def set_default_permission(self, resource_id: str, permission: ResourcePermission):
        """设置资源的默认权限"""
        # 更新资源的默认权限
        from app.managers.resource_manager import resource_manager
        resource = resource_manager.get(resource_id)
        if resource:
            resource.permissions["default"] = permission
            resource_manager.update(resource_id, resource)
    
    def check_permission(self, resource_id: str, user: str, action: str) -> bool:
        """检查用户对资源的特定操作权限"""
        permission = self.get_permission(resource_id, user)
        if not permission:
            return False
        
        if action == "read":
            return permission.read
        elif action == "write":
            return permission.write
        elif action == "execute":
            return permission.execute
        elif action == "delete":
            return permission.delete
        else:
            return False
    
    def get_users_with_access(self, resource_id: str, action: str) -> List[str]:
        """获取对资源有特定操作权限的用户列表"""
        users = []
        
        # 检查资源的权限
        from app.managers.resource_manager import resource_manager
        resource = resource_manager.get(resource_id)
        if resource:
            for user, permission in resource.permissions.items():
                if user != "default" and self.check_permission(resource_id, user, action):
                    users.append(user)
        
        return users

6. 资源状态管理

6.1 状态定义

资源状态

  1. ACTIVE:资源可用
  2. INACTIVE:资源不可用
  3. CREATING:资源正在创建
  4. DELETING:资源正在删除
  5. ERROR:资源出错
  6. MAINTENANCE:资源维护中
  7. UPDATING:资源正在更新

6.2 状态管理实现

状态管理

python
from typing import Dict, Any, Optional
from app.models.resource import ResourceStatus

class ResourceStateManager:
    """资源状态管理器"""
    def __init__(self):
        self.state_history: Dict[str, List[Dict[str, Any]]] = {}
    
    def update_state(self, resource_id: str, status: ResourceStatus, reason: str = ""):
        """更新资源状态"""
        # 更新资源状态
        from app.managers.resource_manager import resource_manager
        resource = resource_manager.update_status(resource_id, status)
        
        if resource:
            # 记录状态历史
            if resource_id not in self.state_history:
                self.state_history[resource_id] = []
            
            import datetime
            state_record = {
                "status": status.value,
                "timestamp": datetime.utcnow().isoformat(),
                "reason": reason
            }
            
            self.state_history[resource_id].append(state_record)
            
            # 限制历史记录数量
            if len(self.state_history[resource_id]) > 100:
                self.state_history[resource_id] = self.state_history[resource_id][-100:]
            
            return True
        
        return False
    
    def get_state(self, resource_id: str) -> Optional[ResourceStatus]:
        """获取资源当前状态"""
        from app.managers.resource_manager import resource_manager
        resource = resource_manager.get(resource_id)
        if resource:
            return resource.status
        return None
    
    def get_state_history(self, resource_id: str) -> List[Dict[str, Any]]:
        """获取资源状态历史"""
        return self.state_history.get(resource_id, [])
    
    def get_resources_by_state(self, status: ResourceStatus) -> List[str]:
        """获取特定状态的资源列表"""
        from app.managers.resource_manager import resource_manager
        resources = resource_manager.list()
        matching_resources = []
        
        for resource in resources:
            if resource.status == status:
                matching_resources.append(resource.id)
        
        return matching_resources
    
    def validate_state_transition(self, current_status: ResourceStatus, new_status: ResourceStatus) -> bool:
        """验证状态转换是否合法"""
        # 定义合法的状态转换
        valid_transitions = {
            ResourceStatus.ACTIVE: [ResourceStatus.INACTIVE, ResourceStatus.UPDATING, ResourceStatus.MAINTENANCE, ResourceStatus.ERROR],
            ResourceStatus.INACTIVE: [ResourceStatus.ACTIVE, ResourceStatus.DELETING],
            ResourceStatus.CREATING: [ResourceStatus.ACTIVE, ResourceStatus.ERROR],
            ResourceStatus.DELETING: [],
            ResourceStatus.ERROR: [ResourceStatus.ACTIVE, ResourceStatus.INACTIVE, ResourceStatus.DELETING],
            ResourceStatus.MAINTENANCE: [ResourceStatus.ACTIVE, ResourceStatus.ERROR],
            ResourceStatus.UPDATING: [ResourceStatus.ACTIVE, ResourceStatus.ERROR]
        }
        
        return new_status in valid_transitions.get(current_status, [])

7. 资源版本控制

7.1 版本管理

版本控制的重要性

  1. 回滚能力:在出错时回滚到之前的版本
  2. 变更追踪:追踪资源的变更历史
  3. 审计日志:记录谁在何时修改了资源
  4. 冲突解决:解决并发修改的冲突

7.2 版本控制实现

版本控制

python
from typing import Dict, List, Any, Optional

class ResourceVersionManager:
    """资源版本管理器"""
    def __init__(self):
        self.versions: Dict[str, List[Dict[str, Any]]] = {}
    
    def create_version(self, resource_id: str, content: Dict[str, Any], author: str, reason: str):
        """创建资源版本"""
        if resource_id not in self.versions:
            self.versions[resource_id] = []
        
        import datetime
        version = {
            "version": len(self.versions[resource_id]) + 1,
            "content": content,
            "author": author,
            "timestamp": datetime.utcnow().isoformat(),
            "reason": reason
        }
        
        self.versions[resource_id].append(version)
        
        # 限制版本数量
        if len(self.versions[resource_id]) > 50:
            self.versions[resource_id] = self.versions[resource_id][-50:]
        
        return version
    
    def get_version(self, resource_id: str, version: int) -> Optional[Dict[str, Any]]:
        """获取特定版本"""
        if resource_id in self.versions:
            for v in self.versions[resource_id]:
                if v["version"] == version:
                    return v
        return None
    
    def get_latest_version(self, resource_id: str) -> Optional[Dict[str, Any]]:
        """获取最新版本"""
        if resource_id in self.versions and self.versions[resource_id]:
            return self.versions[resource_id][-1]
        return None
    
    def list_versions(self, resource_id: str) -> List[Dict[str, Any]]:
        """列出所有版本"""
        return self.versions.get(resource_id, [])
    
    def rollback(self, resource_id: str, version: int) -> Optional[Dict[str, Any]]:
        """回滚到特定版本"""
        target_version = self.get_version(resource_id, version)
        if not target_version:
            return None
        
        # 从目标版本创建新版本
        new_version = self.create_version(
            resource_id=resource_id,
            content=target_version["content"],
            author="system",
            reason=f"Rollback to version {version}"
        )
        
        # 更新资源
        from app.managers.resource_manager import resource_manager
        from app.models.resource import Resource
        
        resource_data = target_version["content"]
        resource = Resource(**resource_data)
        resource.id = resource_id
        resource.version = new_version["version"]
        
        resource_manager.update(resource_id, resource)
        
        return new_version
    
    def compare_versions(self, resource_id: str, version1: int, version2: int) -> Dict[str, Any]:
        """比较两个版本的差异"""
        v1 = self.get_version(resource_id, version1)
        v2 = self.get_version(resource_id, version2)
        
        if not v1 or not v2:
            return {"error": "版本不存在"}
        
        # 简单的差异比较
        diff = {}
        for key, value in v2["content"].items():
            if key not in v1["content"] or v1["content"][key] != value:
                diff[key] = {
                    "old": v1["content"].get(key),
                    "new": value
                }
        
        return {
            "version1": version1,
            "version2": version2,
            "changes": diff
        }

8. 资源监控与审计

8.1 监控指标

资源监控指标

  1. 使用情况:资源的使用百分比
  2. 性能指标:响应时间、吞吐量
  3. 错误率:错误发生的频率
  4. 访问频率:资源被访问的次数
  5. 状态变化:状态转换的频率
  6. 安全事件:安全相关的事件

8.2 监控实现

资源监控

python
from typing import Dict, List, Any
import time

class ResourceMonitor:
    """资源监控器"""
    def __init__(self):
        self.metrics: Dict[str, List[Dict[str, Any]]] = {}
        self.access_logs: Dict[str, List[Dict[str, Any]]] = {}
    
    def record_metric(self, resource_id: str, metric_name: str, value: Any):
        """记录资源指标"""
        if resource_id not in self.metrics:
            self.metrics[resource_id] = []
        
        metric = {
            "timestamp": time.time(),
            "metric": metric_name,
            "value": value
        }
        
        self.metrics[resource_id].append(metric)
        
        # 限制指标数量
        if len(self.metrics[resource_id]) > 1000:
            self.metrics[resource_id] = self.metrics[resource_id][-1000:]
    
    def record_access(self, resource_id: str, user: str, action: str, success: bool, duration: float):
        """记录资源访问"""
        if resource_id not in self.access_logs:
            self.access_logs[resource_id] = []
        
        access_log = {
            "timestamp": time.time(),
            "user": user,
            "action": action,
            "success": success,
            "duration": duration
        }
        
        self.access_logs[resource_id].append(access_log)
        
        # 限制日志数量
        if len(self.access_logs[resource_id]) > 1000:
            self.access_logs[resource_id] = self.access_logs[resource_id][-1000:]
    
    def get_metrics(self, resource_id: str, metric_name: str = None, time_range: float = 3600):
        """获取资源指标"""
        if resource_id not in self.metrics:
            return []
        
        now = time.time()
        filtered_metrics = []
        
        for metric in self.metrics[resource_id]:
            if (now - metric["timestamp"]) <= time_range:
                if not metric_name or metric["metric"] == metric_name:
                    filtered_metrics.append(metric)
        
        return filtered_metrics
    
    def get_access_logs(self, resource_id: str, user: str = None, action: str = None, time_range: float = 3600):
        """获取资源访问日志"""
        if resource_id not in self.access_logs:
            return []
        
        now = time.time()
        filtered_logs = []
        
        for log in self.access_logs[resource_id]:
            if (now - log["timestamp"]) <= time_range:
                if (not user or log["user"] == user) and (not action or log["action"] == action):
                    filtered_logs.append(log)
        
        return filtered_logs
    
    def get_statistics(self, resource_id: str, time_range: float = 3600):
        """获取资源统计信息"""
        access_logs = self.get_access_logs(resource_id, time_range=time_range)
        
        if not access_logs:
            return {
                "total_accesses": 0,
                "successful_accesses": 0,
                "failed_accesses": 0,
                "average_duration": 0,
                "most_common_action": None,
                "most_active_user": None
            }
        
        total_accesses = len(access_logs)
        successful_accesses = sum(1 for log in access_logs if log["success"])
        failed_accesses = total_accesses - successful_accesses
        average_duration = sum(log["duration"] for log in access_logs) / total_accesses
        
        # 最常见的操作
        action_counts = {}
        for log in access_logs:
            action_counts[log["action"]] = action_counts.get(log["action"], 0) + 1
        most_common_action = max(action_counts, key=action_counts.get) if action_counts else None
        
        # 最活跃的用户
        user_counts = {}
        for log in access_logs:
            user_counts[log["user"]] = user_counts.get(log["user"], 0) + 1
        most_active_user = max(user_counts, key=user_counts.get) if user_counts else None
        
        return {
            "total_accesses": total_accesses,
            "successful_accesses": successful_accesses,
            "failed_accesses": failed_accesses,
            "average_duration": average_duration,
            "most_common_action": most_common_action,
            "most_active_user": most_active_user
        }

9. 实战:实现资源管理系统

9.1 系统架构

资源管理系统架构

┌────────────────┐     ┌────────────────┐     ┌────────────────┐
│   MCP Server   │────>│   Resource     │────>│   Storage      │
│   (FastAPI)    │<────│   Manager      │<────│   (Database)   │
└────────────────┘     └────────────────┘     └────────────────┘
        │                      │                      │
        ▼                      ▼                      ▼
┌────────────────┐     ┌────────────────┐     ┌────────────────┐
│   Access       │     │   Version      │     │   Monitor      │
│   Controller   │     │   Manager      │     │   System       │
└────────────────┘     └────────────────┘     └────────────────┘

9.2 完整实现

资源管理系统实现

python
# main.py
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import json
from typing import Dict, Any, List, Optional

from app.managers.resource_manager import ResourceManager
from app.models.resource import ResourceCreate, ResourceUpdate, ResourceType, ResourceStatus
from app.security.rbac import RBACManager, User, Role
from app.security.permission import PermissionManager
from app.monitoring.resource_monitor import ResourceMonitor
from app.api.dependencies import get_current_user, get_api_key

app = FastAPI(title="MCP Resource Management", version="1.0.0")

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 初始化管理器
resource_manager = ResourceManager()
rbac_manager = RBACManager()
permission_manager = PermissionManager()
resource_monitor = ResourceMonitor()

# 初始化默认角色和用户
def init_defaults():
    """初始化默认角色和用户"""
    # 创建默认角色
    admin_role = rbac_manager.create_role("admin", "Administrator")
    user_role = rbac_manager.create_role("user", "Regular User")
    
    # 添加默认权限
    admin_role.add_permission("file:read")
    admin_role.add_permission("file:write")
    admin_role.add_permission("file:execute")
    admin_role.add_permission("file:delete")
    
    user_role.add_permission("file:read")
    user_role.add_permission("file:execute")
    
    # 创建默认用户
    admin_user = rbac_manager.create_user("admin")
    test_user = rbac_manager.create_user("test")
    
    # 分配角色
    rbac_manager.assign_role("admin", "admin")
    rbac_manager.assign_role("test", "user")

# 初始化
init_defaults()

# 资源管理端点

@app.post("/resources", response_model=Dict[str, Any])
async def create_resource(
    resource_data: ResourceCreate,
    current_user: str = Depends(get_current_user),
    api_key: str = Depends(get_api_key)
):
    """创建资源"""
    start_time = time.time()
    
    try:
        resource = resource_manager.create(resource_data)
        duration = time.time() - start_time
        
        # 记录访问
        resource_monitor.record_access(
            resource_id=resource.id,
            user=current_user,
            action="create",
            success=True,
            duration=duration
        )
        
        return resource.dict()
    except Exception as e:
        duration = time.time() - start_time
        resource_monitor.record_access(
            resource_id="",
            user=current_user,
            action="create",
            success=False,
            duration=duration
        )
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/resources", response_model=List[Dict[str, Any]])
async def list_resources(
    resource_type: Optional[str] = None,
    current_user: str = Depends(get_current_user),
    api_key: str = Depends(get_api_key)
):
    """列出资源"""
    start_time = time.time()
    
    try:
        resources = resource_manager.list(resource_type)
        duration = time.time() - start_time
        
        # 记录访问
        resource_monitor.record_access(
            resource_id="",
            user=current_user,
            action="list",
            success=True,
            duration=duration
        )
        
        return [r.dict() for r in resources]
    except Exception as e:
        duration = time.time() - start_time
        resource_monitor.record_access(
            resource_id="",
            user=current_user,
            action="list",
            success=False,
            duration=duration
        )
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/resources/{resource_id}", response_model=Dict[str, Any])
async def get_resource(
    resource_id: str,
    current_user: str = Depends(get_current_user),
    api_key: str = Depends(get_api_key)
):
    """获取资源"""
    start_time = time.time()
    
    try:
        resource = resource_manager.get(resource_id)
        if not resource:
            raise HTTPException(status_code=404, detail="Resource not found")
        
        duration = time.time() - start_time
        
        # 记录访问
        resource_monitor.record_access(
            resource_id=resource_id,
            user=current_user,
            action="read",
            success=True,
            duration=duration
        )
        
        return resource.dict()
    except HTTPException:
        raise
    except Exception as e:
        duration = time.time() - start_time
        resource_monitor.record_access(
            resource_id=resource_id,
            user=current_user,
            action="read",
            success=False,
            duration=duration
        )
        raise HTTPException(status_code=400, detail=str(e))

@app.put("/resources/{resource_id}", response_model=Dict[str, Any])
async def update_resource(
    resource_id: str,
    resource_data: ResourceUpdate,
    current_user: str = Depends(get_current_user),
    api_key: str = Depends(get_api_key)
):
    """更新资源"""
    start_time = time.time()
    
    try:
        resource = resource_manager.update(resource_id, resource_data)
        if not resource:
            raise HTTPException(status_code=404, detail="Resource not found")
        
        duration = time.time() - start_time
        
        # 记录访问
        resource_monitor.record_access(
            resource_id=resource_id,
            user=current_user,
            action="update",
            success=True,
            duration=duration
        )
        
        return resource.dict()
    except HTTPException:
        raise
    except Exception as e:
        duration = time.time() - start_time
        resource_monitor.record_access(
            resource_id=resource_id,
            user=current_user,
            action="update",
            success=False,
            duration=duration
        )
        raise HTTPException(status_code=400, detail=str(e))

@app.delete("/resources/{resource_id}", response_model=Dict[str, bool])
async def delete_resource(
    resource_id: str,
    current_user: str = Depends(get_current_user),
    api_key: str = Depends(get_api_key)
):
    """删除资源"""
    start_time = time.time()
    
    try:
        success = resource_manager.delete(resource_id)
        duration = time.time() - start_time
        
        # 记录访问
        resource_monitor.record_access(
            resource_id=resource_id,
            user=current_user,
            action="delete",
            success=success,
            duration=duration
        )
        
        if not success:
            raise HTTPException(status_code=404, detail="Resource not found")
        
        return {"success": True}
    except HTTPException:
        raise
    except Exception as e:
        duration = time.time() - start_time
        resource_monitor.record_access(
            resource_id=resource_id,
            user=current_user,
            action="delete",
            success=False,
            duration=duration
        )
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/resources/{resource_id}/versions", response_model=List[Dict[str, Any]])
async def list_resource_versions(
    resource_id: str,
    current_user: str = Depends(get_current_user),
    api_key: str = Depends(get_api_key)
):
    """列出资源版本"""
    try:
        versions = resource_manager.get_versions(resource_id)
        return versions
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/resources/{resource_id}/metrics", response_model=Dict[str, Any])
async def get_resource_metrics(
    resource_id: str,
    metric_name: Optional[str] = None,
    time_range: float = 3600,
    current_user: str = Depends(get_current_user),
    api_key: str = Depends(get_api_key)
):
    """获取资源指标"""
    try:
        metrics = resource_monitor.get_metrics(resource_id, metric_name, time_range)
        statistics = resource_monitor.get_statistics(resource_id, time_range)
        
        return {
            "metrics": metrics,
            "statistics": statistics
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    import time
    uvicorn.run(app, host="0.0.0.0", port=8000)

10. 技术选型建议

10.1 存储方案

资源存储方案

方案优势劣势适用场景
内存存储性能高,易于实现数据易丢失,容量有限开发测试,临时存储
文件系统持久化,容量大性能一般,管理复杂中小规模应用
关系型数据库结构化存储,事务支持性能开销大企业级应用,需要事务
NoSQL数据库灵活存储,性能高缺少事务支持大规模应用,半结构化数据
云存储弹性伸缩,高可用成本较高,依赖网络云原生应用,大规模存储

10.2 安全方案

安全方案

方案适用场景实现难度
API Key简单认证
JWT无状态认证
OAuth2第三方认证
RBAC基于角色的权限管理
ABAC基于属性的权限管理
多因素认证高安全性场景

11. 常见问题与解决方案

11.1 资源冲突

问题:多个用户同时访问或修改同一资源,导致冲突。

解决方案

  • 实现资源锁定机制
  • 使用乐观锁或悲观锁
  • 实现冲突检测和解决机制
  • 提供资源版本控制和回滚能力

11.2 权限管理复杂

问题:权限管理过于复杂,难以维护。

解决方案

  • 使用RBAC简化权限管理
  • 建立清晰的权限层级
  • 自动化权限分配
  • 定期权限审计

11.3 资源泄露

问题:资源创建后未正确清理,导致资源泄露。

解决方案

  • 实现资源生命周期管理
  • 设置资源自动清理机制
  • 定期资源审计
  • 使用容器化隔离资源

11.4 性能问题

问题:资源管理系统性能下降。

解决方案

  • 优化存储访问
  • 实现缓存机制
  • 使用异步处理
  • 水平扩展系统

12. 学习资源

12.1 官方文档

12.2 在线资源

13. 总结

本课程深入介绍了MCP资源管理的核心内容,包括资源定义、资源类型、访问控制、权限管理、状态管理、版本控制和监控审计等方面。通过本课程的学习,你应该能够:

  • 理解MCP资源的概念和重要性
  • 设计和实现资源管理系统
  • 实现资源的访问控制和权限管理
  • 管理资源的状态和版本
  • 监控和审计资源的使用
  • 解决资源管理中的常见问题

在后续课程中,我们将学习MCP Client开发、Claude Desktop集成和MCP生态系统等内容,进一步完善你的MCP开发技能。


课后作业

  1. 实践题

    • 实现文件资源管理系统
    • 实现数据库资源管理系统
    • 实现资源访问控制和权限管理
    • 实现资源监控和审计系统
  2. 思考题

    • 如何设计一个可扩展的资源管理系统?
    • 如何处理大规模资源的管理?
    • 如何实现资源的自动发现和注册?
    • 如何优化资源管理系统的性能?

架构师AI杜公众号二维码

扫描二维码关注"架构师AI杜"公众号,获取更多技术内容和最新动态