NestJS智能体开发进阶:结构化输出与工具调用实践

一、结构化输出的核心价值与设计原则

在智能体开发中,结构化输出是连接后端逻辑与前端交互的桥梁。相较于自由文本,结构化数据(如JSON Schema)能显著提升信息解析效率,尤其在多模态交互场景下,清晰的字段定义可避免语义歧义。

1.1 DTO设计规范

NestJS推荐使用数据传输对象(DTO)定义输出结构,例如:

  1. // src/agent/dto/output.dto.ts
  2. export class AgentOutputDto {
  3. @ApiProperty({ description: '智能体唯一标识' })
  4. agentId: string;
  5. @ApiProperty({
  6. type: 'object',
  7. description: '结构化响应内容',
  8. example: {
  9. text: '查询成功',
  10. data: { temperature: 25, humidity: 60 }
  11. }
  12. })
  13. response: {
  14. text: string;
  15. data?: Record<string, any>;
  16. };
  17. @ApiProperty({ description: '工具调用状态' })
  18. status: 'success' | 'pending' | 'failed';
  19. }

设计要点

  • 字段分级:基础信息(如agentId)与业务数据分离
  • 类型安全:通过Zod或class-validator进行运行时校验
  • 扩展性:预留data字段支持动态业务数据

1.2 中间件处理链

通过NestJS中间件实现输出标准化:

  1. // src/common/middleware/output-formatter.middleware.ts
  2. export class OutputFormatterMiddleware implements NestMiddleware {
  3. use(req: Request, res: Response, next: NextFunction) {
  4. const originalJson = res.json;
  5. res.json = (body: any) => {
  6. const formatted = {
  7. timestamp: new Date().toISOString(),
  8. ...(this.isAgentResponse(body) ?
  9. { agentOutput: this.normalizeAgentOutput(body) } :
  10. body)
  11. };
  12. return originalJson.call(res, formatted);
  13. };
  14. next();
  15. }
  16. private isAgentResponse(body: any): boolean {
  17. return body?.hasOwnProperty('agentId');
  18. }
  19. }

最佳实践

  • 在全局中间件中注入时间戳、请求ID等元数据
  • 对特定路由(如/api/agent/*)应用差异化格式化
  • 避免过度嵌套,保持三级以内结构

二、工具调用的异步处理模式

智能体的核心能力往往依赖外部工具(如数据库查询、API调用),需建立可靠的异步处理机制。

2.1 工具注册与发现

采用依赖注入方式管理工具:

  1. // src/tools/tool-registry.service.ts
  2. @Injectable()
  3. export class ToolRegistryService {
  4. private readonly tools = new Map<string, ToolInterface>();
  5. registerTool(name: string, tool: ToolInterface) {
  6. this.tools.set(name, tool);
  7. }
  8. getTool(name: string): ToolInterface {
  9. const tool = this.tools.get(name);
  10. if (!tool) throw new NotFoundException(`Tool ${name} not found`);
  11. return tool;
  12. }
  13. }
  14. // src/tools/database.tool.ts
  15. @Injectable()
  16. export class DatabaseTool implements ToolInterface {
  17. constructor(private prisma: PrismaService) {}
  18. async execute(query: string): Promise<any> {
  19. return this.prisma.$queryRawUnsafe(query);
  20. }
  21. }

架构优势

  • 集中式管理避免工具重复实例化
  • 通过接口抽象实现工具热插拔
  • 结合AOP实现调用日志、权限校验等横切关注点

2.2 异步调用编排

使用RxJS处理工具链调用:

  1. // src/agent/services/agent.service.ts
  2. executeToolChain(tools: string[], input: any): Observable<any> {
  3. return from(tools).pipe(
  4. concatMap(toolName => {
  5. const tool = this.toolRegistry.getTool(toolName);
  6. return tool.execute(input).pipe(
  7. tap(result => this.logger.log(`Tool ${toolName} executed`)),
  8. catchError(err => {
  9. this.logger.error(`Tool ${toolName} failed`, err.stack);
  10. return throwError(err);
  11. })
  12. );
  13. }),
  14. reduce((acc, curr) => ({ ...acc, ...curr }), {})
  15. );
  16. }

性能优化

  • 对无依赖的工具采用mergeMap并行执行
  • 设置并发限制避免资源耗尽
  • 通过retry策略处理瞬时故障

三、典型场景实现方案

3.1 多轮对话状态管理

结合Redis实现跨请求状态持久化:

  1. // src/agent/services/dialog-manager.service.ts
  2. @Injectable()
  3. export class DialogManagerService {
  4. constructor(
  5. @InjectRedis() private readonly redis: Redis,
  6. private readonly agentService: AgentService
  7. ) {}
  8. async handleDialog(sessionId: string, input: DialogInputDto) {
  9. const dialogState = await this.getDialogState(sessionId);
  10. const context = { ...dialogState, ...input };
  11. const result = await this.agentService.process(context);
  12. await this.updateDialogState(sessionId, {
  13. lastInput: input,
  14. lastOutput: result,
  15. step: dialogState.step + 1
  16. });
  17. return result;
  18. }
  19. private async getDialogState(sessionId: string) {
  20. const cached = await this.redis.get(`dialog:${sessionId}`);
  21. return cached ? JSON.parse(cached) : { step: 0 };
  22. }
  23. }

关键考虑

  • 设置合理的TTL避免状态堆积
  • 对敏感数据加密存储
  • 实现状态迁移验证逻辑

3.2 工具调用超时控制

通过AbortController实现:

  1. // src/tools/http-client.tool.ts
  2. @Injectable()
  3. export class HttpClientTool implements ToolInterface {
  4. async execute(config: HttpRequestConfig): Promise<any> {
  5. const controller = new AbortController();
  6. const timeoutId = setTimeout(() => controller.abort(), 5000);
  7. try {
  8. const response = await fetch(config.url, {
  9. signal: controller.signal,
  10. ...config
  11. });
  12. clearTimeout(timeoutId);
  13. return response.json();
  14. } catch (err) {
  15. if (err.name === 'AbortError') {
  16. throw new RequestTimeoutException('Tool execution timed out');
  17. }
  18. throw err;
  19. }
  20. }
  21. }

容错设计

  • 熔断机制:连续超时后自动降级
  • 备份工具:主工具失败时切换备用方案
  • 监控告警:记录超时率异常波动

四、测试与质量保障

4.1 结构化输出验证

使用Fastify的JSON Schema验证:

  1. // src/agent/agent.controller.ts
  2. @Post()
  3. @UsePipes(new ValidationPipe({ transform: true }))
  4. @UseInterceptors(new SchemaValidatorInterceptor({
  5. schema: {
  6. type: 'object',
  7. properties: {
  8. agentOutput: {
  9. type: 'object',
  10. required: ['text', 'status'],
  11. properties: {
  12. text: { type: 'string' },
  13. status: { enum: ['success', 'pending', 'failed'] }
  14. }
  15. }
  16. }
  17. }
  18. }))
  19. async handleRequest(@Body() input: AgentInputDto) {
  20. // ...
  21. }

4.2 工具调用模拟测试

创建工具存根(Stub)进行隔离测试:

  1. // test/tools/database.tool.stub.ts
  2. export class DatabaseToolStub implements ToolInterface {
  3. private mockData: Record<string, any> = {};
  4. setMockData(key: string, value: any) {
  5. this.mockData[key] = value;
  6. }
  7. async execute(query: string): Promise<any> {
  8. const match = query.match(/SELECT\s+\*\s+FROM\s+(\w+)/i);
  9. if (match) return this.mockData[match[1]] || [];
  10. throw new Error('Query not mocked');
  11. }
  12. }
  13. // 在测试中替换真实工具
  14. beforeEach(() => {
  15. const mock = new DatabaseToolStub();
  16. mock.setMockData('users', [{ id: 1, name: 'Test' }]);
  17. const module = await Test.createTestingModule({
  18. providers: [
  19. AgentService,
  20. { provide: ToolRegistryService, useValue: {
  21. getTool: () => mock
  22. }}
  23. ]
  24. }).compile();
  25. // ...
  26. });

五、性能优化实践

5.1 输出缓存策略

对稳定输出实施缓存:

  1. // src/agent/services/output-cache.service.ts
  2. @Injectable()
  3. export class OutputCacheService {
  4. constructor(
  5. @InjectRedis() private readonly redis: Redis,
  6. private readonly agentService: AgentService
  7. ) {}
  8. async getCachedOutput(inputHash: string): Promise<any> {
  9. const cached = await this.redis.get(`output:${inputHash}`);
  10. return cached ? JSON.parse(cached) : null;
  11. }
  12. async processWithCache(input: AgentInputDto): Promise<any> {
  13. const inputHash = createHash('md5').update(JSON.stringify(input)).digest('hex');
  14. const cached = await this.getCachedOutput(inputHash);
  15. if (cached) return cached;
  16. const result = await this.agentService.process(input);
  17. await this.redis.setex(`output:${inputHash}`, 3600, JSON.stringify(result));
  18. return result;
  19. }
  20. }

缓存策略选择

  • 参数化查询:对相同输入返回相同输出的情况适用
  • 版本控制:当工具集变更时清空相关缓存
  • 梯度失效:重要数据设置较短TTL,静态数据设置长TTL

5.2 工具池复用

对耗时工具实现连接池:

  1. // src/tools/pool/tool-pool.service.ts
  2. @Injectable()
  3. export class ToolPoolService {
  4. private pool = new Map<string, Pool<ToolInterface>>();
  5. acquireTool(name: string): Promise<ToolInterface> {
  6. if (!this.pool.has(name)) {
  7. const factory = () => this.createToolInstance(name);
  8. this.pool.set(name, new Pool({ factory, max: 5 }));
  9. }
  10. return this.pool.get(name)!.acquire();
  11. }
  12. releaseTool(name: string, tool: ToolInterface): Promise<void> {
  13. const pool = this.pool.get(name);
  14. if (pool) return pool.release(tool);
  15. return Promise.resolve();
  16. }
  17. }

六、安全与合规考量

6.1 输出脱敏处理

实现敏感数据过滤中间件:

  1. // src/common/middleware/output-sanitizer.middleware.ts
  2. export class OutputSanitizerMiddleware implements NestMiddleware {
  3. private readonly sensitivePatterns = [
  4. /(\d{3})-\d{2}-\d{4}/g, // SSN
  5. /(\d{3})[\s-]?\d{3}[\s-]?\d{4}/g // 信用卡
  6. ];
  7. use(req: Request, res: Response, next: NextFunction) {
  8. const originalJson = res.json;
  9. res.json = (body: any) => {
  10. const sanitized = this.sanitize(body);
  11. return originalJson.call(res, sanitized);
  12. };
  13. next();
  14. }
  15. private sanitize(data: any): any {
  16. if (typeof data !== 'object' || data === null) return data;
  17. return Object.fromEntries(
  18. Object.entries(data).map(([key, value]) => {
  19. if (typeof value === 'string') {
  20. return [key, this.applyPatterns(value)];
  21. } else if (typeof value === 'object') {
  22. return [key, this.sanitize(value)];
  23. }
  24. return [key, value];
  25. })
  26. );
  27. }
  28. }

6.2 工具调用鉴权

基于角色的工具访问控制:

  1. // src/tools/auth/tool-auth.guard.ts
  2. @Injectable()
  3. export class ToolAuthGuard implements CanActivate {
  4. constructor(
  5. private readonly reflector: Reflector,
  6. private readonly toolAuthService: ToolAuthService
  7. ) {}
  8. async canActivate(context: ExecutionContext): Promise<boolean> {
  9. const requiredTools = this.reflector.get<string[]>(
  10. 'requiredTools',
  11. context.getHandler()
  12. );
  13. if (!requiredTools) return true;
  14. const request = context.switchToHttp().getRequest();
  15. const user = request.user;
  16. return this.toolAuthService.checkAccess(
  17. user.id,
  18. requiredTools
  19. );
  20. }
  21. }
  22. // 在控制器方法上使用装饰器
  23. export const RequiredTools = (tools: string[]) =>
  24. SetMetadata('requiredTools', tools);
  25. @UseGuards(ToolAuthGuard)
  26. @RequiredTools(['database', 'file-system'])
  27. @Post('/sensitive-operation')
  28. async sensitiveOperation() {
  29. // ...
  30. }

通过系统化的结构化输出设计和工具调用管理,NestJS智能体可实现高可维护性、强安全性的业务逻辑处理。开发者应重点关注DTO设计的合理性、异步调用的错误处理以及性能瓶颈的提前识别,结合具体业务场景选择合适的缓存策略和安全控制方案。