MCP工具调用实战:Python构建大模型Agent的跨Server交互方案

MCP工具调用实战:Python构建大模型Agent的跨Server交互方案

一、MCP协议与Agent交互的技术背景

Model Context Protocol(MCP)作为大模型工具调用的标准化协议,通过定义工具描述、请求响应等数据结构,实现了Agent与工具服务的高效解耦。相较于传统API调用,MCP协议具有三大优势:

  1. 协议无关性:支持HTTP、WebSocket、gRPC等多种传输层
  2. 动态工具发现:通过/tools端点实时获取可用工具列表
  3. 上下文感知:在请求中携带模型推理所需的上下文信息

在实际生产环境中,Agent需要同时对接多种MCP Server实现,包括但不限于:

  • 本地部署的轻量级工具服务
  • 容器化部署的微服务集群
  • 行业常见技术方案提供的SaaS工具
  • 边缘设备上的嵌入式工具

二、Python实现MCP交互的核心架构

1. 协议层抽象设计

  1. from abc import ABC, abstractmethod
  2. import aiohttp
  3. class MCPServerAdapter(ABC):
  4. @abstractmethod
  5. async def list_tools(self) -> list[dict]:
  6. """获取工具描述列表"""
  7. pass
  8. @abstractmethod
  9. async def invoke_tool(self, tool_id: str, params: dict) -> dict:
  10. """调用指定工具"""
  11. pass
  12. class HTTPMCPAdapter(MCPServerAdapter):
  13. def __init__(self, base_url: str):
  14. self.base_url = base_url.rstrip('/')
  15. async def list_tools(self):
  16. async with aiohttp.ClientSession() as session:
  17. async with session.get(f"{self.base_url}/tools") as resp:
  18. return await resp.json()
  19. async def invoke_tool(self, tool_id, params):
  20. async with aiohttp.ClientSession() as session:
  21. async with session.post(
  22. f"{self.base_url}/tools/{tool_id}",
  23. json=params
  24. ) as resp:
  25. return await resp.json()

2. 动态路由引擎实现

  1. class ToolRouter:
  2. def __init__(self):
  3. self.servers = {}
  4. def register_server(self, server_type: str, adapter: MCPServerAdapter):
  5. self.servers[server_type] = adapter
  6. async def get_tool_adapter(self, tool_id: str) -> MCPServerAdapter:
  7. # 实际实现应包含工具描述缓存和路由策略
  8. for server in self.servers.values():
  9. tools = await server.list_tools()
  10. for tool in tools:
  11. if tool['id'] == tool_id:
  12. return server
  13. raise ValueError(f"Tool {tool_id} not found")

三、全类型Server支持的实战方案

1. HTTP Server对接

关键配置参数

  1. http_config = {
  2. "server_type": "http",
  3. "base_url": "http://tool-server:8080",
  4. "timeout": 30, # 秒
  5. "retry_policy": {
  6. "max_retries": 3,
  7. "backoff_factor": 0.5
  8. }
  9. }

实现要点

  • 使用aiohttp实现异步HTTP客户端
  • 添加请求重试机制(建议指数退避)
  • 实现JSON Schema验证确保参数合法性

2. WebSocket Server对接

  1. class WebSocketMCPAdapter(MCPServerAdapter):
  2. async def connect(self):
  3. self.ws = await aiohttp.ws_connect(f"{self.base_url}/ws")
  4. async def list_tools(self):
  5. await self.ws.send_json({"type": "list_tools"})
  6. msg = await self.ws.receive_json()
  7. return msg["tools"]
  8. async def invoke_tool(self, tool_id, params):
  9. request = {
  10. "type": "invoke",
  11. "tool_id": tool_id,
  12. "params": params
  13. }
  14. await self.ws.send_json(request)
  15. response = await self.ws.receive_json()
  16. return response["result"]

优化建议

  • 实现心跳机制保持长连接
  • 添加消息队列缓冲突发请求
  • 考虑使用Protocol Buffers替代JSON

3. 边缘设备对接方案

针对资源受限的边缘设备,建议:

  1. 协议轻量化:使用MCP的简化子集
  2. 数据压缩:采用MessagePack替代JSON
  3. 离线缓存:实现本地工具描述缓存
  1. class EdgeMCPAdapter(MCPServerAdapter):
  2. def __init__(self, device_id: str):
  3. self.device_id = device_id
  4. self.tool_cache = {}
  5. async def list_tools(self):
  6. if not self.tool_cache:
  7. # 模拟从边缘设备获取
  8. self.tool_cache = {
  9. "edge_calculator": {
  10. "id": "edge_calculator",
  11. "description": "Edge math operations",
  12. "parameters": {
  13. "type": "object",
  14. "properties": {
  15. "expr": {"type": "string"}
  16. }
  17. }
  18. }
  19. }
  20. return list(self.tool_cache.values())

四、性能优化最佳实践

1. 连接池管理

  1. from aiohttp import TCPConnector
  2. class MCPClient:
  3. def __init__(self):
  4. self.connector = TCPConnector(limit=100) # 限制最大连接数
  5. self.session = aiohttp.ClientSession(connector=self.connector)
  6. async def close(self):
  7. await self.session.close()

2. 批量调用优化

  1. async def batch_invoke(self, requests: list[dict]) -> list[dict]:
  2. # 实现批量请求合并逻辑
  3. # 1. 按Server类型分组
  4. grouped = {}
  5. for req in requests:
  6. tool_id = req["tool_id"]
  7. # 实际应通过ToolRouter获取Server
  8. server_type = "http" # 简化示例
  9. if server_type not in grouped:
  10. grouped[server_type] = []
  11. grouped[server_type].append(req)
  12. # 2. 并行调用各Server
  13. tasks = []
  14. for server_type, reqs in grouped.items():
  15. adapter = self.get_adapter(server_type) # 需实现
  16. tasks.append(self._process_batch(adapter, reqs))
  17. return await asyncio.gather(*tasks)

3. 监控指标集成

建议收集以下关键指标:

  1. from prometheus_client import Counter, Histogram
  2. TOOL_CALLS = Counter(
  3. 'mcp_tool_calls_total',
  4. 'Total number of tool invocations',
  5. ['tool_id', 'server_type', 'status']
  6. )
  7. TOOL_LATENCY = Histogram(
  8. 'mcp_tool_latency_seconds',
  9. 'Tool invocation latency',
  10. ['tool_id']
  11. )

五、安全与可靠性设计

1. 认证授权方案

  • JWT验证:在HTTP头中携带Authorization: Bearer <token>
  • API Key:通过查询参数或自定义头传递
  • mTLS:对高安全要求的Server实施双向认证

2. 熔断机制实现

  1. from pybreaker import CircuitBreaker
  2. class SafeMCPAdapter:
  3. def __init__(self, adapter: MCPServerAdapter):
  4. self.adapter = adapter
  5. self.cb = CircuitBreaker(
  6. fail_max=5,
  7. reset_timeout=30,
  8. state_store=MemoryStateStore()
  9. )
  10. async def invoke_tool(self, tool_id, params):
  11. try:
  12. return await self.cb.call(
  13. self.adapter.invoke_tool,
  14. tool_id,
  15. params
  16. )
  17. except pybreaker.CircuitBreakerError:
  18. raise ServiceUnavailable("MCP Server unavailable")

六、完整交互流程示例

  1. async def main():
  2. # 1. 初始化路由和适配器
  3. router = ToolRouter()
  4. router.register_server("http", HTTPMCPAdapter("http://server1"))
  5. router.register_server("ws", WebSocketMCPAdapter("ws://server2"))
  6. # 2. 创建Agent实例
  7. agent = MCPAgent(router)
  8. # 3. 处理用户请求
  9. user_request = {
  10. "query": "计算1+1",
  11. "context": {"user_id": "12345"}
  12. }
  13. # 4. 动态路由到合适工具
  14. tool_result = await agent.process(user_request)
  15. print(f"Tool execution result: {tool_result}")
  16. if __name__ == "__main__":
  17. asyncio.run(main())

七、部署与运维建议

  1. 服务发现:集成Consul/Eureka实现动态Server注册
  2. 日志收集:结构化记录工具调用全链路日志
  3. 灰度发布:通过路由权重实现新工具的渐进式上线
  4. 容量规划:基于历史调用数据预测资源需求

通过上述架构设计,开发者可以构建出既能对接行业常见技术方案,又能兼容定制化部署的MCP交互系统。实际生产环境中,建议结合具体业务场景进行参数调优和功能扩展,例如添加请求追踪、实现更复杂的路由策略等。