从零构建Agent框架系列二:Runtime模块设计与Workflow执行引擎实现
在构建Agent框架的过程中,Runtime模块作为Workflow/Agent的执行引擎,承担着状态管理、任务调度和插件调用的核心职责。本文将深入探讨如何设计一个可扩展、高可靠的Runtime模块,使Workflow能够按预期执行,同时支持动态工具调用和复杂逻辑编排。
一、Runtime模块的核心功能定位
Runtime模块是Agent框架的”操作系统”,它需要解决三个核心问题:
- 状态管理:跟踪Workflow的执行状态(初始化/运行中/暂停/完成/失败)
- 上下文传递:维护跨步骤的数据共享和状态持久化
- 插件调度:动态加载和执行各类工具(API调用、数据库查询、LLM推理等)
不同于传统的工作流引擎,Agent Runtime需要特别处理异步操作和LLM驱动的决策逻辑。例如,当Workflow执行到需要调用外部API的步骤时,Runtime不仅要管理API调用的生命周期,还需要处理可能的重试机制和结果解析。
二、基于状态机的执行引擎设计
1. 状态模型设计
推荐采用有限状态机(FSM)模型管理Workflow生命周期:
stateDiagram-v2[*] --> InitializedInitialized --> Running: start()Running --> Paused: wait_for_user_input()Running --> Completed: finish()Running --> Failed: throw_error()Paused --> Running: resume()
每个状态转换都应触发相应的生命周期钩子:
class WorkflowStateMachine:def __init__(self):self.state = "INITIALIZED"self.hooks = {"INITIALIZED": [],"RUNNING": [self._validate_context],"COMPLETED": [self._save_execution_log],"FAILED": [self._notify_failure]}def transition(self, new_state):if new_state not in self.hooks:raise ValueError(f"Invalid state transition to {new_state}")# 执行当前状态的退出钩子for hook in self._get_exit_hooks(self.state):hook()# 状态转换self.state = new_state# 执行新状态的进入钩子for hook in self.hooks.get(new_state, []):hook()
2. 执行上下文管理
设计多层次的上下文结构:
class ExecutionContext:def __init__(self, workflow_id):self.workflow_id = workflow_idself.global_vars = {} # 全局变量self.step_vars = {} # 步骤级变量self.tools_cache = {} # 工具调用缓存self.metadata = {"start_time": datetime.now(),"user_id": None}def update_global(self, key, value):self.global_vars[key] = valuedef get_step_var(self, step_name, key):return self.step_vars.get(step_name, {}).get(key)
三、插件化工具调度系统
1. 工具注册与发现机制
实现基于接口的工具注册系统:
from abc import ABC, abstractmethodclass ToolBase(ABC):@property@abstractmethoddef name(self):pass@abstractmethoddef execute(self, context: ExecutionContext, **kwargs):passclass ToolRegistry:def __init__(self):self._tools = {}def register(self, tool_class):tool_instance = tool_class()self._tools[tool_instance.name] = tool_instancedef get_tool(self, name):return self._tools.get(name)
2. 异步工具执行模式
对于耗时操作,推荐采用异步执行模式:
import asynciofrom concurrent.futures import ThreadPoolExecutorclass AsyncToolExecutor:def __init__(self, max_workers=5):self.executor = ThreadPoolExecutor(max_workers=max_workers)async def execute_async(self, tool: ToolBase, context, **kwargs):loop = asyncio.get_running_loop()def _sync_execute():try:return tool.execute(context, **kwargs)except Exception as e:return {"error": str(e)}result = await loop.run_in_executor(self.executor, _sync_execute)return result
四、Workflow执行流程实现
1. 执行流程控制
设计完整的执行循环:
class WorkflowRunner:def __init__(self, workflow_def, registry: ToolRegistry):self.workflow = workflow_defself.registry = registryself.context = ExecutionContext(workflow_def.id)async def run(self):try:for step in self.workflow.steps:await self._execute_step(step)if self.context.state == "PAUSED":return # 等待外部事件触发恢复self.context.transition("COMPLETED")except Exception as e:self.context.transition("FAILED")raiseasync def _execute_step(self, step):tool = self.registry.get_tool(step.tool_name)if not tool:raise ValueError(f"Tool {step.tool_name} not found")# 准备工具参数params = self._prepare_params(step.params)# 执行工具if step.async_mode:result = await self._async_execute(tool, params)else:result = tool.execute(self.context, **params)# 处理结果self._handle_result(step, result)
2. 错误处理与恢复机制
实现健壮的错误处理:
class ErrorHandlingStrategy(ABC):@abstractmethoddef handle(self, error, context):passclass RetryStrategy(ErrorHandlingStrategy):def __init__(self, max_retries=3):self.max_retries = max_retriesdef handle(self, error, context):retry_count = context.metadata.get("retry_count", 0)if retry_count < self.max_retries:context.metadata["retry_count"] = retry_count + 1return "RETRY"return "FAIL"class WorkflowErrorHandler:def __init__(self):self.strategies = {"NETWORK_ERROR": RetryStrategy(max_retries=5),"TIMEOUT": RetryStrategy(max_retries=2),"DEFAULT": RetryStrategy(max_retries=1)}def handle_error(self, error, context):error_type = self._classify_error(error)strategy = self.strategies.get(error_type, self.strategies["DEFAULT"])action = strategy.handle(error, context)if action == "RETRY":return True # 继续执行elif action == "FAIL":context.transition("FAILED")return False
五、性能优化与扩展性设计
1. 执行日志与追踪
实现结构化日志记录:
class ExecutionLogger:def __init__(self, log_path):self.log_path = log_pathself.session_logs = []def log_step(self, step_name, status, duration, input=None, output=None):log_entry = {"timestamp": datetime.now().isoformat(),"step": step_name,"status": status,"duration_ms": duration,"input": input,"output": output}self.session_logs.append(log_entry)def save(self):with open(self.log_path, "w") as f:json.dump(self.session_logs, f, indent=2)
2. 动态工具加载
支持运行时工具热加载:
class DynamicToolLoader:def __init__(self, plugin_dirs):self.plugin_dirs = plugin_dirsself.loaded_modules = {}def load_tools(self):for dir_path in self.plugin_dirs:for file_name in os.listdir(dir_path):if file_name.endswith(".py") and not file_name.startswith("_"):module_name = file_name[:-3]module_path = os.path.join(dir_path, file_name)spec = importlib.util.spec_from_file_location(module_name, module_path)module = importlib.util.module_from_spec(spec)spec.loader.exec_module(module)# 查找并注册所有Tool子类for attr_name in dir(module):attr = getattr(module, attr_name)if isinstance(attr, type) and issubclass(attr, ToolBase) and attr != ToolBase:self.loaded_modules[attr_name] = attr
六、最佳实践与反模式
1. 推荐实践
- 上下文隔离:每个Workflow实例应使用独立的ExecutionContext
- 工具幂等性:设计工具时确保可安全重试
- 渐进式状态保存:定期将上下文持久化到存储
- 超时控制:为每个工具调用设置合理的超时时间
2. 常见反模式
- 全局状态污染:避免在多个Workflow间共享可变状态
- 同步阻塞:长时间运行的操作应使用异步模式
- 过度设计:初期避免实现复杂的分支逻辑,保持简单
- 忽略错误分类:不同错误类型应采用不同处理策略
七、完整示例架构
agent_framework/├── runtime/│ ├── __init__.py│ ├── context.py # ExecutionContext实现│ ├── state_machine.py # 状态机核心逻辑│ ├── tool_registry.py # 工具注册与发现│ ├── workflow_runner.py # 主执行引擎│ └── error_handler.py # 错误处理策略├── tools/│ ├── __init__.py│ ├── base.py # ToolBase定义│ └── plugins/ # 动态加载的工具└── examples/└── basic_workflow.py # 使用示例
通过上述设计,我们构建了一个灵活、可扩展的Runtime模块,能够支持从简单任务到复杂Agent行为的执行需求。实际开发中,建议从最小可行实现开始,逐步添加错误处理、持久化等高级功能。下一篇文章将深入探讨如何设计Workflow定义语言和可视化编排界面。