一、理论基础
1.1 RocketMQ能用来做什么
消息通讯
消息通讯是最基本也是最为简单的应用。比较典型的一个应用场景就是没有公网IP的情况下,外界服务无法访问接口,可以使用消息队列来订阅事件来实现双向通信。
异步处理
对于处理频繁且不需要即时反馈的场景来讲,RocketMQ具备良好的性能,而且比较优秀的消息堆积处理能力对于异步操作来说也是加分项。
其余功能
比如流量削峰、应用解耦等,具体可看下网上对于该功能的详细讲解,本文不做深入。
1.2 基础概念
-
Topic:主题,一级消息类型,可以配合Tag使用做细致区分,不同类型的消息设置不同Topic
-
Tag:消息标签,二级消息类型,用于进一步区分某个Topic下的消息分类
-
Producer:生产者,发送消息
-
Consumer:消费者,一个消息可以被多个消费者订阅
-
Consumer Group:消费者分组,为了实现集群消费,不同Consumer Group之间消费进度彼此不受影响,一个Consumer Group下包含多个Consumer实例
-
Producer Group:生产者分组,标识发送同一类消息的Producer,通常发送逻辑一致,一个Producer Group可以发送多个Topic消息
1.3 简单说明
GitHub上有一个开源的RocketMQ工具:RocketMQ-Spring
感兴趣的可以研究一下,功能实现很完整。
二、实战代码
2.1 依赖引入
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version>
</dependency><!-- 自定义的元数据依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional>
</dependency>
2.2 配置项
rocketmq:# rocketmqClient日志路径,默认是系统登录用户的根目录producer:clientLogDir: logs/rocketmq_client# 日志级别clientLogLevel: WARNnamesrvAddr: 127.0.0.1:9876groupName: testretryTimesWhenSendAsyncFailed: 1sendMsgTimeout: 6000brokerName: broker-aconsumer:# 日志级别clientLogLevel: WARNnamesrvAddr: 127.0.0.1:9876groupName: testthreadMax: 20threadMin: 10
备注:对于服务器硬盘不大的机器来讲,一定要记得设置RocketMQ的日志级别和路径等,否则增长极快的日志文件很快就会将你的硬盘塞满。而且如果没有后续的日志搜集与分析需求,很多日志没必要打印。
2.3 配置文件读取
2.3.1 producer
/*** RocketMQ Producer 配置项* @author smile*/
@Component
@ConfigurationProperties(prefix = "rocketmq.producer")
@Data
public class ProducerProperties {private String clientLogDir;private String clientLogLevel;private String namesrvAddr;private String groupName;private int retryTimesWhenSendAsyncFailed;private int sendMsgTimeout;}
####2.3.2 consumer
/*** RocketMQ consumer配置项* @author smile*/
@Component
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Data
public class ConsumerProperties {private String namesrvAddr;private int threadMax;private int threadMin;private String groupName;private String clientLogDir;private String clientLogLevel;
}
2.4 producer初始化
/*** 程序启动时初始化Producer* @author smile*/
@Configuration
@Slf4j
public class ProducerConfig {private final ProducerProperties properties;public ProducerConfig(ProducerProperties properties) {this.properties = properties;}@Beanpublic DefaultMQProducer getRocketMQProducer() throws MQClientException {setClientProperty();DefaultMQProducer producer = new DefaultMQProducer(properties.getGroupName());producer.setNamesrvAddr(properties.getNamesrvAddr());producer.setRetryTimesWhenSendAsyncFailed(properties.getRetryTimesWhenSendAsyncFailed());producer.setSendMsgTimeout(properties.getSendMsgTimeout());producer.start();log.info("*** producer has started! groupName:[{}], namesrvAddr:[{}] ***", properties.getGroupName(), properties.getNamesrvAddr());return producer;}private void setClientProperty() {System.setProperty(ClientLogger.CLIENT_LOG_ROOT, properties.getClientLogDir());System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, properties.getClientLogLevel());}
}
2.5 consumer示例代码
2.5.1 Consumer管理程序
- 本实例基于一个Group的Consumer。简单测试过,未做生产环境的深度测试
- 可实现简单的Consumer初始化、新增订阅、取消订阅功能
/*** @author smile*/
@Slf4j
public class ConsumerManager {private static final ConsumerManager MANAGER = new ConsumerManager();private static final String TAGS_SEP = "||";private static Map<String, String> subscription = new HashMap<String, String>(8) {{// 系统消息:订阅与取消订阅的事件put("sys", "subscribe||unsubscribe");}};private static DefaultMQPushConsumer consumer;/*** 单例,不允许外界主动实例化*/private ConsumerManager() {}public static ConsumerManager getInstance() {return MANAGER;}/*** 初始化Consumer,本示例初始化一个ConsumerGroup* 后续所有的订阅与取消订阅都是在一个consumer实例下进行*/public void initConsumer(ConsumerProperties properties) throws MQClientException {// 设置client日志信息, producer初始化时已配置,此处不再配置
// System.setProperty(ClientLogger.CLIENT_LOG_ROOT, properties.getClientLogDir());
// System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, properties.getClientLogLevel());consumer = new DefaultMQPushConsumer(properties.getGroupName());consumer.setNamesrvAddr(properties.getNamesrvAddr());consumer.setConsumeThreadMax(properties.getThreadMax());consumer.setConsumeThreadMin(properties.getThreadMin());consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);log.info("consumer topic and tags : {}", subscription);for (Map.Entry<String, String> entry : subscription.entrySet()) {consumer.subscribe(entry.getKey(), entry.getValue());}consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {try {MessageExt msg = msgs.get(0);String msgBody = new String(msg.getBody(), "utf-8");log.info("receive message, messageId:[{}], messageBody:{}, topic:[{}], tag:[{}]",msg.getMsgId(), msgBody, msg.getTopic(), msg.getTags());log.info("delay: [{}] ms", (System.currentTimeMillis() - msg.getBornTimestamp()));} catch (UnsupportedEncodingException e) {e.printStackTrace();// 如果执行异常,则稍后会重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();log.info("consumer has started! NamesrvAddr:[{}], groupName:[{}]", properties.getNamesrvAddr(), properties.getGroupName());}/*** 订阅新的事件* @param topic topic* @param tags tags,支持多个tag订阅,格式:TagA||TagB* @throws MQClientException*/public void subscribe(String topic, String tags) throws MQClientException {if (subscription.containsKey(topic)) {tags = StringUtils.join(subscription.get(topic), TAGS_SEP, tags);}consumer.subscribe(topic, tags);subscription.put(topic, tags);log.info("!!刷新订阅, {}", subscription);}/*** 取消订阅*/public void unsubscribe(String topic, String tags) throws MQClientException {if (!subscription.containsKey(topic)) {log.error("topic is not found");return;}String[] unsubscribeTags = tags.trim().split("\\|\\|");String[] existingTags = subscription.get(topic).trim().split("\\|\\|");log.info("unsubscribeTags: {}, existingTags: {}", unsubscribeTags, existingTags);StringBuilder newTagsBuilder = new StringBuilder();for (String existingTag : existingTags) {if (!ArrayUtils.contains(unsubscribeTags, existingTag)) {newTagsBuilder.append(existingTag).append(TAGS_SEP);}}if (tags.length() == 0) {consumer.unsubscribe(topic);return;}String newTags = newTagsBuilder.substring(0, newTagsBuilder.length() - 2);log.info("newTags: {}", newTagsBuilder);consumer.subscribe(topic, newTags);subscription.put(topic, newTags);log.info("!!取消订阅,新的订阅列表, {}", subscription);}}
2.5.2 初始化Consumer
/*** @author smile*/
@Component
public class ConsumerInit implements CommandLineRunner {private final ConsumerProperties properties;public ConsumerInit(ConsumerProperties properties) {this.properties = properties;}@Overridepublic void run(String... args) throws Exception {ConsumerManager.getInstance().initConsumer(properties);}
}
三、源码地址
完整示例代码,请移步GitHub