从零到一:Java自研工作流引擎的设计与实现(上)

一、项目背景与核心痛点

在传统企业级应用开发中,工作流引擎常被视为”黑盒”组件。主流云服务商提供的流程引擎虽功能完备,但存在以下问题:

  1. 技术耦合度高:依赖特定框架(如Spring Cloud Workflow)或云平台服务
  2. 定制成本高:流程节点扩展需遵循供应商规范,二次开发受限
  3. 性能瓶颈:复杂流程场景下,分布式事务处理效率不足

笔者团队在某金融系统重构项目中,面临”既要支持千级并发流程,又要兼容遗留系统”的双重挑战。经技术选型评估,决定采用自研引擎方案,核心设计目标包括:

  • 纯Java实现,无第三方流程引擎依赖
  • 支持BPMN 2.0标准流程定义
  • 具备水平扩展能力的分布式执行架构

二、核心架构设计

1. 分层架构模型

采用经典的”定义-执行-监控”三层架构:

  1. graph TD
  2. A[流程定义层] --> B(流程解析器)
  3. B --> C[执行引擎核心]
  4. C --> D[持久化存储]
  5. C --> E[事件通知中心]
  6. E --> F[监控仪表盘]
  • 定义层:基于XML/JSON的流程描述语言,兼容BPMN规范
  • 执行层:采用状态机模式驱动流程流转,支持同步/异步执行
  • 监控层:通过事件溯源机制实现全流程追踪

2. 关键技术选型

组件 候选方案 最终选择
状态管理 Spring StateMachine 自定义实现
规则引擎 Drools/MVEL 表达式解析器
持久化 JPA/MyBatis 混合模式
并发控制 Saga模式/TCC 乐观锁+补偿机制

选择自定义状态机实现主要考虑:

  1. 避免Spring StateMachine的类加载泄漏问题
  2. 实现更精细的流程控制(如条件分支并行)
  3. 降低与Spring生态的强耦合

三、流程定义规范设计

1. DSL语法设计

采用类BPMN的XML Schema定义,示例片段:

  1. <process id="orderProcess" name="订单处理流程">
  2. <startEvent id="start" />
  3. <sequenceFlow sourceRef="start" targetRef="approveTask" />
  4. <userTask id="approveTask" name="审批"
  5. assignee="#{initiator.manager}">
  6. <documentation>订单金额超过阈值需审批</documentation>
  7. <conditionExpression>${amount > 10000}</conditionExpression>
  8. </userTask>
  9. <exclusiveGateway id="decision" />
  10. <sequenceFlow sourceRef="approveTask" targetRef="decision" />
  11. <sequenceFlow id="approved" sourceRef="decision"
  12. targetRef="paymentTask"
  13. conditionExpression="${approved == true}"/>
  14. <sequenceFlow id="rejected" sourceRef="decision"
  15. targetRef="end"
  16. conditionExpression="${approved == false}"/>
  17. </process>

2. 解析器实现要点

  1. Schema验证:通过XSD校验流程定义合法性

    1. public class ProcessValidator {
    2. private static final String SCHEMA_PATH = "/schema/process.xsd";
    3. public boolean validate(String xml) {
    4. SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
    5. Schema schema = factory.newSchema(new File(SCHEMA_PATH));
    6. Validator validator = schema.newValidator();
    7. // 实现验证逻辑...
    8. }
    9. }
  2. 表达式解析:集成SpEL实现动态条件判断

    1. public class SpelExpressionEvaluator {
    2. public boolean evaluate(String expression, Map<String, Object> variables) {
    3. ExpressionParser parser = new SpelExpressionParser();
    4. StandardEvaluationContext context = new StandardEvaluationContext();
    5. variables.forEach(context::setVariable);
    6. return (Boolean) parser.parseExpression(expression).getValue(context);
    7. }
    8. }

四、状态机核心实现

1. 状态模型设计

  1. public enum ProcessState {
  2. CREATED("创建"),
  3. RUNNING("执行中"),
  4. SUSPENDED("挂起"),
  5. COMPLETED("完成"),
  6. TERMINATED("终止"),
  7. FAILED("失败");
  8. private final String description;
  9. // 构造方法与getter省略...
  10. }
  11. public class StateTransition {
  12. private ProcessState from;
  13. private ProcessState to;
  14. private String trigger;
  15. // 构造方法与业务逻辑...
  16. }

2. 状态流转控制

采用责任链模式处理状态变更:

  1. public abstract class StateHandler {
  2. private StateHandler next;
  3. public final void handle(StateContext context) {
  4. if (canHandle(context)) {
  5. doHandle(context);
  6. } else if (next != null) {
  7. next.handle(context);
  8. }
  9. }
  10. protected abstract boolean canHandle(StateContext context);
  11. protected abstract void doHandle(StateContext context);
  12. }
  13. // 具体处理器示例
  14. public class CompleteHandler extends StateHandler {
  15. @Override
  16. protected boolean canHandle(StateContext context) {
  17. return context.getCurrentState() == ProcessState.RUNNING
  18. && "complete".equals(context.getTrigger());
  19. }
  20. @Override
  21. protected void doHandle(StateContext context) {
  22. context.setNextState(ProcessState.COMPLETED);
  23. // 持久化状态变更...
  24. }
  25. }

五、分布式执行设计

1. 节点通信机制

采用事件驱动架构,通过消息队列实现:

  1. public class ProcessEventPublisher {
  2. @Autowired
  3. private JmsTemplate jmsTemplate;
  4. public void publish(ProcessEvent event) {
  5. jmsTemplate.convertAndSend("process.events", event);
  6. }
  7. }
  8. @JmsListener(destination = "process.events")
  9. public class ProcessEventListener {
  10. public void handle(ProcessEvent event) {
  11. // 根据事件类型触发对应处理
  12. }
  13. }

2. 幂等性保障

通过全局事务ID实现:

  1. public class IdempotentExecutor {
  2. private static final String IDEMPOTENT_KEY_PREFIX = "proc:";
  3. @Autowired
  4. private RedisTemplate<String, String> redisTemplate;
  5. public <T> T execute(String businessKey, Supplier<T> supplier) {
  6. String key = IDEMPOTENT_KEY_PREFIX + businessKey;
  7. Boolean acquired = redisTemplate.opsForValue().setIfAbsent(key, "1", 1, TimeUnit.HOURS);
  8. if (Boolean.TRUE.equals(acquired)) {
  9. try {
  10. return supplier.get();
  11. } finally {
  12. redisTemplate.delete(key);
  13. }
  14. }
  15. throw new DuplicateExecutionException("重复执行");
  16. }
  17. }

六、性能优化实践

1. 流程实例缓存

采用两级缓存策略:

  1. public class ProcessCache {
  2. private final Cache<String, ProcessDefinition> definitionCache
  3. = Caffeine.newBuilder().maximumSize(1000).build();
  4. private final Cache<String, ProcessInstance> instanceCache
  5. = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
  6. public ProcessDefinition getDefinition(String processId) {
  7. return definitionCache.get(processId, this::loadFromDB);
  8. }
  9. // 其他缓存操作...
  10. }

2. 异步执行优化

通过线程池隔离不同优先级任务:

  1. @Configuration
  2. public class ExecutorConfig {
  3. @Bean("highPriorityExecutor")
  4. public Executor highPriorityExecutor() {
  5. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  6. executor.setCorePoolSize(10);
  7. executor.setMaxPoolSize(20);
  8. executor.setQueueCapacity(100);
  9. executor.setThreadNamePrefix("high-proc-");
  10. return executor;
  11. }
  12. // 低优先级任务配置...
  13. }

本篇详细阐述了工作流引擎的核心架构设计、流程定义规范及关键实现技术。下篇将深入解析分布式执行协调、异常处理机制及监控体系构建,并提供完整的代码示例与性能测试数据。这种自研方案在某金融系统中验证,实现日均处理万级流程实例,平均响应时间<200ms,为同类项目提供了可复用的技术范式。