1. MQ介绍
B站视频:黑马程序员RocketMQ系统精讲 P1 - P34
1.1 为什么要用MQ
消息队列是一种”先进先出”的数据结构. Message Queue
其应用场景主要包含以下3个方面
应用解耦
系统的耦合性越高, 容错性就越低。 以电商应用为例,,用户创建订单后,如果耦合调用库存系统,、物流系统,、支付系统, 任何一个子系统出现了故障或者因为升级等原因暂时不可用, 都会造成下单操作异常, 影响用户使用体验。
使用消息队列解耦, 系统的耦合性就降低了。比如物流系统发生故障, 需要几分钟才能修复, 在这段时间内, 物流系统要处理的数据被缓存到消息队列中, 用户下单操作正常完成。 当物流系统恢复后, 补充处理存储在消息队列中的订单消息即可, 终端系统感知不到物流系统发生过几分钟的故障。流量削峰
应用系统如果遇到系统请求流量的瞬间猛增, 有可能将系统压垮。 有了消息队列可以将大量请求缓存起来, 分散到很长时间处理, 这样可以大大提高系统的稳定性和用户体验。
一般情况, 为了保证系统的稳定性, 如果系统负载超过阈值, 就会阻止用户请求, 这会影响用户体验, 而如果使用消息队列将请求缓存起来, 等待系统处理完毕后通知用户下单完毕, 用户下单体验要好。
处于经济考虑的目的:
业务系统正常时间段的QPS如果是1000, 流量最高峰是10000, 为了应对流程高峰配置高性能的服务器不划算, 可以使用消息队列对峰值流量削峰。
数据分发
通过消息队列可以让数据在多个系统之间进行流通, 数据的产生方不需要关心谁来使用这个数据, 只需要将数据发送到消息队列, 数据使用方直接在消息队列中直接获取数据即可。(发布,订阅)
1.2 MQ的优点和缺点
优点: 解耦, 削峰, 数据分发
缺点包含以下几点:
系统可用性降低
系统引入的外部依赖越多, 系统稳定性越差。 一旦MQ宕机, 就会对业务造成影响。如何保证MQ高可用呢?
系统复杂度提高
MQ的加入大大增加了系统的复杂度, 以前系统之间是同步的远程调用(RPC调用), 现在是通过MQ进行异步调用。
如何保证消息没有被重复消费?怎么处理消息丢失情况? 怎么保证消息传递的顺序性?
一致性问题
A系统处理完业务, 通过MQ给B,C,D三个系统发消息数据,如果B系统,C系统处理成功,D系统处理失败。 如果保证消息数据处理的一致性?
1.3 各种MQ产品的比较
常见的MQ产品包括Kafka,ActiveMQ,RabbitMQ,RocketMQ。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 10万级,RocketMQ也是可以支撑高吞吐的一种MQ | 10万级别,这是kafka最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | topic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
时效性 | ms级 | 微秒级,这是rabbitmq的一大特点,延迟是最低的 | ms级 | 延迟在ms级以内 |
可用性 | 高,基于主从架构实现高可用性 | 高,基于主从架构实现高可用性 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,消息可以做到0丢失 | |
功能支持 | MQ领域的功能极其完备 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低 | MQ功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
优劣势总结 | 非常成熟,功能强大,在业内大量的公司以及项目中都有应用 偶尔会有较低概率丢失消息 而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本 而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用 | erlang语言开发,性能极其好,延时很低; 吞吐量到万级,MQ功能比较完备 而且开源提供的管理界面非常棒,用起来很好用 社区相对比较活跃,几乎每个月都发布几个版本分 在国内一些互联网公司近几年用rabbitmq也比较多一些 但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。 而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。 而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。 | 接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障 日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景 而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码 还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的 | kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展 同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量 而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略 这个特性天然适合大数据实时计算以及日志收集 |
2. RocketMQ快速入门
RocketMQ是阿里巴巴2016年开源的MQ中间件, 使用JAVA语言开发, 在阿里内部RocketMQ承受了例如“双11”等高并发场景的消息流转, 能够处理万亿级别的消息。
2.1 准备工作
2.1.1 下载RocketMQ
RocketMQ最新版本: rocketmq-all-4.9.3-source-release.zip
2.1.2 环境要求
LInux64位系统
JDK8 (64位)
源码安装需要安装Maven 3.2.x, 下面是编译源码步骤:
> unzip rocketmq-all-4.9.3-source-release.zip
> cd rocketmq-all-4.9.3/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3
推荐直接下载编译后的bin包rocketmq-all-4.9.3-bin-release.zip
2.2 安装RocketMQ
2.2.1 安装目录
推荐使用二进制包进行安装。
1) 解压安装包
unzip rocketmq-all-4.9.3-bin-release.zip /usr/local/
2)进入安装目录即可。
2.2.2 目录介绍
- bin:启动脚本, 包括shell脚本和cmd脚本
- conf:实例配置文件,包括broker配置文件,logback配置文件等。
- lib:依赖jar包,包括Netty,commons-lang,fastJson等。
[root@centos7-01 rocketmq-4.9.3]# ll
total 48
drwxr-xr-x. 3 root root 21 May 1 21:39 ~
drwxr-xr-x. 2 root root 126 Feb 22 01:25 benchmark
drwxr-xr-x. 3 root root 4096 May 1 20:26 bin
drwxr-xr-x. 7 root root 201 May 19 22:59 conf
drwxr-xr-x. 2 root root 4096 Feb 22 01:25 lib
-rw-r--r--. 1 root root 17327 Feb 22 00:25 LICENSE
-rw-------. 1 root root 5735 May 19 23:04 nohup.out
-rw-r--r--. 1 root root 1338 Feb 22 00:25 NOTICE
-rw-r--r--. 1 root root 6069 Feb 22 00:25 README.md
2.3 启动RocketMQ
- 启动NameServer
# 启动NameServer
nohup sh bin/mqnamesrv &
# 查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
2)启动broker
# 启动broker
nohup sh bin/mqbroker -n localhost:9876 &
# 查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
问题描述:
RocketMQ默认的虚拟机内存较大, 启动Broker如果因为内存不足失败, 需要编辑如下两个配置文件, 修改JVM初始化内存大小。
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
修改 runserver.sh

修改 runbroker.sh
修改完初始化内存大小后,重启namesrv,broker。使用 jps 查看。
[root@centos7-01 bin]# jps
2498 BrokerStartup
9923 Jps
2410 NamesrvStartup
2.4 测试RocketMQ
发送消息
# 设置环境变量
export NAMESRV_ADDR=localhost:9876
# 使用安装包的demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息
# 设置环境变量
export NAMESRV_ADDR=localhost:9876
# 使用安装包的demo进行消费
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2.5 关闭RocketMQ
关闭RocketMQ , 先停止broker,再停止namesrv
# 关闭broker
sh bin/mqshutdown broker
# 关闭namesrv
sh bin/mqshutdown namesrv
3. RocketMQ集群搭建
3.1 各角色介绍
- producer:消息的发送者;比如:发信者
- consumer:消息接收者;比如:收信者
- broker:暂存和传输消息;比如:邮局
- NameServer:管理broker;比如:各个邮局的管理机构
- topic:区分消息的种类;一个发送者发送消息给一个或多个topic;一个消息的接收者可以订阅一个或多个topic消息。
- Message Queue:相当于topic的分区, 用于并行发送和接收消息。
3.2 集群搭建方式
3.2.1 集群特点
- NameServer是一个几乎无状态节点, 可集群部署, 节点之间无任何信息同步.
- Broker部署相对复杂, Broker分为Master与Slave, 一个Master可以对应多个Slave, 但是一个Slave只能应对一个Master, Master与Slave的对应关系通过指定相同的BrokerName, 不同的brokerId来定义, BrokerId为0表示Master, 非0表示Slave。Master也可以部署多个, 每个Broker与NameServer集群中的所有节点建立长连接, 定时注册Topic信息到所有NameServer。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息, 并向提供Topic服务的Master建立长连接, 且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接, 定期从NameServer取Topic路由信息, 并向提供Topic服务的Master,Slave建立长连接, 且定时向Master,Slave发送心跳。 Consumer既可以从Master订阅消息, 也可以从Slave订阅消息, 订阅规则由Broker配置决定。
3.2.2 集群模式
1)单Master模式
这种方式的风险较大, 一旦Broker重启或者宕机, 会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。
2) 多Master模式
一个集群无Slave,全是Master,例如2个Master或3个Master,这种模式的优缺点如下:
- 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间, 这台机器上未被消费的消息在机器恢复之前不可订阅, 消息实时性会受到影响。
3) 多Master多Slave模式(异步)
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少, 且消息实时性不会受到影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样。
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
4)多Master多Slave模式(同步)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT(实时时间)会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
3.3 双主双从集群搭建
3.3.1 总体架构
消息高可用采用2M-2S(同步双写)方式
3.3.2 集群工作流程
1)启动NameServer, NameServer启起来后监听端口, 等待Broker,Producer,Consumer连上来,相当于一个路由控制中心。
2)Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。 注册成功后,NameServer集群中就有Topic和Broker的映射关系。
3)收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
4)Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存储在哪些Broker上,轮询从队列列表中选择一个队列, 然后与队列所在的Broker建立长连接,从而向Broker发送消息。
5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存储在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
3.3.3 服务器环境
序号 | IP | 角色 | 架构模式 |
---|---|---|---|
1 | 192.168.65.129 | NameServer, brokerServer | Master1, Slave2 |
2 | 192.168.65.130 | NameServer, brokerServer | Master2, Slave1 |
3.3.4 Host添加信息
vim /etc/hosts
配置如下:
# nameserver
192.168.65.129 rocketmq-nameserver1
192.168.65.130 rocketmq-nameserver2
# broker
192.168.65.129 rocketmq-master1
192.168.65.129 rocketmq-slave2
192.168.65.130 rocketmq-master2
192.168.65.130 rocketmq-slave1
配置完成后,重启网卡
systemctl restart network
3.3.5 防火墙配置
宿主机需要远程访问虚拟机的rocketmq服务和web服务, 需要开放相关的端口号, 简单粗暴的方式是直接关闭防火墙.
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service
或为了安全, 只开放特定的端口号, RocketMQ默认使用的3个端口: 9876、10911、11011
。如果防火墙没有关闭, 那么防火墙就必须开放这些端口:
- nameserver默认使用9876端口
- master默认使用10911端口
- slave默认使用11011端口
执行以下命令:
# 开放nameserver默认端口
firewall-cmd --permanent --zone=public --add-port=9876/tcp
# 移除端口访问权限
firewall-cmd --permanent --remove-port=9876/tcp
# 开放master默认端口
firewall-cmd --permanent --zone=public --add-port=10911/tcp
# 开放slave默认端口(当前集群模式下可以不开启)
firewall-cmd --permanent --zone=public --add-port=11011/tcp
# 重启防火墙
firewall-cmd --reload
# 查看端口开放情况
firewall-cmd --permanent --list-ports
3.3.6 环境变量配置
vim /etc/profile
在profile文件的末尾加入如下命令:
# set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq-4.9.3
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
输入:wq!保存退出, 并生效修改的配置:
source /etc/profile
3.3.7 创建消息存储路径
如果没有创建消息的存储路径, 会默认在当前用户的home目录下生成store目录。
[root@centos7-01 store]# pwd
/root/store
[root@centos7-01 store]# ll
total 8
-rw-r--r--. 1 root root 0 May 19 23:04 abort
-rw-r--r--. 1 root root 4096 May 29 22:53 checkpoint
drwxr-xr-x. 2 root root 34 May 1 20:36 commitlog
drwxr-xr-x. 2 root root 280 May 29 22:54 config
drwxr-xr-x. 12 root root 210 May 17 21:24 consumequeue
drwxr-xr-x. 2 root root 31 May 19 20:49 index
-rw-r--r--. 1 root root 4 May 19 23:04 lock
也可以手动创建消息存储路径,指定到该目录下。
mkdir -p /usr/local/rocketmq-4.9.3/store
mkdir -p /usr/local/rocketmq-4.9.3/store/commitlog
mkdir -p /usr/local/rocketmq-4.9.3/store/consumequeue
mkdir -p /usr/local/rocketmq-4.9.3/store/index
3.3.8 broker配置文件
3.3.8.1 Master1
服务器: 192.168.65.129
# 2m-2s-sync是同步双写 2m-2s-async是异步复制 2m-noslave 双Master无Slave
vi /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties
broker-a.properties 修改配置如下:
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0表示Slave
brokerId=0
# nameServer地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口,即Broker与producer与consumer通信的端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq-4.9.3/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq-4.9.3/store/commitlog
# 消息队列存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.9.3/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.9.3/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.9.3/store/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq-4.9.3/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
# checkTransactionMessageEnable=false
# 发送消息线程池
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
3.3.8.2 Slave2
服务器: 192.168.65.129
# 2m-2s-sync是同步双写 2m-2s-async是异步复制 2m-noslave 双Master无Slave
vi /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties
broker-b-s.properties 配置文件内容如下:
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0 表示Master,>0表示Slave
brokerId=1
# nameServer地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口,即Broker与producer与consumer通信的端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq-4.9.3/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq-4.9.3/store/commitlog
# 消息队列存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.9.3/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.9.3/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.9.3/store/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq-4.9.3/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
# 刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发送消息线程池
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
3.3.8.3 Master2
服务器: 192.168.65.130
# 2m-2s-sync是同步双写 2m-2s-async是异步复制 2m-noslave 双Master无Slave
vi /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b.properties
broker-b.properties 配置内容如下:
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0 表示Master,>0表示Slave
brokerId=0
# nameServer地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口,即Broker与producer与consumer通信的端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq-4.9.3/store/commitlog
# 消息队列存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.9.3/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.9.3/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.9.3/store/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq-4.9.3/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
# checkTransactionMessageEnable=false
# 发送消息线程池
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
3.3.8.4 Slave1
服务器: 192.168.65.130
# 2m-2s-sync是同步双写 2m-2s-async是异步复制 2m-noslave 双Master无Slave
vi /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a-s.properties
broker-a-s.properties 配置内容如下:
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0表示Slave
brokerId=1
# nameServer地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口,即Broker与producer与consumer通信的端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq-4.9.3/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq-4.9.3/store/commitlog
# 消息队列存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.9.3/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.9.3/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.9.3/store/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq-4.9.3/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
# 刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发送消息线程池
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
3.3.9 修改启动脚本文件
runbroker.sh
vi /usr/local/rocketmq-4.9.3/bin/runbroker.sh
需要根据内存大小对JVM参数进行进行适当的调整:
# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
runserver.sh
vi /usr/local/rocketmq-4.9.3/bin/runserver.sh
配置内容:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
3.3.10 服务启动
3.3.10.1 启动NameServer集群
分别在192.168.65.129和192.168.65.130上启动NameServer
cd /usr/local/rocketmq-4.9.3/bin
nohup sh mqnamesrv &
3.3.10.2 启动Broker集群
- 在192.168.65.129上启动Master1和Slave2.
Master1:
cd /usr/local/rocketmq-4.9.3/bin
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties &
Slave2:
cd /usr/local/rocketmq-4.9.3/bin
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties &
- 在192.168.65.130上启动Master2和Slave1.
Master2:
cd /usr/local/rocketmq-4.9.3/bin
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b.properties &
Slave1:
cd /usr/local/rocketmq-4.9.3/bin
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a-s.properties &
3.3.11 查看进程状态
启动后通过jps
查看启动进程
[root@centos7-02 2m-2s-sync]# jps
2117 NamesrvStartup
2261 BrokerStartup
2375 BrokerStartup
2398 Jps
3.3.12 查看日志
# 查看nameserver日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
挑选130机器, 查看broker的启动日志
# broker成功注册到nameserver1,nameserver2
2022-05-31 22:24:58 INFO brokerOutApi_thread_2 - register broker[1]to name server rocketmq-nameserver1:9876 OK
2022-05-31 22:24:58 INFO brokerOutApi_thread_1 - register broker[1]to name server rocketmq-nameserver2:9876 OK
2022-05-31 22:24:58 INFO main - The broker[broker-a, 192.168.65.130:11011] boot success. serializeType=JSON and name server is rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 从Master同步主题配置到slave
2022-05-31 22:25:01 INFO BrokerControllerScheduledThread1 - Update slave topic config from master, 192.168.65.129:10911
2022-05-31 22:25:01 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.65.129:10911
2022-05-31 22:25:01 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.65.129:10911
2022-05-31 22:25:01 INFO BrokerControllerScheduledThread1 - Update slave Subscription Group from master, 192.168.65.129:10911
2022-05-31 22:25:08 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2022-05-31 22:25:08 INFO brokerOutApi_thread_3 - register broker[1]to name server rocketmq-nameserver2:9876 OK
2022-05-31 22:25:08 INFO brokerOutApi_thread_4 - register broker[1]to name server rocketmq-nameserver1:9876 OK
2022-05-31 22:25:11 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.65.129:10911
2022-05-31 22:25:11 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.65.129:10911
2022-05-31 22:25:18 INFO brokerOutApi_thread_1 - register broker[0]to name server rocketmq-nameserver2:9876 OK
2022-05-31 22:25:18 INFO brokerOutApi_thread_2 - register broker[0]to name server rocketmq-nameserver1:9876 OK
2022-05-31 22:25:21 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.65.129:10911
2022-05-31 22:25:21 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.65.129:10911
2022-05-31 22:25:31 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.65.129:10911
2022-05-31 22:25:31 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.65.129:10911
2022-05-31 22:25:38 INFO TransactionalMessageCheckService - create new topic TopicConfig [topicName=RMQ_SYS_TRANS_HALF_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
3.4 mqadmin管理工具
3.4.1 使用方式
进入RocketMQ安装位置, 在bin目录下执行./mqadmin {command} {args}
3.4.2 命令介绍
3.4.2.1 Topic相关
名称 | 含义 | 命令选项 | 说明 |
updateTopic | 创建新Topic配置 | -b | Broker地址,表示topic所在Broker,只支持单台Broker,地址为ip:port |
-c | cluster名称,表示topic所在集群(集群可通过clusterList查询) | ||
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port | ||
-p | 指定新topic的读写权限(W=2|R=4|WR=6) | ||
-r | 可读队列数(默认为8) | ||
-w | 可写队列数(默认为8) | ||
-t | topic名称(名称只能使用字符^[a-zA-Z0-9_-]+$) | ||
deleteTopic | 删除Topic | -c | cluster名称,表示删除某集群下的某个topic(集群可通过clusterList查询) |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port | ||
-t | topic名称(名称只能使用字符^[a-zA-Z0-9_-]+$) | ||
topicList | 查看Topic列表信息 | -h | 打印帮助 |
-c | 不配置-c,只返回topic列表,增加-c返回clusterName,topic,consumerGroup信息,即topic的所属集群和订阅关系,没有参数 | ||
-n | NameServer服务地址,格式ip:port | ||
topicRoute | 查看Topic路由信息 | -t | topic名称 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port | ||
topicStatus | 查看topic消息队列offset | -t | topic名称 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port | ||
topicClusterList | 查看Topic所在集群列表 | -t | topic名称 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port | ||
updateTopicPerm | 更新Topic读写权限 | -t | topic名称 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port | ||
-b | Broker地址,表示topic所在Broker,只支持单台Broker,地址为ip:port | ||
-p | 指定新topic的读写权限(W=2|R=4|WR=6) | ||
-c | cluster名称,表示topic所在集群(集群可通过clusterList查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 | ||
updateOrderConf | 从NameServer上创建,删除,获取特定命名空间的kv配置,目前还未启用 | -h | 打印帮助 |
-n | NameServer服务地址,格式ip:port | ||
-t | topic, 键 | ||
-v | orderConf,值 | ||
-m | method, 可选get,put,delete | ||
allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic名称 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port |
3.4.2.2 集群相关
名称 | 含义 | 命令选项 | 说明 |
clusterList | 查看集群信息,集群,BrokerName,BrokerId,TPS等信息 | -m | 打印更多信息(增加打印出如下信息#InotalYest,#OutTotalYest,#InTotalToday,#OutTotalToday |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式ip:port | ||
-i | 打印间隔,单位秒 | ||
culsterRT | 发送消息检测集群各BrokerRT. 消息发往$(BrokerName) Topic | -a | amountj, 每次探测的总数,RT=总时间/amount |
-s | 消息大小,单位B | ||
-c | 探测哪个集群 | ||
-p | 是否打印格式化日志,以|分割,默认不打印 | ||
-h | 打印帮助 | ||
-m | 所属机房,打印使用 | ||
-i | 发送间隔,单位秒 | ||
-n | NameServer服务地址,格式ip:port |
3.4.2.3 Broker相关
名称 | 含义 | 命令选项 | 说明 |
3.4.2.4 消息相关
名称 | 含义 | 命令选项 | 说明 |
3.4.2.5 消费者,消费组相关
名称 | 含义 | 命令选项 | 说明 |
3.4.2.6 连接相关
名称 | 含义 | 命令选项 | 说明 |
consumerConnection | 查询Consumer的网络连接 | -g | 消费者所属组名 |
-n | NameServer服务地址,格式ip:port | ||
-h | 打印帮助 | ||
producerConnection | 查询Producer的网络连接 | -g | 生产者所属组名 |
-t | 主题名称 | ||
-n | NameServer服务地址,格式ip:port | ||
-h | 打印帮助 |
3.4.2.7 NameServer相关
名称 | 含义 | 命令选项 | 说明 |
updateKvConfig | 更新NameServer的kv配置,目前还未使用 | -s | 命名空间 |
-k | key | ||
-v | value | ||
-n | NameServer服务地址,格式ip:port | ||
-h | 打印帮助 | ||
deleteKvConfig | 删除Nameserver的kv配置 | -s | 命名空间 |
-k | key | ||
-n | NameServer服务地址,格式ip:port | ||
-h | 打印帮助 | ||
getNameSrvConfig | 获取NameServer配置 | -n | NameServer服务地址,格式ip:port |
-h | 打印帮助 | ||
updateKvConfig | 修改Nameserver的kv配置 | -n | NameServer服务地址,格式ip:port |
-k | key | ||
-v | value | ||
-h | 打印帮助 | ||
getNameSrvConfig | 获取NameServer配置 | -n | NameServer服务地址,格式ip:port |
-h | 打印帮助 |
3.4.2.8 其他
名称 | 含义 | 命令选项 | 说明 |
startMonitoring | 开启监控进程,监控消息误删,重试队列消息数等 | -n | NameServer服务地址,格式ip:port |
-h | 打印帮助 |
3.4.3 注意事项
- 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
- 几乎所有命令都可以通过-h获取帮助
- 如果既有Broker地址(-b)配置项,又有clusterName(-c)配置项,则优先以Broker地址执行命令;如果不配置Broker地址,则对集群中所有主机执行命令。
3.5 集群监控平台搭建
3.5.1 概述
RocketMQ
有一个对其扩展的开源项目incubactor-rocketmq-externals, 这个项目中有一个子模块叫rocketmq-console
, 这个便是管理控制台项目,先将incubactor-rocketmq-externals拉到本地,因为我们需要自己对rocketmq-console
进行编译打包运行。
3.5.2 下载并编译打包
git clone https://github.com/apache/rocketmq-externals
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
注意:打包前在rocketmq-console
的application.properties
中配置namesrv
集群地址:
rocketmq.config.namesrvAddr=92.168.65.129:9876;92.168.65.130:9876
启动rocketmq-console:
java -jar rocketmq-console-ng-2.0.0.jar
启动成功后, 我们就可以通过浏览器访问http://localhost:8080
进入控制台界面了, 如下图:
4. RocketMQ消息案例
4.1 消息生产/消费分析
导入MQ客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
消息生产者步骤:
- 创建消息生产者producer,并指定生产者组名
- 指定nameserver地址
- 启动producer
- 创建消息对象,指定主题Topic,Tag和消息体
- 发送消息
- 关闭生产者producer
消息消费者步骤:
- 创建消费者Consumer, 指定消费者组名
- 指定NameServer地址
- 订阅主题Topic和Tag
- 设置回调函数,处理消息
- 启动消费者Consumer
4.2 消息发送
4.2.1 发送同步消息
这种同步的发送方式使用比较广泛,比如: 重要的消息通知,短信通知。
package com.crysw.bootrocketmq.base;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* 描述:发送同步消息
* @author crysw
* @date 2022/6/4 19:46
* @version 1.0
*/
public class SyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("producerGroupNm1");
// 设置NameServer地址
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
// 启动Producer
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定topic,tag和消息体
byte[] body = ("Hello RocketMQ +" + i).getBytes();
Message message = new Message("base", "Tag1", body);
// 发送消息到一个Broker
SendResult sendResult = producer.send(message);
// 通过sendResult返回消息判断是否成功送达
// System.out.printf("%s%n", sendResult);
// 发送状态
SendStatus status = sendResult.getSendStatus();
// 消息ID
String msgId = sendResult.getMsgId();
// 消息接收队列ID
int queueId = sendResult.getMessageQueue().getQueueId();
System.out.printf("发送状态: %s, 消息ID: %s , 队列: %s %n", status.toString(), msgId, queueId);
}
// 关闭Producer
producer.shutdown();
}
}
测试结果:
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6DFF00000 , 队列: 0
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E0080001 , 队列: 1
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E0100002 , 队列: 2
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E0190003 , 队列: 3
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E0210004 , 队列: 0
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E02E0005 , 队列: 1
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E0340006 , 队列: 2
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E03C0007 , 队列: 3
发送状态: SLAVE_NOT_AVAILABLE, 消息ID: 7F000001559C18B4AAC213C6E0420008 , 队列: 0
4.2.2 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间的等待Broker的响应。
package com.crysw.bootrocketmq.base;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* 描述:发送异步消息
* @author crysw
* @date 2022/6/4 20:18
* @version 1.0
*/
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
//- 创建消息生产者producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("producerGroupNm1");
//- 指定nameserver地址
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
//- 启动producer
producer.start();
// 设置失败后的重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
//- 创建消息对象,指定主题Topic,Tag和消息体
for (int i = 0; i < 10; i++) {
byte[] body = ("Hello World " + i).getBytes();
Message msg = new Message("base", "Tag2", body);
//- 发送异步消息,SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功后的回调结果
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
// 发送失败的回调结果
System.out.println("发送失败: " + throwable.getMessage());
}
});
}
//- 关闭生产者producer
TimeUnit.SECONDS.sleep(5);
producer.shutdown();
}
}
测试结果:
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F00000152E418B4AAC213DFE34B0000, offsetMsgId=C0A8418100002A9F0000000000007122, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=26]
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F00000152E418B4AAC213DFE34E0005, offsetMsgId=C0A8418200002A9F0000000000006FA0, messageQueue=MessageQueue [topic=base, brokerName=broker-b, queueId=1], queueOffset=24]
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F00000152E418B4AAC213DFE35C0006, offsetMsgId=C0A8418100002A9F0000000000007402, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=25]
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F00000152E418B4AAC213DFE34D0002, offsetMsgId=C0A8418100002A9F000000000000734A, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=27]
// .......
4.2.3 发送单向消息
主要用在不特别关心发送结果的场景,例如日志发送。
package com.crysw.bootrocketmq.base;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* 描述:发送单向消息
* @author crysw
* @date 2022/6/4 20:40
* @version 1.0
*/
public class OneWayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producerGroupNm1");
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
producer.start();
for (int i = 0; i < 10; i++) {
//- 创建消息对象,指定主题Topic,Tag和消息体
byte[] body = ("one way msg" + i).getBytes();
Message message = new Message("base", "Tag3", body);
// 发送单向消息
producer.sendOneway(message);
}
producer.shutdown();
}
}
4.3 消息消费
两种消费模式
public enum MessageModel {
// 广播模式
BROADCASTING("BROADCASTING"),
// 负载均衡模式(集群)
CLUSTERING("CLUSTERING");
}
4.3.1 负载均衡模式
消费者采用负载均衡方式进行消费消息, 多个消费者共同消费队列消息, 每个消费者处理的消息不同。
package com.crysw.bootrocketmq.base;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 描述:消费者
* @author crysw
* @date 2022/6/4 20:50
* @version 1.0
*/
public class MyConsumer {
public static void main(String[] args) throws MQClientException {
//- 创建消费者Consumer, 指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupNm1");
//- 指定NameServer地址
consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
//- 订阅主题Topic和Tag
consumer.subscribe("base", "Tag1 || Tag2");
// 设置负载均衡模式消费, 默认也是负载均衡模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//- 注册监听器, 设置回调函数,处理消息
consumer.registerMessageListener((List<MessageExt> messageExts, ConsumeConcurrentlyContext context) -> {
messageExts.forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//- 启动消费者Consumer
consumer.start();
}
}
测试结果:
MessageExt [brokerName=broker-b, queueId=1, storeSize=189, queueOffset=2, sysFlag=0, bornTimestamp=1654344600741, bornHost=/192.168.65.1:55630, storeTimestamp=1654344600836, storeHost=/192.168.65.130:10911, msgId=C0A8418200002A9F0000000000002C81, commitLogOffset=11393, bodyCRC=1417976316, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, CONSUME_START_TIME=1654347508837, UNIQ_KEY=7F000001559C18B4AAC213C6E0A50015, CLUSTER=rocketmq-cluster, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 43, 50, 49], transactionId='null'}]
// ....
4.3.2 广播模式
消费者采用广播模式进行消费消息,每个消费者消费的消息都是相同的。
consumer.setMessageModel(MessageModel.BROADCASTING);
4.4 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO),RocketMQ可以严格的保证消息有序,可以分为分区有序
和全局有序
。
顺序消费的原理解析, 在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取消息, 则就保证了顺序。
当发送和消费参与的queue只有一个,则是全局有序。
如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建,付款,推送,完成。 订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
4.4.1 顺序消息生产
首先构建订单信息类
package com.crysw.bootrocketmq.order;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* 描述:订单
* @author crysw
* @date 2022/6/4 21:42
* @version 1.0
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class OrderStep {
/**
* 订单ID
*/
private long orderId;
/**
* desc
*/
private String desc;
// 模拟订单信息
public static List<OrderStep> buildOrders() {
List<OrderStep> orderStepList = new ArrayList<>();
// 1039L
orderStepList.add(OrderStep.builder()
.orderId(1039L)
.desc("创建")
.build());
orderStepList.add(OrderStep.builder()
.orderId(1039L)
.desc("付款")
.build());
orderStepList.add(OrderStep.builder()
.orderId(1039L)
.desc("推送")
.build());
orderStepList.add(OrderStep.builder()
.orderId(1039L)
.desc("完成")
.build());
// 1065L
orderStepList.add(OrderStep.builder()
.orderId(1065L)
.desc("创建")
.build());
orderStepList.add(OrderStep.builder()
.orderId(1065L)
.desc("付款")
.build());
orderStepList.add(OrderStep.builder()
.orderId(1065L)
.desc("推送")
.build());
orderStepList.add(OrderStep.builder()
.orderId(1065L)
.desc("完成")
.build());
// 7235L
orderStepList.add(OrderStep.builder()
.orderId(7235L)
.desc("创建")
.build());
orderStepList.add(OrderStep.builder()
.orderId(7235L)
.desc("付款")
.build());
orderStepList.add(OrderStep.builder()
.orderId(7235L)
.desc("推送")
.build());
orderStepList.add(OrderStep.builder()
.orderId(7235L)
.desc("完成")
.build());
return orderStepList;
}
}
编写订单生产者
package com.crysw.bootrocketmq.order;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
/**
* 描述:订单生产者
* @author crysw
* @date 2022/6/4 21:50
* @version 1.0
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
producer.start();
// 构建消息集合
List<OrderStep> orderSteps = OrderStep.buildOrders();
// 发送消息
for (int i = 0; i < orderSteps.size(); i++) {
byte[] body = orderSteps.get(i).toString().getBytes();
Message msg = new Message("OrderTopic", "Order", "key" + i, body);
/**
* 参数一:消息对象
* 参数二:消息队列的选择器
* 参数三:选择队列的业务标识(订单ID)
*/
SendResult sendResult = producer.send(msg, (List<MessageQueue> mqs, Message message, Object arg) -> {
/**
* lambda表达式
* 参数一: 消息队列集合
* 参数二: 消息对象
* 参数三:业务标识的参数
*/
long orderId = (long) arg;
// 相同的订单ID对消息队列取模后,会将相同订单周期中的不同节点由同一个消息队列进行发送,保证消息顺序性
int index = (int) orderId % mqs.size();
return mqs.get(index);
},
// 将业务唯一标识订单ID传递给arg, 后面通过订单ID和消息队列集合大小取模决定生产者消息推送的队列
orderSteps.get(i).getOrderId());
System.out.println("发送结果:" + sendResult);
}
producer.shutdown();
}
}
生产者测试日志:
// 1039L
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1450000, offsetMsgId=C0A8418200002A9F0000000000007F4C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-b, queueId=3], queueOffset=13]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD19D0001, offsetMsgId=C0A8418200002A9F000000000000802C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-b, queueId=3], queueOffset=14]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1A30002, offsetMsgId=C0A8418200002A9F000000000000810C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-b, queueId=3], queueOffset=15]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1A90003, offsetMsgId=C0A8418200002A9F00000000000081EC, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-b, queueId=3], queueOffset=16]
// 1065L
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1B40004, offsetMsgId=C0A8418100002A9F0000000000009082, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=13]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1BA0005, offsetMsgId=C0A8418100002A9F0000000000009162, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=14]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1C10006, offsetMsgId=C0A8418100002A9F0000000000009242, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=15]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1CA0007, offsetMsgId=C0A8418100002A9F0000000000009322, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=16]
// 7235L
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1D00008, offsetMsgId=C0A8418100002A9F0000000000009402, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=13]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1D80009, offsetMsgId=C0A8418100002A9F00000000000094E2, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=14]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1E2000A, offsetMsgId=C0A8418100002A9F00000000000095C2, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=15]
发送结果:SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000013A4818B4AAC22E3BD1E9000B, offsetMsgId=C0A8418100002A9F00000000000096A3, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=16]
4.4.2 顺序消息消费
编写订单消费者,顺序消费订单消息
package com.crysw.bootrocketmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.stream.Collectors;
/**
* 描述:订单消费者
* @author crysw
* @date 2022/6/9 22:46
* @version 1.0
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
// 订阅主题和tag
consumer.subscribe("OrderTopic", "*");
// 注册消息监听器
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
msgs.stream()
.map(msg -> "queueId: [" + msg.getQueueId() + "]: " + new String(msg.getBody()))
.collect(Collectors.toList())
.forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.println("启动消费者.....");
}
}
查看订单消费的日志, 可以看到每个订单的创建,付款,推送,完成过程都是由从同一个队列进行消费的,保证了消息的顺序性。
启动消费者.....
// 1039
queueId: [3]: OrderStep(orderId=1039, desc=付款)
queueId: [3]: OrderStep(orderId=1039, desc=创建)
queueId: [3]: OrderStep(orderId=1039, desc=推送)
queueId: [3]: OrderStep(orderId=1039, desc=完成)
// 1065
queueId: [1]: OrderStep(orderId=1065, desc=创建)
queueId: [1]: OrderStep(orderId=1065, desc=付款)
queueId: [1]: OrderStep(orderId=1065, desc=推送)
queueId: [1]: OrderStep(orderId=1065, desc=完成)
// 7235
queueId: [3]: OrderStep(orderId=7235, desc=创建)
queueId: [3]: OrderStep(orderId=7235, desc=付款)
queueId: [3]: OrderStep(orderId=7235, desc=推送)
queueId: [3]: OrderStep(orderId=7235, desc=完成)
4.5 延时消息
比如电商,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
4.5.1 延时消息生产者
package com.crysw.bootrocketmq.delay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* 描述:延时消息的生产者
* @author crysw
* @date 2022/6/11 11:13
* @version 1.0
*/
public class DelayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("pgroup1");
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
producer.start();
for (int i = 0; i < 10; i++) {
// 构建消息 topic, tag, body
Message message = new Message("delayTopic", "Tag1", ("Hello World " + i).getBytes());
// 设定延迟时间 等级2=5s messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
message.setDelayTimeLevel(2);
// 发送消息
SendResult result = producer.send(message);
System.out.println("发送结果: " + result);
// 线程休眠1s
TimeUnit.MILLISECONDS.sleep(500);
}
producer.shutdown();
}
}
4.5.2 延时消息消费者
package com.crysw.bootrocketmq.delay;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.stream.Collectors;
/**
* 描述: 延迟消息的消费者
* @author crysw
* @date 2022/6/11 11:13
* @version 1.0
*/
public class DelayConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cgroup1");
consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
consumer.subscribe("delayTopic", "*");
// 设置消费模式: 负载均衡, 广播模式; 默认是负载均衡模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器, 回调函数处理消息
consumer.registerMessageListener((List<MessageExt> messageExts, ConsumeConcurrentlyContext context) -> {
messageExts.stream()
.map(messageExt -> "QueueId:" + messageExt.getQueueId() + ", messageID: " + messageExt.getMsgId()
+ ", 延时时间: " + (System.currentTimeMillis() - messageExt.getStoreTimestamp()) + ", body: "
+ new String(messageExt.getBody()))
.collect(Collectors.toList())
.forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("消费者启动, ......");
}
}
4.5.3 验证
启动生产者延时发送消息.
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC0F910000, offsetMsgId=C0A8418100002A9F000000000000AE88, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-a, queueId=0], queueOffset=12]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC11960001, offsetMsgId=C0A8418100002A9F000000000000AF82, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-a, queueId=1], queueOffset=13]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC13970002, offsetMsgId=C0A8418100002A9F000000000000B07C, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-a, queueId=2], queueOffset=14]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC15900003, offsetMsgId=C0A8418100002A9F000000000000B176, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-a, queueId=3], queueOffset=15]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC178B0004, offsetMsgId=C0A8418200002A9F0000000000009224, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-b, queueId=0], queueOffset=8]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC19860005, offsetMsgId=C0A8418200002A9F000000000000931E, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-b, queueId=1], queueOffset=9]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC1B7E0006, offsetMsgId=C0A8418200002A9F0000000000009418, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-b, queueId=2], queueOffset=10]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC1D770007, offsetMsgId=C0A8418200002A9F0000000000009512, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-b, queueId=3], queueOffset=11]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC1F700008, offsetMsgId=C0A8418100002A9F000000000000B270, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-a, queueId=0], queueOffset=16]
发送结果: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F0000012A5818B4AAC235FC216A0009, offsetMsgId=C0A8418100002A9F000000000000B36A, messageQueue=MessageQueue [topic=delayTopic, brokerName=broker-a, queueId=1], queueOffset=17]
11:35:16.651 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.65.129:9876] result: true
11:35:16.660 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.65.129:9876] result: true
11:35:16.660 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.65.129:10911] result: true
11:35:16.661 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.65.130:10911] result: true
11:35:16.661 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.65.130:11011] result: true
启动消费者, 会看到消息的消费比存储时间晚(延时)10s左右.
消费者启动, ......
QueueId:1, messageID: 7F0000012A5818B4AAC235FC19860005, 延时时间: 8441, body: Hello World 5
QueueId:1, messageID: 7F0000012A5818B4AAC235FC216A0009, 延时时间: 6435, body: Hello World 9
QueueId:0, messageID: 7F0000012A5818B4AAC235FC1F700008, 延时时间: 6939, body: Hello World 8
QueueId:1, messageID: 7F0000012A5818B4AAC235FC11960001, 延时时间: 10493, body: Hello World 1
QueueId:0, messageID: 7F0000012A5818B4AAC235FC0F910000, 延时时间: 11000, body: Hello World 0
QueueId:2, messageID: 7F0000012A5818B4AAC235FC13970002, 延时时间: 9975, body: Hello World 2
QueueId:3, messageID: 7F0000012A5818B4AAC235FC15900003, 延时时间: 9466, body: Hello World 3
QueueId:2, messageID: 7F0000012A5818B4AAC235FC1B7E0006, 延时时间: 7933, body: Hello World 6
QueueId:3, messageID: 7F0000012A5818B4AAC235FC1D770007, 延时时间: 7428, body: Hello World 7
QueueId:0, messageID: 7F0000012A5818B4AAC235FC178B0004, 延时时间: 8844, body: Hello World 4
4.5.4 使用限制
现在RocketMQ并不支持任意时间的延时, 需要设置几个固定的延时等级, 从1s到1d分别对应着等级1到19. 延时等级定义在RocketMQ服务端的MessageStoreConfig
类中的如下变量中:
// org/apache/rocketmq/store/config/MessageStoreConfig.java
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
4.6批量消息
批量发送消息能显著提高传递消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK, 而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。
4.6.1 发送批量消息
如果每次只发送不超过4MB的消息, 则很容易使用批处理, 如下:
package com.crysw.bootrocketmq.batch;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.List;
/**
* 描述:批量消息生产者
* @author crysw
* @date 2022/6/11 11:53
* @version 1.0
*/
public class BatchProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("pgroup1");
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
producer.start();
String topic = "batchTopic";
List<Message> messages = new ArrayList<>();
// 当每条消息不超过4MB,可以直接使用批量发送
for (int i = 0; i < 4; i++) {
// 构建消息 topic, tag, body
Message message = new Message(topic, "Tag1", ("Hello World" + i).getBytes());
messages.add(message);
}
// 发送消息
SendResult result = producer.send(messages);
System.out.println("发送结果: " + result);
producer.shutdown();
}
}
如果消息的总长度大于4MB,这时候需要将消息进行分割,如下:
package com.crysw.bootrocketmq.batch;
import org.apache.rocketmq.common.message.Message;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* 描述:定义消息列表分割器
* 消息列表分割器:其只会处理每条消息的大小不超4M的情况。若存在某条消息,其本身大小大于4M,这个分割器无法处理,其直接将这条消息构成一个子列表返回。并没有再进行分割
* @author crysw
* @date 2022/5/19 20:59
* @version 1.0
*/
public class MessageListSplitter implements Iterator<List<Message>> {
// 指定极限值为4M
private final int SIZE_LIMIT = 4 * 1024 * 1024;
// 存放所有要发送的消息
private final List<Message> messages;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public MessageListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}
/**
* 生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。
* 这个字符串由四部分构成:Topic、消息Body、消息日志(占 20 字节),及用于描述消息的一堆属性key-value。
* @return
*/
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);
// 统计当前遍历的message的大小
int tempSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tempSize += entry.getKey().length() + entry.getValue().length();
}
tempSize += 20;
// 判断当前消息本身是否大于4M
if (tempSize > SIZE_LIMIT) {
if (nextIndex == currIndex) {
nextIndex++;
}
break;
}
if (tempSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tempSize;
}
} // end for
// 获取当前messages列表的子集合[currIndex, nextIndex)
List<Message> subList = messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex = nextIndex;
return subList;
}
}
如果消息超过4MB, 将消息列表分割后再批量发送。
// 发送消息
MessageSpliter messageSpliter = new MessageSpliter(messages);
while (messageSpliter.hasNext()) {
List<Message> subMsgList = messageSpliter.next();
SendResult result = producer.send(messages);
System.out.println("发送结果: " + result.getMsgId());
}
producer.shutdown();
4.7 过滤消息
在大多数情况下,Tag可以用来过滤你想要的消息。通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符
(双竖线||)连接。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cgroup1");
consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
//consumer.subscribe("batchTopic", "*");
// Tag过滤
consumer.subscribe("batchTopic", "Tag1 || Tag2 || Tag3");
消费者将接收包含Tag1,Tag2或Tag3的消息,但是限制一个消息只能有一个标签,这对于复杂的场景可能不起作用, 在这种情况下, 可以使用SQL表达式筛选消息。 SQL特性可以通过发送消息时的属性来进行计算。 在RocketMQ定义的语法下,可以实现一些简单的逻辑。 下面是一个例子:
-------------
| message |
|-----------| a > 5 AND b = 'abc'
| a = 10 |--------------------->> Gotten
| b = 'abc'|
| c = true |
-------------
-------------
| message |
|-----------| a > 5 AND b = 'abc'
| a = 1 |--------------------->> Missed
| b = 'abc'|
| c = true |
-------------
4.7.1 SQL基本语法
RocketMQ只定义了一些基本语法来支持这个特性, 可以很容易扩展。
- 数值比较,比如: > , >=, <=, BETWEEN, = ;
- 字符比较,比如:= , <>, IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND, OR ,NOT;
常量支持类型为:
- 数值: 比如 123, 3.1415;
- 字符: 比如 ‘abc’, 必须用单引号括起来;
- NULL, 特殊的常量;
- 布尔值, TRUE或FALSE
只有使用push模式的消费者才能使用SQL92标准的sql语句,接口如下:
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {}
4.7.2 消息生产者
发送消息时,你能通过putUserProperty
来设置消息的属性。
package com.crysw.bootrocketmq.filter;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
/**
* 描述:SQL过滤的生产者
* @author crysw
* @date 2022/6/11 14:03
* @version 1.0
*/
public class SqlFilterProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("pgroup1");
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("filterTopic", "tag1", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置一些属性
message.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
producer.shutdown();
}
}
4.7.3 消息消费者
package com.crysw.bootrocketmq.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* 描述:SQL过滤的消费者
* @author crysw
* @date 2022/6/11 14:09
* @version 1.0
*/
public class SqlFilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cgroup1");
consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
// 订阅消息主题
// tag过滤
// consumer.subscribe("filterTopic", "Tag1 || Tag2 || Tag3");
// SQL过滤条件
consumer.subscribe("filterTopic", MessageSelector.bySql("a>5"));
// 注册监听
consumer.registerMessageListener((List<MessageExt> messageExts, ConsumeConcurrentlyContext context) -> {
messageExts.stream()
.map(messageExt -> {
try {
return "QueueID: " + messageExt.getQueueId() + ", MessageID: " + messageExt.getMsgId() + ", body: " + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
})
.forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("消费者启动了,.....");
}
}
4.7.4 测试效果
发送消息成功后,启动消费者发现报错。
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2280)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:450)
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:655)
at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:707)
at com.crysw.bootrocketmq.filter.SqlFilterConsumer.main(SqlFilterConsumer.java:43)
报错原因:
从console控制台可以看到集群中的broker配置, enablePropertyFilter=false
, 默认情况下Broker没有开启消息的SQL过滤功能。
解决方案:
需要在Broker加载的配置文件中添加如下属性,以开启该功能:
# 开启SQL过滤支持
enablePropertyFilter=true
在启动Broker时需要指定这个修改过的配置文件。例如对于集群Broker的启动,其修改的配置文件是conf/2m-2s-sync/broker-*.conf,启动时使用如下命令:
# 先停掉broker,NameServer
sh mqshutdown broker
sh mqshutdown namesrv
# NameServer1 & NameServer2
nohup sh mqnamesrv &
# Master1
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties &
# Slave2
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties &
# Master2
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b.properties &
# Slave1
nohup sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a-s.properties &
查看console控制台的集群配置,已经开启SQL过滤支持。
重启后重新测试,消费正常。可以看到日志打印的结果只有a>5的消息被消费了。
消费者启动了,.....
QueueID: 0, MessageID: 7F0000016D7018B4AAC238388EEC0006, body: Hello RocketMQ 6
QueueID: 1, MessageID: 7F0000016D7018B4AAC238388EFB0007, body: Hello RocketMQ 7
QueueID: 2, MessageID: 7F0000016D7018B4AAC238388F010008, body: Hello RocketMQ 8
QueueID: 3, MessageID: 7F0000016D7018B4AAC238388F0B0009, body: Hello RocketMQ 9
4.8 事务消息
4.8.1 流程分析
下图说明了事务消息的大致方案, 其中分为两个流程: 正常事务消息的发送及提交, 事务消息的补偿流程。
1)事务消息发送及提交
- 发送消息(半消息);
- 服务端响应消息写入结果;
- 根据发送结果执行本地事务,如果写入失败,此时半消息对业务不可见,本地逻辑不执行;
- 根据本地事务状态执行commit或rollback, commit操作生成消息索引, 消息对消费者可见。
2)事务补偿
- 对没有commit或rollback的事务消息(pending状态的消息),从服务端发起一次
回查
; - producer收到回查消息, 检查回查消息对应的本地事务的状态;
- 根据本地事务状态, 重新commit或rollback。
其中, 补偿阶段用于解决消息commit或rollback发生超时或失败的情况, 此时本地事务执行状态未知, 所以需要通过回查
来确认。
3)事务消息状态
事务消息共有三种状态: 提交状态,回滚状态,中间状态。
- TransactionStatus.CommitTransaction: 提交事务, 它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除, 不允许被消费。
- TransactionStatus.Unknown: 中间状态, 它代表需要检查(回查)消息队列来确定状态。
4.8.2 发送事务消息
1)创建事务消息生产者
使用TransactionMQProducer
类创建生产者, 并指定唯一的ProducerGroup, 就可以设置自定义线程池来处理这些检查请求,执行本地事务后,需要根据执行结果对消息队列进行回复。回传的事务状态请参考上面介绍。
package com.crysw.bootrocketmq.tx;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 描述:事务消息的生产者
* @author crysw
* @date 2022/6/12 12:40
* @version 1.0
*/
public class TxProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("txGroup");
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
// 定义一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
runnable -> new Thread(runnable, "client-transaction-msg-check-thread"));
// 生产者设置线程池,用来执行是事务请求
producer.setExecutorService(threadPoolExecutor);
// 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
// 生产者设置事务监听器
producer.setTransactionListener(transactionListener);
// 启动消息生产者
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
// 构建事务消息
Message message = new Message("txTopic", tags[i % tags.length], "key" + i, ("Hello RocketMQ Tx" + i).getBytes());
// 发送事务消息
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.printf("%s%n", transactionSendResult);
TimeUnit.SECONDS.sleep(1);
}
// producer.shutdown();
}
}
从console控制台可以看出,只有TagA和TagC的事务消息发送成功了,TagB的事务消息被回滚了。
2)实现事务的监听接口
当发送半消息成功时,使用executeLocalTransaction
方法来执行本地事务, 它返回提交状态,回滚状态或中间状态之一。
package com.crysw.bootrocketmq.tx;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* 描述:事务监听器
* @author crysw
* @date 2022/6/12 13:16
* @version 1.0
* @see TransactionListener#executeLocalTransaction(Message, Object) 执行本地事务并返回事务执行状态
* @see TransactionListener#checkLocalTransaction(MessageExt) 上面方法本地事务执行状态返回未知时, 执行该方法进行本地事务状态回查
*/
public class TransactionListenerImpl implements TransactionListener {
// 回调操作方法, 消息预提交就会触发该方法的执行, 用于完成本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
System.out.println("预提交消息成功: " + message);
// 假设接收到TagA的消息就表示提交成功, TagB的消息表示提交失败, TagC表示提交状态未知, 需要执行消息回查
String tag = message.getTags();
switch (tag) {
case "TagA":
return LocalTransactionState.COMMIT_MESSAGE;
case "TagB":
return LocalTransactionState.ROLLBACK_MESSAGE;
case "TagC":
default:
return LocalTransactionState.UNKNOW;
}
}
// 消息事务状态回查方法, 引发消息回查的原因最常见的有两个: 1) 回调操作返回UNKNOW 2) TC没有接收到TM的最终全局事务确认命令
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 模拟回查事务状态成功, 返回提交事务
System.out.println("消息Tag: " + messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3) 创建事务消息消费者
package com.crysw.bootrocketmq.tx;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.stream.Collectors;
/**
* 描述:事务消息的消费者
* @author crysw
* @date 2022/6/12 13:00
* @version 1.0
*/
public class TxConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txGroup");
consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
// 订阅主题
consumer.subscribe("txTopic", "*");
// 注册监听器, 回调函数处理消息
consumer.registerMessageListener((List<MessageExt> messageExts, ConsumeConcurrentlyContext context) -> {
messageExts.stream()
.map(messageExt -> "MsgID: " + messageExt.getMsgId() + ", body: " + new String(messageExt.getBody()))
.collect(Collectors.toList())
.forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.println("消费者启动了....");
}
}
从消费者的消费日志看出, 只有TagA和TagC的事务消息被成功消费。
消费者启动了....
MsgID: 7F0000014FF818B4AAC23DC9D08C0002, body: Hello RocketMQ Tx2 # Tx2 -》 TagC
MsgID: 7F0000014FF818B4AAC23DC9C8920000, body: Hello RocketMQ Tx0 # Tx0 -》 TagA
4.8.3 使用限制
- 事务消息不支持延时消息和批量消息;
- 为了避免单个消息被检查太多次而导致半队列消息累积, 我们默认将单个消息的检查次数限制为15次, 但是用户可以通过Broker配置文件的
transactionCheckMax
参数来修改此限制。 如果检查某条消息已经超过transactionCheckMax次,那么Broker将丢弃此消息, 并在默认情况下同时打印错误日志,用户可以通过重写AbstractTransactionCheckListener
类来修改这个行为。 - 事务消息将在Broker配置文件中的参数
transactionMsgTimeout
这个特定时间之后被检查, 当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS
来修改这个限制, 该参数优先于transactionMsgTimeout
参数。 - 事务性消息可能不止一次被检查或消费(因为存在回滚后再提交的情况),对于事务消息要做好幂等性检查。
- 提交给用户的目标主题消息可能会失败,目前需要依据日志的记录而定。 它的高可用性通过RocketMQ本身的高可用性机制来保证, 如果希望确保事务消息不丢失, 并且事务完整性得到保证, 建议使用同步的双写机制。
- 事务消息的生产者ID不能与其他类型消息的生产者ID共享,与其他类型的消息不同, 事务消息允许反向查询,MQ服务器能通过它们的生产者ID查询到消费者。
5. 遇到的问题
5.1 broker集群启动失败
5.1.1 问题现象
在启动broker集群时, 发现只能成功启动一台。 比如启动了Master1,但是Slave2每次启动都会报错,broker.log
报错信息如下:
2022-06-12 23:13:45 WARN ShutdownHook - unregisterBroker Exception, rocketmq-nameserver2:9876
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to rocketmq-nameserver2:9876 failed # 异常
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:407)
at org.apache.rocketmq.broker.out.BrokerOuterAPI.unregisterBroker(BrokerOuterAPI.java:247)
at org.apache.rocketmq.broker.out.BrokerOuterAPI.unregisterBrokerAll(BrokerOuterAPI.java:224)
at org.apache.rocketmq.broker.BrokerController.unregisterBrokerAll(BrokerController.java:861)
at org.apache.rocketmq.broker.BrokerController.shutdown(BrokerController.java:796)
at org.apache.rocketmq.broker.BrokerStartup$1.run(BrokerStartup.java:237)
at java.lang.Thread.run(Thread.java:748)
2022-06-12 23:13:45 WARN ShutdownHook - unregisterBroker Exception, rocketmq-nameserver1:9876 # 异常
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to rocketmq-nameserver1:9876 failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:407)
at org.apache.rocketmq.broker.out.BrokerOuterAPI.unregisterBroker(BrokerOuterAPI.java:247)
at org.apache.rocketmq.broker.out.BrokerOuterAPI.unregisterBrokerAll(BrokerOuterAPI.java:224)
at org.apache.rocketmq.broker.BrokerController.unregisterBrokerAll(BrokerController.java:861)
at org.apache.rocketmq.broker.BrokerController.shutdown(BrokerController.java:796)
at org.apache.rocketmq.broker.BrokerStartup$1.run(BrokerStartup.java:237)
at java.lang.Thread.run(Thread.java:748)
这个报错比较坑,会让我们抓住connect to rocketmq-nameserver2:9876 failed
异常信息去找解决方案,网上关于这个问题的解决方案没有什么意义,因为造成这个问题的本质原因不在这个日志中,而是在执行启动命令目录下的nohup.out
日志文件中,connect to rocketmq-nameserver2:9876 failed
对应的问题日志如下! 查看nohup.out文件的报错信息:
The broker[broker-a, 192.168.65.129:10911] boot success. serializeType=JSON and name server is rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
java.lang.RuntimeException: Lock failed,MQ already started
at org.apache.rocketmq.store.DefaultMessageStore.start(DefaultMessageStore.java:239)
at org.apache.rocketmq.broker.BrokerController.start(BrokerController.java:874)
at org.apache.rocketmq.broker.BrokerStartup.start(BrokerStartup.java:63)
at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:57)
java.lang.RuntimeException: Lock failed,MQ already started
at org.apache.rocketmq.store.DefaultMessageStore.start(DefaultMessageStore.java:239)
at org.apache.rocketmq.broker.BrokerController.start(BrokerController.java:874)
at org.apache.rocketmq.broker.BrokerStartup.start(BrokerStartup.java:63)
at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:57)
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
5.1.2 问题原因
根据java.lang.RuntimeException: Lock failed,MQ already started
报错就会找到storePath的配置问题,这个也就是消息存储路径的问题,一台机器中部署master和salve的时候使用的是同一个消息存储文件,Master启动时导致文件被锁住,Slave再启动的时候去写这个存储文件就报错了,导致启动失败。
5.1.3 解决方案
给slave的配置文件broker-b-s.properties
中重新配置一套消息存储路径。
# 存储路径
storePathRootDir=/usr/local/rocketmq-4.9.3/store1
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq-4.9.3/store1/commitLog
# 消息队列存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.9.3/store1/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.9.3/store1/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.9.3/store1/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq-4.9.3/store1/abort
重新启动namesrv, broker(Master,Slave)正常。
[root@centos7-01 2m-2s-sync]# ps -ef | grep broker
root 5232 5170 0 23:07 pts/1 00:00:00 tail -f broker.log
root 6030 3911 0 23:20 pts/0 00:00:00 sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties
root 6031 6030 0 23:20 pts/0 00:00:00 sh /usr/local/rocketmq-4.9.3/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties # Master1
root 6055 6031 38 23:20 pts/0 00:05:24 /usr/local/jdk1.8.0_141/bin/java -server -Xms256m -Xmx256m -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=15g -XX:-UseLargePages -XX:-UseBiasedLocking -cp .:/usr/local/rocketmq-4.9.3/bin/../conf:/usr/local/rocketmq-4.9.3/bin/../lib/*:.:/usr/local/jdk1.8.0_141/jre/lib/rt.jar:/usr/local/jdk1.8.0_141/lib/dt.jar:/usr/local/jdk1.8.0_141/lib/tools.jar:/usr/local/jdk1.8.0_141/jre/lib org.apache.rocketmq.broker.BrokerStartup -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties
root 6144 3911 0 23:21 pts/0 00:00:00 sh mqbroker -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties
root 6145 6144 0 23:21 pts/0 00:00:00 sh /usr/local/rocketmq-4.9.3/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties # Slave2
root 6169 6145 25 23:21 pts/0 00:03:37 /usr/local/jdk1.8.0_141/bin/java -server -Xms256m -Xmx256m -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=15g -XX:-UseLargePages -XX:-UseBiasedLocking -cp .:/usr/local/rocketmq-4.9.3/bin/../conf:/usr/local/rocketmq-4.9.3/bin/../lib/*:.:/usr/local/jdk1.8.0_141/jre/lib/rt.jar:/usr/local/jdk1.8.0_141/lib/dt.jar:/usr/local/jdk1.8.0_141/lib/tools.jar:/usr/local/jdk1.8.0_141/jre/lib org.apache.rocketmq.broker.BrokerStartup -c /usr/local/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties
console控制台也可以看到成功启动了Master1,Slave2,Master2和Slave1四台Broker集群服务。
5.2 发送消息失败
5.2.1 问题现象
双主双从集群,在使用同步模式下,在向 master 发送消息时,返回的消息状态码为 SLAVE_NOT_AVAILABLE
.
# sendStatus=SLAVE_NOT_AVAILABLE
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F00000120C418B4AAC242AB05670000, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=2], queueOffset=34]
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F00000120C418B4AAC242AB09630001, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=3], queueOffset=35]
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F00000120C418B4AAC242AB0D560002, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=0], queueOffset=36]
5.2.2 原因分析及解决方案
原因分析:
检查了双主双从的broker集群,都已经正常启动。查阅相关资料发现了可能导致失败的问题。没有开放Master到Slave同步消息的端口,导致消息发送失败。
使用 netstat -ntpl
命令查看使用到的端口,可以看到java相关的进程端口10909、10911、10912、11009、11011、11012、9876
。
[root@centos7-01 ~]# netstat -ntpl
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:111 0.0.0.0:* LISTEN 596/rpcbind
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 955/sshd
tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN 1235/master
tcp6 0 0 :::10909 :::* LISTEN 1622/java
tcp6 0 0 :::10911 :::* LISTEN 1622/java
tcp6 0 0 :::10912 :::* LISTEN 1622/java
tcp6 0 0 :::11009 :::* LISTEN 1748/java
tcp6 0 0 :::11011 :::* LISTEN 1748/java
tcp6 0 0 :::11012 :::* LISTEN 1748/java
tcp6 0 0 :::111 :::* LISTEN 596/rpcbind
tcp6 0 0 :::9876 :::* LISTEN 1555/java
在配置文件 broker-a.properties
中找到 Master1
配置的监听端口:
# Broker对外服务的监听端口
listenPort=10911
Master1的vip 通道端口为:ListenPort - 2 = 10909
Master1的HA 通道端口为: ListenPort + 1 = 10912
端口对应描述:
端口号 | 作用 |
---|---|
9876 | NameServer 对外暴露的端口,允许消费者和生产者连接 |
10909 | fastListen 端口, 在消费者或生产者中配置 isVipChannel 为 false 即可 |
10911 | Broker对外服务的监听端口, 用于broker与producer或consumer进行通信 |
10912 | HA 高可用端口,用于主从同步,为 Master 常见的 socket 连接; 若没有开放,则无法连接到 Slave |
解决方案:
需要检查 Master 是否打开了10909、10912
这两个端口, 使用下面的命令查看, 我的机器确实没有开放这两个端口,开放端口权限即可。
# 查看端口开放情况
firewall-cmd --permanent --list-ports
# 查看单个端口开放情况
firewall-cmd --permanent --query-port=10912/tcp
# 开放端口
firewall-cmd --permanent --zone=public --add-port=11012/tcp
# 重启防火墙
firewall-cmd --reload
同理, 查看配置文件broker-b-s.properties
,找到Slave2
配置的监听端口:
# Broker对外服务的监听端口
listenPort=11011
Slave2 的vip 通道端口为:ListenPort - 2 = 11009
Slave2 的HA 通道端口为: ListenPort + 1 = 11012
和上面一样的操作,开放11009、11012
这两个端口。