单节点部署
下载安装rocketmq
## 下载
wget https://archive.apache.org/dist/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
## 项目解压
unzip rocketmq-all-4.8.0-bin-release.zip
## 重命名
mv rocketmq-all-4.8.0-bin-release rocketmq-4.8.0
修改rmq启动配置
修改目录 /opt/rocketmq4.8.0/bin 下的 3 个配置文件--不然会报insufficient memory
cd /opt/rocketmq4.8.0/bin
vi runserver.sh
# JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vi runbroker.sh
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
vi tools.sh
# JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
# 自动创建topic
在conf/broker.conf⽂件中加⼊
autoCreateTopicEnable=true
启动RMQ
cd /opt/rocketmq4.8.0/bin
# 创建日志目录
mkdir logs
# 启动
nohup ./mqnamesrv &:属于后台以静默⽅式启动
./mqnamesrv:属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
nohup ./mqnamesrv > logs/mqnamesrv.out 2>1 &
# 查看日志
tail -f logs/mqnamesrv.out
# 看到以下表示启动成功
The Name Server boot success. serializeType=JSON
# 报错
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
# 解决 配置jdk环境变量
,需要 export 环境变量
# 启动broker
# 启动命令,并且常驻内存:注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问
# nohup ./mqbroker -n 192.168.0.101:9876 & :属于后台以静默⽅式启动
# sh ./mqbroker -n 92.168.0.101:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
nohup ./mqbroker -n 192.168.0.101:9876 > logs/mqbroker.out 2>1 &
## 查看日志
tail -f logs/mqbroker.out
## 看到以下表示启动成功
The broker[linux1, 192.168.0.101:10911] boot success. serializeType=JSON and name server is 192.168.0.101:9876
# 关闭服务
与启动顺序相反进行关闭,先关闭 broker、在关闭 nameserv
./mqshutdown broker
./mqshutdown namesr
发送接收消息测试
发送/接收消息之前,需要告诉客户端(Producer、Consumer)名称服务器的位置,RocketMQ 提供了多种方法来实现这一点
1.编程方式,如:producer.setNamesrvAddr("ip:port")
2.Java 选项,如:rocketmq.namesrv.addr
3.环境变量,如:NAMESRV_ADDR
4.HTTP 端点
# 发送消息
export NAMESRV_ADDR=192.168.0.101:9876
./tools.sh org.apache.rocketmq.example.quickstart.Producer
# 示例
[root@linux1 rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k). A new max generation size of 261632k will be used.
16:19:05.806 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F379FA0000, offsetMsgId=AC11000100002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=3], queueOffset=0]
......
SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F382B603E7, offsetMsgId=AC11000100002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=2], queueOffset=249]
16:19:08.609 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[172.17.0.1:10911] result: true
16:19:08.631 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
# 接收消息
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 示例
[root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k). A new max generation size of 261632k will be used.
16:21:15.395 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=201, queueOffset=1, sysFlag=0, bornTimestamp=1659601146477, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146478, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F000000000000057F, commitLogOffset=1407, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275866, UNIQ_KEY=AC11000176396FF3C5B512F37A6D0007, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=202, queueOffset=2, sysFlag=0, bornTimestamp=1659601146500, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146501, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F00000000000008A4, commitLogOffset=2212, bodyCRC=2088767104, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275867, UNIQ_KEY=AC11000176396FF3C5B512F37A84000B, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 49], transactionId='null'}]]
消息发送完毕之后就会退出,在同一窗口中可以使用消费者类来进行接收消息,消费是多线程的。
控制台安装与启动
# 下载
下载地址:https://github.com/apache/rocketmq-externals/tags
下载rocketmq-console-1.0.0:https://github.com/apache/rocketmq-externals/archive/refs/tags/rocketmq-console-1.0.0.zip
# 修改配置
修改其 src/main/resources中的 application.properties配置文件
原来的端口号为8080,如果8080端口被占用,修改为一个不常用的端口。
指定RocketMQ的name server地址,集群环境为 集群1:9876;集群2:9876;集群3:9876。
server.contextPath=
server.port=7000
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=localhost:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
# 添加依赖
在解压目录rocketmq-console的pom.xml中添加如下JAXB依赖
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
# 打包
在rocketmq-console目录下,打开cmd窗口运行maven的打包命令
mvn clean package -Dmaven.test.skip=true
# 启动
将jar包上传到服务器,然后启动
nohup java -jar rocketmq-console-ng-1.0.0.jar > rocketmq-console.out 2>1 &
启停脚本
#!/bin/bash
user=`whoami`
start_user="root"
mqnamesrv_log_path="/mnt/rocketmq/logs/mqnamesrv.out"
mqbroker_log_path="/mnt/rocketmq/logs/mqbroker.out"
mqnamesrv_ip="192.168.2.48"
mqnamesrv_port="9876"
broker_port="10911"
if [ $user = $start_user ];then
case "$1" in
start_namesrv)
nohup ./bin/mqnamesrv > $mqnamesrv_log_path 2>1 &
timeout +3 tailf $mqnamesrv_log_path
;;
stop_namesrv)
result=`echo -e "\n" | telnet $mqnamesrv_ip $broker_port 2> /dev/null | grep Connected | wc -l`
if [ $result -eq 0 ];then
./bin/mqshutdown namesrv
else
echo -e "\033[31m请先停止 mqbroker\033[0m"
fi
;;
start_broker)
result=`echo -e "\n" | telnet $mqnamesrv_ip $mqnamesrv_port 2> /dev/null | grep Connected | wc -l`
if [ $result -eq 1 ];then
nohup ./bin/mqbroker -n ${mqnamesrv_ip}:${mqnamesrv_port} > $mqbroker_log_path 2>1 &
timeout +3 tailf $mqbroker_log_path
else
echo -e "\033[31m请先启动 mqnamesrv\033[0m"
fi
;;
stop_broker)
./bin/mqshutdown broker
;;
*)
echo -e "\033[33m后面需追加参数 [start_namesrv|stop_namesrv|start_broker|stop_broker] \033[0m"
;;
esac
else
echo -e "\033[31m请切换启动用户: \033[0m"
fi
集群部署
集群架构
rocketmq默认有三种配置模式:
2m-2s-async:(主从异步)
2m-2s-sync:(主从同步)
2m-noslave:(仅master)
每个服务器需开放七个端口
9876 #namesrv
10911、10909、10912 #broker主节点
11011、11009、11012 #broker从节点
端口规则说明:
namesrv默认端口:9876
假设broker配置的 ListenPort 端口:10911
则vip 通道端口为:ListenPort - 2 = 10909
则HA 通道端口为: ListenPort + 1 = 10912
双主双从异步复制的Broker集群,三台服务器都需要安装 JDK 和 Rocketmq
集群信息
服务器 | IP | NameServer | Broker角色 |
server1 | 192.168.1.10 | 192.168.1.10:9876 | broker-a(master),broker-b-s(slave) |
server2 | 192.168.1.11 | 192.168.1.10:9876 | broker-b(master),broker-a-s(slave) |
server2 | 192.168.1.11 | 192.168.1.10:9876 |
启动namesrv
nameserver是⼀个轻量级的注册中心,broker把自己的信息注册到nameserver上。
nameserver是无状态的,直接启动即可。三台nameserver之间不需要通信,而是被请求方来关联三台nameserver的地址。
每台节点创建日志目录
cd /opt/rocketmq4.8.0/bin
mkdir logs
# server1
## 后台静默启动
nohup ./mqnamesrv -n 192.168.1.10:9876 > logs/mqnamesrv.out 2>1 &
# server2
## 后台静默启动
nohup ./mqnamesrv -n 192.168.1.11:9876 > logs/mqnamesrv.out 2>1 &
# server3
## 后台静默启动
nohup ./mqnamesrv -n 192.168.1.12:9876 > logs/mqnamesrv.out 2>1 &
配置broker
broker-a、broker-b-s这两台broker是配置在服务器1上
broker-b、broker-a-s这两台broker是配置在服务器2上
# server1-a
进⼊到conf/2m-2s-async⽂件夹内,修改broker-a.properties⽂件
cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
vi broker-a.properties
# 指定整个broker集群的名称,或者说是RocketMQ集群的名称
brokerClusterName=DefaultCluster
# 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=192.168.0.101
# broker的id,0表示master,>0表示slave
brokerId=0
# 指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
# 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
# 指定当前broker为异步复制master
brokerRole=ASYNC_MASTER
# 指定刷盘策略为异步刷盘
flushDiskType=ASYNC_FLUSH
# 指定Name Server的地址
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端口
listenPort=10911
# abort文件存储路径
abortFile=/data/rocketmq/store/abort
# 消息存储路径
storePathRootDir=/data/rocketmq/store
# commitLog存储路径
storePathCommitLog=/data/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/data/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/data/rocketmq/store/index
# checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
# 限制的消息大小
maxMessageSize=65536
# server2-a-s
进⼊到conf/2m-2s-async文件夹内,修改broker-a-s.properties文件
cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
vi broker-a-s.properties
# 指定整个broker集群的名称,或者说是RocketMQ集群的名称
brokerClusterName=DefaultCluster
# 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=192.168.0.102
# broker的id,0表示master,>0表示slave
brokerId=1
# 指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
# 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
# 指定当前broker为异步复制master
brokerRole=SLAVE
# 指定刷盘策略为异步刷盘
flushDiskType=ASYNC_FLUSH
# 指定Name Server的地址
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端口
listenPort==11011
# abort文件存储路径
abortFile=/data/rocketmq/store-slave/abort
# 消息存储路径
storePathRootDir=/data/rocketmq/store-slave
# commitLog存储路径
storePathCommitLog=/data/rocketmq/store-slave/commitlog
# 消费队列存储路径
storePathConsumeQueue=/data/rocketmq/store-slave/consumequeue
# 消息索引存储路径
storePathIndex=/data/rocketmq/store-slave/index
# checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/store-slave/checkpoint
# 限制的消息大小
maxMessageSize=65536
# server-b
进入到conf/2m-2s-async文件夹内,修改broker-b.properties文件
cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
vi broker-b.properties
# 指定整个broker集群的名称,或者说是RocketMQ集群的名称
brokerClusterName=DefaultCluster
# 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
brokerName=broker-b
# broker所在服务器的ip
brokerIP1=192.168.0.102
# broker的id,0表示master,>0表示slave
brokerId=0
# 指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
# 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
# 指定当前broker为异步复制master
brokerRole=ASYNC_MASTER
# 指定刷盘策略为异步刷盘
flushDiskType=ASYNC_FLUSH
# 指定Name Server的地址
namesrvAddr=192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端口
listenPort=10911
# abort文件存储路径
abortFile=/data/rocketmq/store/abort
# 消息存储路径
storePathRootDir=/data/rocketmq/store
# commitLog存储路径
storePathCommitLog=/data/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/data/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/data/rocketmq/store/index
# checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
# 限制的消息大小
maxMessageSize=65536
# server1-b-s
进入到conf/2m-2s-async文件夹内,修改broker-b-s.properties文件
cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
vi broker-b-s.properties
# 指定整个broker集群的名称,或者说是RocketMQ集群的名称
brokerClusterName=DefaultCluster
# 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
brokerName=broker-b
# broker所在服务器的ip
brokerIP1=192.168.0.101
# broker的id,0表示master,>0表示slave
brokerId=1
# 指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
# 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
# 指定当前broker为异步复制master
brokerRole=SLAVE
# 指定刷盘策略为异步刷盘
flushDiskType=ASYNC_FLUSH
# 指定Name Server的地址
namesrvAddr=192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端口
listenPort=11011
# abort文件存储路径
abortFile=/data/rocketmq/store-slave/abort
# 消息存储路径
storePathRootDir=/data/rocketmq/store-slave
# commitLog存储路径
storePathCommitLog=/data/rocketmq/store-slave/commitlog
# 消费队列存储路径
storePathConsumeQueue=/data/rocketmq/store-slave/consumequeue
# 消息索引存储路径
storePathIndex=/data/rocketmq/store-slave/index
# checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/store-slave/checkpoint
# 限制的消息大小
maxMessageSize=65536
启动broker
server1 启动broker-a(master)和broker-b-s(slave)
cd /opt/home/rocketmq4.8.0/bin
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties > logs/broker-a.out 2>1 &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties > logs/broker-b-s.out 2>1 &
server2 启动broker-b(master)和broker-a-s(slave)
cd /opt/home/rocketmq4.8.0/bin
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties > logs/broker-b.out 2>1 &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties > logs/broker-a-s.out 2>1 &
验证集群
使用RocketMQ提供的tools工具验证集群是否正常工作
在server1上配置环境变量,用于被tools中的生产者和消费者程序读取该变量
# 配置环境变量
export NAMESRV_ADDR='192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876'
# 环境变量生效
source /etc/profile
# 启动生产者
./tools.sh org.apache.rocketmq.example.quickstart.Producer
# 启动消费者
./tools.sh org.apache.rocketmq.example.quickstart.Consumer