从零构建Agent框架系列二:Runtime模块设计与Workflow执行引擎实现

从零构建Agent框架系列二:Runtime模块设计与Workflow执行引擎实现

在构建Agent框架的过程中,Runtime模块作为Workflow/Agent的执行引擎,承担着状态管理、任务调度和插件调用的核心职责。本文将深入探讨如何设计一个可扩展、高可靠的Runtime模块,使Workflow能够按预期执行,同时支持动态工具调用和复杂逻辑编排。

一、Runtime模块的核心功能定位

Runtime模块是Agent框架的”操作系统”,它需要解决三个核心问题:

  1. 状态管理:跟踪Workflow的执行状态(初始化/运行中/暂停/完成/失败)
  2. 上下文传递:维护跨步骤的数据共享和状态持久化
  3. 插件调度:动态加载和执行各类工具(API调用、数据库查询、LLM推理等)

不同于传统的工作流引擎,Agent Runtime需要特别处理异步操作和LLM驱动的决策逻辑。例如,当Workflow执行到需要调用外部API的步骤时,Runtime不仅要管理API调用的生命周期,还需要处理可能的重试机制和结果解析。

二、基于状态机的执行引擎设计

1. 状态模型设计

推荐采用有限状态机(FSM)模型管理Workflow生命周期:

  1. stateDiagram-v2
  2. [*] --> Initialized
  3. Initialized --> Running: start()
  4. Running --> Paused: wait_for_user_input()
  5. Running --> Completed: finish()
  6. Running --> Failed: throw_error()
  7. Paused --> Running: resume()

每个状态转换都应触发相应的生命周期钩子:

  1. class WorkflowStateMachine:
  2. def __init__(self):
  3. self.state = "INITIALIZED"
  4. self.hooks = {
  5. "INITIALIZED": [],
  6. "RUNNING": [self._validate_context],
  7. "COMPLETED": [self._save_execution_log],
  8. "FAILED": [self._notify_failure]
  9. }
  10. def transition(self, new_state):
  11. if new_state not in self.hooks:
  12. raise ValueError(f"Invalid state transition to {new_state}")
  13. # 执行当前状态的退出钩子
  14. for hook in self._get_exit_hooks(self.state):
  15. hook()
  16. # 状态转换
  17. self.state = new_state
  18. # 执行新状态的进入钩子
  19. for hook in self.hooks.get(new_state, []):
  20. hook()

2. 执行上下文管理

设计多层次的上下文结构:

  1. class ExecutionContext:
  2. def __init__(self, workflow_id):
  3. self.workflow_id = workflow_id
  4. self.global_vars = {} # 全局变量
  5. self.step_vars = {} # 步骤级变量
  6. self.tools_cache = {} # 工具调用缓存
  7. self.metadata = {
  8. "start_time": datetime.now(),
  9. "user_id": None
  10. }
  11. def update_global(self, key, value):
  12. self.global_vars[key] = value
  13. def get_step_var(self, step_name, key):
  14. return self.step_vars.get(step_name, {}).get(key)

三、插件化工具调度系统

1. 工具注册与发现机制

实现基于接口的工具注册系统:

  1. from abc import ABC, abstractmethod
  2. class ToolBase(ABC):
  3. @property
  4. @abstractmethod
  5. def name(self):
  6. pass
  7. @abstractmethod
  8. def execute(self, context: ExecutionContext, **kwargs):
  9. pass
  10. class ToolRegistry:
  11. def __init__(self):
  12. self._tools = {}
  13. def register(self, tool_class):
  14. tool_instance = tool_class()
  15. self._tools[tool_instance.name] = tool_instance
  16. def get_tool(self, name):
  17. return self._tools.get(name)

2. 异步工具执行模式

对于耗时操作,推荐采用异步执行模式:

  1. import asyncio
  2. from concurrent.futures import ThreadPoolExecutor
  3. class AsyncToolExecutor:
  4. def __init__(self, max_workers=5):
  5. self.executor = ThreadPoolExecutor(max_workers=max_workers)
  6. async def execute_async(self, tool: ToolBase, context, **kwargs):
  7. loop = asyncio.get_running_loop()
  8. def _sync_execute():
  9. try:
  10. return tool.execute(context, **kwargs)
  11. except Exception as e:
  12. return {"error": str(e)}
  13. result = await loop.run_in_executor(self.executor, _sync_execute)
  14. return result

四、Workflow执行流程实现

1. 执行流程控制

设计完整的执行循环:

  1. class WorkflowRunner:
  2. def __init__(self, workflow_def, registry: ToolRegistry):
  3. self.workflow = workflow_def
  4. self.registry = registry
  5. self.context = ExecutionContext(workflow_def.id)
  6. async def run(self):
  7. try:
  8. for step in self.workflow.steps:
  9. await self._execute_step(step)
  10. if self.context.state == "PAUSED":
  11. return # 等待外部事件触发恢复
  12. self.context.transition("COMPLETED")
  13. except Exception as e:
  14. self.context.transition("FAILED")
  15. raise
  16. async def _execute_step(self, step):
  17. tool = self.registry.get_tool(step.tool_name)
  18. if not tool:
  19. raise ValueError(f"Tool {step.tool_name} not found")
  20. # 准备工具参数
  21. params = self._prepare_params(step.params)
  22. # 执行工具
  23. if step.async_mode:
  24. result = await self._async_execute(tool, params)
  25. else:
  26. result = tool.execute(self.context, **params)
  27. # 处理结果
  28. self._handle_result(step, result)

2. 错误处理与恢复机制

实现健壮的错误处理:

  1. class ErrorHandlingStrategy(ABC):
  2. @abstractmethod
  3. def handle(self, error, context):
  4. pass
  5. class RetryStrategy(ErrorHandlingStrategy):
  6. def __init__(self, max_retries=3):
  7. self.max_retries = max_retries
  8. def handle(self, error, context):
  9. retry_count = context.metadata.get("retry_count", 0)
  10. if retry_count < self.max_retries:
  11. context.metadata["retry_count"] = retry_count + 1
  12. return "RETRY"
  13. return "FAIL"
  14. class WorkflowErrorHandler:
  15. def __init__(self):
  16. self.strategies = {
  17. "NETWORK_ERROR": RetryStrategy(max_retries=5),
  18. "TIMEOUT": RetryStrategy(max_retries=2),
  19. "DEFAULT": RetryStrategy(max_retries=1)
  20. }
  21. def handle_error(self, error, context):
  22. error_type = self._classify_error(error)
  23. strategy = self.strategies.get(error_type, self.strategies["DEFAULT"])
  24. action = strategy.handle(error, context)
  25. if action == "RETRY":
  26. return True # 继续执行
  27. elif action == "FAIL":
  28. context.transition("FAILED")
  29. return False

五、性能优化与扩展性设计

1. 执行日志与追踪

实现结构化日志记录:

  1. class ExecutionLogger:
  2. def __init__(self, log_path):
  3. self.log_path = log_path
  4. self.session_logs = []
  5. def log_step(self, step_name, status, duration, input=None, output=None):
  6. log_entry = {
  7. "timestamp": datetime.now().isoformat(),
  8. "step": step_name,
  9. "status": status,
  10. "duration_ms": duration,
  11. "input": input,
  12. "output": output
  13. }
  14. self.session_logs.append(log_entry)
  15. def save(self):
  16. with open(self.log_path, "w") as f:
  17. json.dump(self.session_logs, f, indent=2)

2. 动态工具加载

支持运行时工具热加载:

  1. class DynamicToolLoader:
  2. def __init__(self, plugin_dirs):
  3. self.plugin_dirs = plugin_dirs
  4. self.loaded_modules = {}
  5. def load_tools(self):
  6. for dir_path in self.plugin_dirs:
  7. for file_name in os.listdir(dir_path):
  8. if file_name.endswith(".py") and not file_name.startswith("_"):
  9. module_name = file_name[:-3]
  10. module_path = os.path.join(dir_path, file_name)
  11. spec = importlib.util.spec_from_file_location(
  12. module_name, module_path)
  13. module = importlib.util.module_from_spec(spec)
  14. spec.loader.exec_module(module)
  15. # 查找并注册所有Tool子类
  16. for attr_name in dir(module):
  17. attr = getattr(module, attr_name)
  18. if isinstance(attr, type) and issubclass(attr, ToolBase) and attr != ToolBase:
  19. self.loaded_modules[attr_name] = attr

六、最佳实践与反模式

1. 推荐实践

  1. 上下文隔离:每个Workflow实例应使用独立的ExecutionContext
  2. 工具幂等性:设计工具时确保可安全重试
  3. 渐进式状态保存:定期将上下文持久化到存储
  4. 超时控制:为每个工具调用设置合理的超时时间

2. 常见反模式

  1. 全局状态污染:避免在多个Workflow间共享可变状态
  2. 同步阻塞:长时间运行的操作应使用异步模式
  3. 过度设计:初期避免实现复杂的分支逻辑,保持简单
  4. 忽略错误分类:不同错误类型应采用不同处理策略

七、完整示例架构

  1. agent_framework/
  2. ├── runtime/
  3. ├── __init__.py
  4. ├── context.py # ExecutionContext实现
  5. ├── state_machine.py # 状态机核心逻辑
  6. ├── tool_registry.py # 工具注册与发现
  7. ├── workflow_runner.py # 主执行引擎
  8. └── error_handler.py # 错误处理策略
  9. ├── tools/
  10. ├── __init__.py
  11. ├── base.py # ToolBase定义
  12. └── plugins/ # 动态加载的工具
  13. └── examples/
  14. └── basic_workflow.py # 使用示例

通过上述设计,我们构建了一个灵活、可扩展的Runtime模块,能够支持从简单任务到复杂Agent行为的执行需求。实际开发中,建议从最小可行实现开始,逐步添加错误处理、持久化等高级功能。下一篇文章将深入探讨如何设计Workflow定义语言和可视化编排界面。