基于RabbitMQ解决分布式事务

1、可查询操作:服务操作具有全局唯一的标识,操作唯一的确定的时间。
2、幂等操作:重复调用多次产生的业务结果与调用一次产生的结果相同。一是通过业务操作实现幂等性,二是系统缓存所有请求与处理的结果,最后是检测到重复请求之后,自动返回之前的处理结果。
3、TCC操作:Try阶段,尝试执行业务,完成所有业务的检查,实现一致性;预留必须的业务资源,实现准隔离性。Confirm阶段:真正的去执行业务,不做任何检查,仅适用Try阶段预留的业务资源,Confirm操作还要满足幂等性。Cancel阶段:取消执行业务,释放Try阶段预留的业务资源,Cancel操作要满足幂等性。TCC与2PC(两阶段提交)协议的区别:TCC位于业务服务层而不是资源层,TCC没有单独准备阶段,Try操作兼备资源操作与准备的能力,TCC中Try操作可以灵活的选择业务资源,锁定粒度。TCC的开发成本比2PC高。实际上TCC也属于两阶段操作,但是TCC不等同于2PC操作。
4、可补偿操作:Do阶段:真正的执行业务处理,业务处理结果外部可见。Compensate阶段:抵消或者部分撤销正向业务操作的业务结果,补偿操作满足幂等性。约束:补偿操作在业务上可行,由于业务执行结果未隔离或者补偿不完整带来的风险与成本可控。实际上,TCC的Confirm和Cancel操作可以看做是补偿操作。

分布式事务解决方案有很多,下面是我在空闲时间,基于RabbitMQ的分布式事务解决方案,我把它称之为消息驱动。

1、消息驱动可以干什么
答:可用于异步事务,分布式服务调用等
2、消息驱动需要注意的是什么
答:兼容本地事务,事务检查,消息发送失败自动补偿,消费者业务方免幂等操作,免事务操作
3、本消息驱动缺点是什么
答:违反设计模式中最少知道原则,并且有表侵入问题,消息表必须跟业务表在一起,正在努力改进中

在基于MQ解决分布式事务或者异步操作时,最大的问题无非就三点
1、 routingKey 管理困难、队列爆炸。解决方案:每个服务只用一个routingKey,消费方使用统一的处理器,我们把处理器当观察者,发送方发送消息时,只要带上处理器的名字,消费方对应处理器就执行此消息。
2、发送失败补偿。解决方案:发送方发送时消息驱动自动存储消息,发送失败后自动重复发送
3、消费失败重新消费,重复消费幂等问题。解决方案:消费方消费前自动存储待消费的消息,消费成功修改当前消息状态,消费失败自动调用消费方业务重新消费,只到成功为止。

设计概要:
整体思想基于经典MQ事务处理的二次封装,其中发送、接收消息表会侵入业务,这里使用消息中间件是 RabbitMQ。你也可以替换成RocketMQ。每个服务固定一个队列专门用来接收消息驱动发过来的消息,如果你的服务只发送不接收消息,那么只需要建立发送表就可以,如果即发送又接收那么两个表都要建,只接收消息可以只建立接收表。消费端,每种业务处理都是一个处理器,必须实现处理器ReceiveHandle接口,业务处理器自动会成为伪观察者监听消息。

模型图如下:

1、主业务方处理完,写数据库
2、主业务方调用消息驱动
3、消息驱动先检查事务,无事务则抛异常,然后向DB写发送数据
4、异步发送,发送成功失败不影响业务操作
5、消息确认
6、修改发送记录确认成功或失败(定时任务重试发送失败消息)
7、接收消息
8、写接收消息
9、开启事务调用业务方
10、业务方写DB
11、消息驱动修改消息处理成功(定时任务处理初始状态消息)
12、消息回执ACK

相关实现

结构

发送消息表和接收消息表 sql

CREATE TABLE `cs_receive_message`  (`message_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息类型',`business_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '业务类型',`business_id` bigint(19) NULL DEFAULT NULL COMMENT '业务订单号',`data` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据',`gmt_receive` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '接收时间',`handle_state` tinyint(4) NULL DEFAULT NULL COMMENT '状态',`gmt_create` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',`gmt_modify` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改时间',PRIMARY KEY (`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;CREATE TABLE `cs_send_message`  (`message_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息类型',`business_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '业务类型',`business_id` bigint(19) NULL DEFAULT NULL COMMENT '业务订单号',`exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由',`routing_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由策略',`data` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据',`gmt_send` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '发送时间',`gmt_confirm` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '确认时间',`message_state` tinyint(4) NULL DEFAULT NULL COMMENT '状态',`gmt_create` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',`gmt_modify` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改时间',PRIMARY KEY (`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

消息接收实体类

@Data
public class ReceiveMessage implements Serializable {/** 消息类型 */private String messageId;/** 业务类型 */private String businessType;/** 业务订单号 */private Long businessId;/** 数据 */private String data;/** 接收时间 */private Date gmtReceive;/** 状态 */private Integer handleState;/** 创建时间 */private Date gmtCreate;/** 修改时间 */private Date gmtModify;private static final long serialVersionUID = 1L;
}

消息发送实体类

@Data
public class SendMessage implements Serializable {/** 消息类型 */private String messageId;/** 业务类型 */private String businessType;/** 业务订单号 */private Long businessId;/** 路由 */private String exchange;/** 路由策略 */private String routingKey;/** 数据 */private String data;/** 发送时间 */private Date gmtSend;/** 确认时间 */private Date gmtConfirm;/** 状态 */private Integer messageState;/** 创建时间 */private Date gmtCreate;/** 修改时间 */private Date gmtModify;private static final long serialVersionUID = 1L;
}

消息处理状态枚举类

public enum ReceiveHandleStateEnum {/** 初始 */INIT(0, "初始"),/** 处理中 */PROCESSING(5, "处理中"),/** 处理成功 */PROCESS_SUCCESS(10, "处理成功");/** 编码 */private Integer code;/** 说明 */private String describe;ReceiveHandleStateEnum(Integer code, String describe) {this.code = code;this.describe = describe;}public Integer getCode() {return code;}public String getDescribe() {return describe;}
}

消息发送状态枚举类

public enum SendHandleStateEnum {/** 初始 */INIT(0, "初始"),/** 待确认 */WAITING_CONFIRM(5, "待确认"),/** 发送成功 */SEND_SUCCESS(10, "发送成功"),/** 发送失败 */SEND_FAIL(15, "发送失败");/** 编码 */private Integer code;/** 说明 */private String describe;SendHandleStateEnum(Integer code, String describe) {this.code = code;this.describe = describe;}public Integer getCode() {return code;}public String getDescribe() {return describe;}
}

接收处理Mapper配置文件,忽略公共部分,公共部分请用mybatis工具生成

<!-- 根据业务订单和查询接收消息列表 -->
<select id="findByBusinessId" resultMap="BaseResultMap">select<include refid="Base_Column_List" />from cs_receive_messagewhere business_id = #{businessId,jdbcType=BIGINT}
</select>
<!-- 查询未处理的消息 -->
<select id="findOverdueOrder" resultMap="BaseResultMap">select<include refid="Base_Column_List" />from cs_receive_messageWHERE<![CDATA[gmt_create <= #{endGmtCreate,jdbcType=VARCHAR}]]>AND handle_state in<foreach item="item" collection="receiveStateList" separator="," open="(" close=")" index="">#{item,jdbcType=INTEGER}</foreach>
</select>

发送处理Mapper配置文件

<!-- 根据业务订单号查询列表-->
<select id="findByBusinessId" resultMap="BaseResultMap">select<include refid="Base_Column_List" />from cs_send_messagewhere business_id = #{businessId,jdbcType=BIGINT}
</select>
<select id="findOverdueOrder" resultMap="BaseResultMap">select<include refid="Base_Column_List" />from cs_send_messageWHERE<![CDATA[gmt_create <= #{endGmtCreate,jdbcType=VARCHAR}]]>AND message_state in<foreach item="item" collection="sendStateList" separator="," open="(" close=")" index="">#{item,jdbcType=INTEGER}</foreach>
</select>

接收处理Dao 类

public interface ReceiveMessageMapper extends BaseMapper<ReceiveMessage, String> {/*** 根据业务订单和查询接收消息列表* @param businessId 业务ID* @return*/List<ReceiveMessage> findByBusinessId(Long businessId);/*** 根据状态查询创建时间小于等于endGmtCreate* @param receiveStateList 接收状态* @param endGmtCreate 完成时间* @return*/List<ReceiveMessage> findOverdueOrder(@Param(value = "receiveStateList") List<Integer> receiveStateList, @Param(value = "endGmtCreate") String endGmtCreate);
}

发送处理Dao类

public interface SendMessageMapper extends BaseMapper<SendMessage, String> {/*** 根据业务订单号查询列表* @param businessId 业务ID* @return*/List<SendMessage> findByBusinessId(Long businessId);/*** 根据状态查询创建时间小于等于endGmtCreate* @param sendStateList 发送状态* @param endGmtCreate 完成时间* @return*/List<SendMessage> findOverdueOrder(@Param(value = "sendStateList") List<Integer> sendStateList, @Param(value = "endGmtCreate") String endGmtCreate);
}

发送Service类

@Slf4j
@Service
public class SendMessageService {@Resourceprivate SendMessageMapper sendMessageMapper;public void insert(SendMessage sendMessage) {if (sendMessage == null || StringUtils.isEmpty(sendMessage.getMessageId())) {throw new RuntimeException("参数错误");}int total = sendMessageMapper.insertSelective(sendMessage);if (total <= 0) {throw new RuntimeException("系统异常");}}public void update(SendMessage sendMessage) {if (sendMessage == null || StringUtils.isEmpty(sendMessage.getMessageId())) {throw new RuntimeException("参数错误");}sendMessageMapper.updateByPrimaryKeySelective(sendMessage);}public SendMessage getByMessageId(String messageId) {if (StringUtils.isEmpty(messageId)){return null;}return sendMessageMapper.selectByPrimaryKey(messageId);}public List<SendMessage> findByBusinessId(Long businessId) {if (businessId == null){return Collections.emptyList();}return sendMessageMapper.findByBusinessId(businessId);}public void updateMessageState(String messageId, Integer messageState) {if (StringUtils.isEmpty(messageId) || messageState == null) {throw new RuntimeException("参数错误");}SendMessage sendMessageUpdate = new SendMessage();sendMessageUpdate.setMessageId(messageId);sendMessageUpdate.setMessageState(messageState);sendMessageUpdate.setGmtConfirm(new Date());update(sendMessageUpdate);}public List<SendMessage> findOverdueOrder(List<Integer> sendStateList, String endGmtCreate) {if (sendStateList.isEmpty() || StringUtils.isEmpty(endGmtCreate)){return Collections.emptyList();}return sendMessageMapper.findOverdueOrder(sendStateList, endGmtCreate);}
}

接收Service类

@Slf4j
@Service
public class ReceiveMessageService {@Resourceprivate ReceiveMessageMapper receiveMessageMapper;public void insert(ReceiveMessage receiveMessage) {if (receiveMessage == null || StringUtils.isEmpty(receiveMessage.getMessageId())) {throw new RuntimeException("参数错误");}int total = receiveMessageMapper.insertSelective(receiveMessage);if (total <= 0) {throw new RuntimeException("系统异常");}}public void update(ReceiveMessage receiveMessage) {if (receiveMessage == null || StringUtils.isEmpty(receiveMessage.getMessageId())) {throw new RuntimeException("参数错误");}receiveMessageMapper.updateByPrimaryKeySelective(receiveMessage);}public ReceiveMessage getByMessageId(String messageId) {if (StringUtils.isEmpty(messageId)) {return null;}return receiveMessageMapper.selectByPrimaryKey(messageId);}public List<ReceiveMessage> findByBusinessId(Long businessId) {if (businessId == null) {return Collections.emptyList();}return receiveMessageMapper.findByBusinessId(businessId);}public void updateHandleState(String messageId, Integer handleState) {if (StringUtils.isEmpty(messageId) || handleState == null) {throw new RuntimeException("参数错误");}ReceiveMessage receiveMessageUpdate = new ReceiveMessage();receiveMessageUpdate.setMessageId(messageId);receiveMessageUpdate.setHandleState(handleState);update(receiveMessageUpdate);}public List<ReceiveMessage> findOverdueOrder(List<Integer> receiveStateList, String endGmtCreate) {if (receiveStateList.isEmpty() || StringUtils.isEmpty(endGmtCreate)){return Collections.emptyList();}return receiveMessageMapper.findOverdueOrder(receiveStateList, endGmtCreate);}
}

发送工具类

@Slf4j
@Component
public class SendExecute {@Autowiredprivate SendMessageService sendMessageService;@Autowired@Qualifier("driveRabbitTemplate")private RabbitTemplate driveRabbitTemplate;@Transactional(rollbackFor = Exception.class)public void execute(Long businessId, String businessType, String routingKey, String date) {String messageId = StringUtil.randomGUID();SendMessage message = new SendMessage();message.setMessageId(messageId);message.setBusinessId(businessId);message.setBusinessType(businessType);message.setData(date);message.setExchange(driveRabbitTemplate.getExchange());message.setRoutingKey(routingKey);message.setGmtSend(new Date());message.setMessageState(SendHandleStateEnum.WAITING_CONFIRM.getCode());sendMessageService.insert(message);Home("开始异步发送消息{}", message);sendMessage(message);}/*** 发送消息(这里用public只是为了事务)* @param message*/@Async("sendPool")public void sendMessage(SendMessage message) {CorrelationData correlationData = new CorrelationData();correlationData.setId(message.getMessageId());driveRabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) JSONObject.toJSONString(message), correlationData);Home("异步消息发送成功{}", message);}
}

异步线程池配置

@Configuration
public class DriveAsyncConfig {@Bean(name = "sendPool")public ThreadPoolExecutor withdrawAuditPool() {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(128), new ThreadPoolExecutor.DiscardPolicy());return threadPoolExecutor;}
}

接收业务处理器接口

public interface ReceiveHandle {/*** 获取处理器名称* @return*/String getName();/*** 业务处理* @param data 业务数据*/void execute(String data);
}

接收执行器

@Slf4j
@Service
public class ReceiveExecute {@Autowiredprivate RedisLock redisLock;@Autowiredprivate ReceiveMessageService receiveMessageService;@Autowiredprivate ApplicationContext applicationContext;/** 接收处理类 */private volatile Map<String, ReceiveHandle> handleMap;/** 处理锁名称 */private static final String RECEIVE_EXECUTE_LOCK = "RECEIVE:EXECUTE:LOCK:";/*** 通过类型获取接收处理类* @param businessType* @return*/private ReceiveHandle getReceiveHandle(String businessType) {if (CollectionUtils.isEmpty(handleMap)) {synchronized (this) {if (CollectionUtils.isEmpty(handleMap)) {// 获取所有实现类型Map<String, ReceiveHandle> beans = applicationContext.getBeansOfType(ReceiveHandle.class);if (CollectionUtils.isEmpty(beans)) {return null;}// 初始化maphandleMap = beans.values().stream().collect(Collectors.toMap(ReceiveHandle::getName, x -> x));}}}return handleMap.get(businessType);}/*** 业务处理操作* @param messageId* @param businessType*/@Transactional(rollbackFor = Exception.class)public void execute(String messageId, String businessType) {Home("消息驱动开始业务处理 参数 messageId{} businessType{}", messageId, businessType);// 验证参数if (StringUtils.isEmpty(messageId) || StringUtils.isEmpty(businessType)) {return;}// 开始调用处理ReceiveHandle receiveHandle = getReceiveHandle(businessType);if (receiveHandle == null) {return;}redisLock.tryLock(RECEIVE_EXECUTE_LOCK, messageId, () -> {// 必须是待处理状态ReceiveMessage receiveMessageSelect = receiveMessageService.getByMessageId(messageId);if (receiveMessageSelect == null) {throw new RuntimeException("消息" + messageId + "不存在");}Home("消息驱动业务处理 messageId{}当前状态{}", messageId, receiveMessageSelect.getHandleState());// 定时任务会查初始状态数据并调用此方执行if (!ReceiveHandleStateEnum.INIT.getCode().equals(receiveMessageSelect.getHandleState())) {return;}// 修改消息处理中receiveMessageService.updateHandleState(receiveMessageSelect.getMessageId(), ReceiveHandleStateEnum.PROCESSING.getCode());Home("驱动开始业务处理 参数{}", receiveMessageSelect);// 处理业务receiveHandle.execute(receiveMessageSelect.getData());Home("驱动结束业务处理 参数{}", receiveMessageSelect);// 修改消息处理成功receiveMessageService.updateHandleState(receiveMessageSelect.getMessageId(), ReceiveHandleStateEnum.PROCESS_SUCCESS.getCode());});Home("消息驱动结束业务处理");}
}

MQ监听器

@Slf4j
@Service
public class ReceiveListener implements ChannelAwareMessageListener {@Autowiredprivate RedisLock redisLock;@Autowiredprivate ReceiveExecute receiveExecute;@Autowiredprivate ReceiveMessageService receiveMessageService;/**  写库锁名称 */private static final String RECEIVE_WRITING_DATA_LOCK = "RECEIVE:WRITING:DATA:LOCK:";@Overridepublic void onMessage(Message message, Channel channel) throws Exception {Home("消息驱动接收消息 - 数据{}", message.getBody());if (message == null || StringUtils.isEmpty(message.getBody())) {return;}// 获取发送数据String json = new String(message.getBody(), StandardCharsets.UTF_8);Home("消息驱动接收消息 - 数据{}", json);SendMessage sendMessage = JSONObject.parseObject(json, SendMessage.class);if (sendMessage == null) {// 拒绝并且丢弃channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);return;}redisLock.tryLock(RECEIVE_WRITING_DATA_LOCK, sendMessage.getMessageId(), () -> {// 是否接收过,已经接收过直接确认ReceiveMessage receiveMessageSelect = receiveMessageService.getByMessageId(sendMessage.getMessageId());if (receiveMessageSelect != null) {Home("消息驱动接收消息 - 重复消息 ID{}", sendMessage.getMessageId());return;}// 消息入库ReceiveMessage receiveMessage = new ReceiveMessage();receiveMessage.setMessageId(sendMessage.getMessageId());receiveMessage.setBusinessType(sendMessage.getBusinessType());receiveMessage.setBusinessId(sendMessage.getBusinessId());receiveMessage.setData(sendMessage.getData());receiveMessage.setGmtReceive(new Date());receiveMessage.setHandleState(ReceiveHandleStateEnum.INIT.getCode());receiveMessageService.insert(receiveMessage);Home("消息驱动接收消息 - 消息写入DB成功{}", receiveMessage);});// 调用业务执行receiveExecute.execute(sendMessage.getMessageId(), sendMessage.getBusinessType());// 消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

驱动管理器

public interface DriveManage {/*** 发送消息* @param businessId 业务ID* @param businessType 业务类型* @param date 业务数据*/void send(Long businessId, String businessType, String date);/*** 发送消息* @param businessId 业务ID* @param businessType 业务类型* @param routingKey 路由key* @param date 业务数据*/void send(Long businessId, String businessType, String routingKey, String date);
}

驱动实现类

@Slf4j
@Component
@EnableScheduling
public class DriveManageImpl implements DriveManage {@Autowiredprivate SendMessageService sendMessageService;@Autowiredprivate ReceiveMessageService receiveMessageService;@Lazy@Autowiredprivate SendExecute sendExecute;@Lazy@Autowiredprivate ReceiveExecute receiveExecute;/** 驱动 - 消息队列名称(必须为每个业务系统建立一个队列) */@Value("${drive.message.queue.name}")private String messageQueueName;/** 默认exchange */@Value("${spring.rabbitmq.template.exchange}")private String exchange;/** 驱动 - 本服务routingKey */@Value("${drive.message.routingKey}")private String routingKey;@Beanpublic RabbitTemplate driveRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate driveRabbitTemplate = new RabbitTemplate(connectionFactory);driveRabbitTemplate.setExchange(exchange);driveRabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 修改消息发送成功sendMessageService.updateMessageState(correlationData.getId(), SendHandleStateEnum.SEND_SUCCESS.getCode());Home("drive消息确认 - 发送成功 ID{}", correlationData.getId());} else {sendMessageService.updateMessageState(correlationData.getId(), SendHandleStateEnum.SEND_FAIL.getCode());log.error("drive消消息确认 - 发送失败 ID{}", correlationData.getId());}});return driveRabbitTemplate;}@Overridepublic void send(Long businessId, String businessType, String date) {sendExecute.execute(businessId, businessType, routingKey, date);}@Overridepublic void send(Long businessId, String businessType, String routingKey, String date) {sendExecute.execute(businessId, businessType, routingKey, date);}@Beanpublic MessageListenerContainer openAccountListenerContainer(ConnectionFactory connectionFactory, ReceiveListener receiveListener) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(messageQueueName);container.setMessageListener(receiveListener);container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}/*** 同步发送记录*/@Scheduled(cron = "0 0/1 * * * ?")public void synSend() {Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, -1);String endGmtCreate = new SimpleDateFormat(DateConstant.DATETIME_FORMAT).format(calendar.getTime());// 查询一分钟之前发送状态为待确认的List<Integer> sendStateList = new ArrayList<>();sendStateList.add(SendHandleStateEnum.SEND_FAIL.getCode());sendStateList.add(SendHandleStateEnum.WAITING_CONFIRM.getCode());List<SendMessage> sendMessagesListSelect = sendMessageService.findOverdueOrder(sendStateList, endGmtCreate);if (sendMessagesListSelect.isEmpty()) {return;}// 调用发送执行方法for (SendMessage sendMessage : sendMessagesListSelect) {sendExecute.sendMessage(sendMessage);}}/*** 同步接收记录*/@Scheduled(cron = "0 0/1 * * * ?")public void synReceive() {Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, -1);String endGmtCreate = new SimpleDateFormat(DateConstant.DATETIME_FORMAT).format(calendar.getTime());// 查询一分钟之前发送状态为初始List<Integer> receiveStateList = new ArrayList<>();receiveStateList.add(ReceiveHandleStateEnum.INIT.getCode());List<ReceiveMessage> receiveMessageList = receiveMessageService.findOverdueOrder(receiveStateList, endGmtCreate);if (receiveMessageList.isEmpty()) {return;}// 调用接收执行方法for (ReceiveMessage receiveMessage : receiveMessageList) {receiveExecute.execute(receiveMessage.getMessageId(), receiveMessage.getBusinessType());}}
}

业务实现例子

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Starter.class)
public class MainTest {@Autowiredprivate DriveManage driveManage;public String testSend() {return driveManage.send(0L, "order.pay.result", "支付成功");return driveManage.send(1L, "order.create.result", "下单成功");}
}
@Slf4j
@Service
public class Receive1 implements ReceiveHandle {@Overridepublic String getName() {return "order.pay.result";}@Overridepublic void execute(String data) {Home("驱动业务实现{},接收参数{}", getName(), data);Home("订单支付成功");}}
@Slf4j
@Service
public class Receive2 implements ReceiveHandle {@Overridepublic String getName() {return "order.create.result";}@Overridepublic void execute(String data) {Home("驱动业务实现{},接收参数{}", getName(), data);Home("订单下单成");}
}

引用方配置

drive:message:queue:name: topic.transaction.drive.orderroutingKey: order.transaction.orde