秒杀RocketMQ
1、RocketMQ简介与安装
1.1、RocketMQ简介
Apache RocketMQ是一个采用Java语言开发的分布式的消息系统,由阿里巴巴团队开发,与2016年底贡献给 Apache,成为了Apache的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了 集 团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级 消息通过 RocketMQ 流转(在 2017 年的双十一当天,整个阿里巴巴集团通过 RocketMQ 流转的线上消息达到了 万 亿级,峰值 TPS 达到 5600 万),在阿里大中台策略上发挥着举足轻重的作用 。 地址:http://rocketmq.apache.org/
1.2、RocketMQ的历史发展
阿里巴巴消息中间件起源 于 2001 年的五彩石项目, Notify 在这期间应运而生,用于交易核心消息的流转 。 2010 年, B2B 开始大规模使用 ActiveMQ 作为消息内核,随着阿里业务 的快速发展,急需一款支持顺序消 息,拥有海量消息堆积能力的消息中间件, MetaQ 1.0 在 2011 年诞生 。 2012年, MetaQ已经发展到了3.0版本,并抽象出了通用的消息引擎 RocketMQ。 随后,对 RocketMQ 进行 了开源 , 阿里的消息中间件正式走人了 公众视野 。 2015年, RocketMQ已经经历了多年双十一的洗礼,在可用性、 可靠性以 及稳定性等方面都有出色的表现。 与此同时 ,云计算大行其道, 阿里消息中间 件基于 RocketMQ推出了 Aliware MQ 1.0,开始为阿里云上成 千上万家企业提 供消息服务 。
2016 年, MetaQ 在双十一期间承载了万亿级消息的流转,跨越了一个新的里程碑 ,同时 RocketMQ 进入Apache 孵化 。
1.3、核心概念说明
- Producer
- 消息生产者,负责产生消息,一般由业务系统负责产生消息。
- Producer Group
- 一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
- Consumer
- 消息费者,负责消费消息,一般是后台系统负责异步消费。
- Push Consumer
- 服务端向消费者端推送消息
- Pull Consumer
- 消费者端向服务定时拉取消息
- Consumer Group
- 一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
- NameServer
- 集群架构中的组织协调员
- 收集broker的工作情况
- 不负责消息的处理
- Broker
- 是RocketMQ的核心负责消息的发送、接收、高可用等(真正干活的)
- 需要定时发送自身情况到NameServer,默认10秒发送一次,超时2分钟会认为该broker失效。
- Topic
- 不同类型的消息以不同的Topic名称进行区分,如User、Order等
- 是逻辑概念
- Message Queue
- 消息队列,用于存储消息
1.4、部署安装
1.4.1、下载
下载地址:http://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip 版本使用目前最新版:4.3.2
1.4.2、非Docker安装
#启动nameserver
bin/mqnamesrv
# The Name Server boot success. serializeType=JSON 看到这个表示已经提供成功
#启动broker
bin/mqbroker -n 192.100.3.90:9876 #-n 指定nameserver地址和端口
#启动出错
Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate
memory' (errno=12)
……………………………………………………………………………………
启动错误,是因为内存不够,导致启动失败,原因:RocketMQ的配置默认是生产环境的配置,设置的jvm的内存 大小值比较大,对于学习而言没有必要设置这么大,测试环境的内存往往都不是很大,所以需要调整默认值。
#调整默认的内存大小参数
cd bin/
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -
XX:MaxMetaspaceSize=128m"
cd bin/
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
#从新启动测试
bin/mqbroker -n 192.100.3.90:9876
The broker[elk-docker01 , 192.100.3.90:10911] boot success. serializeType=JSON and name
server is 192.100.3.90:9876
下面进行发送消息测试:
export NAMESRV_ADDR=127.0.0.1:9876
cd bin
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
#测试结果
SendResult [sendStatus=SEND_OK, msgId=AC110001473C7D4991AD336AEA5703E0,
offsetMsgId=AC11000100002A9F00000000000E8580, messageQueue=MessageQueue
[topic=TopicTest, brokerName=itcast, queueId=3], queueOffset=1323]
SendResult [sendStatus=SEND_OK, msgId=AC110001473C7D4991AD336AEA5903E1,
offsetMsgId=AC11000100002A9F00000000000E8634, messageQueue=MessageQueue
[topic=TopicTest, brokerName=itcast, queueId=0], queueOffset=1323]
SendResult [sendStatus=SEND_OK, msgId=AC110001473C7D4991AD336AEA5F03E2,
offsetMsgId=AC11000100002A9F00000000000E86E8, messageQueue=MessageQueue
[topic=TopicTest, brokerName=itcast, queueId=1], queueOffset=1323]
SendResult [sendStatus=SEND_OK, msgId=AC110001473C7D4991AD336AEA6103E3,
offsetMsgId=AC11000100002A9F00000000000E879C, messageQueue=MessageQueue
[topic=TopicTest, brokerName=itcast, queueId=2], queueOffset=1323]
#可以正常发送消息
测试接收消息:
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
#测试结果
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=2, storeSize=180,
queueOffset=1322, sysFlag=0, bornTimestamp=1544456244818,
bornHost=/172.16.55.185:33702, storeTimestamp=1544456244819,
storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F00000000000E84CC,
commitLogOffset=951500, bodyCRC=684865321, reconsumeTimes=0,
preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0,
properties={MIN_OFFSET=0, MAX_OFFSET=1325, CONSUME_START_TIME=1544456445397,
UNIQ_KEY=AC110001473C7D4991AD336AEA5203DF, WAIT=true, TAGS=TagA}, body=[72, 101, 108,
108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 49],
transactionId='null'}]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180,
queueOffset=1323, sysFlag=0, bornTimestamp=1544456244833,
bornHost=/172.16.55.185:33702, storeTimestamp=1544456244835,
storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F00000000000E879C,
commitLogOffset=952220, bodyCRC=801108784, reconsumeTimes=0,
preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0,
properties={MIN_OFFSET=0, MAX_OFFSET=1325, CONSUME_START_TIME=1544456445397,
UNIQ_KEY=AC110001473C7D4991AD336AEA6103E3, WAIT=true, TAGS=TagA}, body=[72, 101, 108,
108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 53],
transactionId='null'}]]
#从结果中,可以看出,接收消息正常
1.4.3、编写Java代码进行测试
第一步,创建rocketmq工程 第二步,导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sdx</groupId>
<artifactId>rocketmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
第三步,编写代码
package com.sdx.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("test-group");
// Specify name server addresses.
producer.setNamesrvAddr("192.100.3.90:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("broker01" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
测试:
Exception in thread "main"
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
at
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(Defaul
tMQProducerImpl.java:612)
at
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducer
Impl.java:1253)
at
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducer
Impl.java:1203)
at
org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214
)
at cn.itcast.rocketmq.SyncProducer.main(SyncProducer.java:26)
测试结果会发现,发送消息会报错。 原因是什么呢? 仔细观察broker启动的信息:
The broker[elk-docker01,172.17.0.1:10911] boot success. serializeType=JSON and name server
is 192.100.3.90:9876
会发现,broker的ip地址是172.17.0.1,那么在开发机上是不可能访问到的。 所以,需要指定broker的ip地址。
#创建broker配置文件
vim /rmq/rmqbroker/conf/broker.conf
brokerIP1=192.100.3.90
namesrvAddr=192.100.3.90:9876
brokerName=broker_demo
#启动broker,通过 -c 指定配置文件
bin/mqbroker -c /rmq/rmqbroker/conf/broker.conf
The broker[elk-docker01,192.100.3.90:10911] boot success. serializeType=JSON and name
server is 192.100.3.90:9876 #这样就可以进行访问了
测试:
D:\sdx\repository\org\apache\commons\commons-lang3\3.4\commons-lang3-3.4.jar com.sdx.rocketmq.SyncProducer
SendResult [sendStatus=SEND_OK, msgId=C064057031F818B4AAC21DB01CA30000, offsetMsgId=C064035A00002A9F000000000001175B, messageQueue=MessageQueue [topic=broker01, brokerName=broker01, queueId=6], queueOffset=50]
...
...
...
offsetMsgId=C064035A00002A9F0000000000015B75, messageQueue=MessageQueue [topic=broker01, brokerName=broker01, queueId=0], queueOffset=61]
SendResult [sendStatus=SEND_OK, msgId=C064057031F818B4AAC21DB01E5A0063, offsetMsgId=C064035A00002A9F0000000000015C27, messageQueue=MessageQueue [topic=broker01, brokerName=broker01, queueId=1], queueOffset=61]
Process finished with exit code 0
1.4.4、通过docker安装
#拉取镜像
docker pull foxiswho/rocketmq:server-4.3.2
docker pull foxiswho/rocketmq:broker-4.3.2
#创建nameserver容器
docker create -p 9876:9876 --name rmqserver \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /rmq/rmqserver/logs:/opt/logs \
-v /rmq/rmqserver/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
#创建broker容器
docker create -p 10911:10911 -p 10909:10909 --name rmqbroker \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /rmq/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /rmq/rmqbroker/logs:/opt/logs \
-v /rmq/rmqbroker/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#启动容器
docker start rmqserver rmqbroker
#停止删除容器
docker stop rmqbroker rmqserver
docker rm rmqbroker rmqserver
经测试,可以正常发送、接收消息。
1.4.5、部署RocketMQ的管理工具
RocketMQ提供了UI管理工具,名为rocketmq-console,项目地址:http://github.com/apache/rocketmq-externals/tree/master/rocketmq-console 该工具支持docker以及非docker安装,这里我们选择使用docker安装
#拉取镜像
docker pull styletang/rocketmq-console-ng:1.0.0
#创建并启动容器
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.100.3.90:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-
console-ng:1.0.0
通过浏览器进行访问:http://192.100.3.90:8082/#/ 切换语言到中文,所有的功能就一目了然了。
2、快速入门
2.1、创建topic
package com.sdx.rocketmq.topic;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class TopicDemo {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("demo");
//设置nameserver的地址
producer.setNamesrvAddr("192.100.3.90:9876");
// 启动生产者
producer.start();
/**
* 创建topic,参数分别是:broker的名称,topic的名称,queue的数量
*
*/
producer.createTopic("demo", "my-topic", 8);
System.out.println("topic创建成功!");
producer.shutdown();
}
}
2.2、发送消息(同步)
package com.sdx.rocketmq.sendmsg;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("demo");
producer.setNamesrvAddr("192.100.3.90:9876");
producer.start();
//发送消息
String msg = "我的第一个消息!";
// Message message = new Message("my-topic", "delete", msg.getBytes("UTF-8"));
Message message = new Message("demo01", "delete",msg.getBytes("UTF-8"));
SendResult sendResult = producer.send(message);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息队列:" + sendResult.getMessageQueue());
System.out.println("消息offset值:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
消息id:C06405703F5818B4AAC21DC99A0B0000 消息队列:MessageQueue [topic=demo01, brokerName=broker01, queueId=0] 消息offset值:0 SendResult [sendStatus=SEND_OK, msgId=C06405703F5818B4AAC21DC99A0B0000, offsetMsgId=C064035A00002A9F0000000000015CD9, messageQueue=MessageQueue [topic=demo01, brokerName=broker01, queueId=0], queueOffset=0]
Process finished with exit code 0
2.2.1、Message数据结构
2.3、发送消息(异步)
package com.sdx.rocketmq.sendmsg;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("demo");
producer.setNamesrvAddr("192.100.3.90:9876");
producer.start();
// 发送消息
String msg = "我的第一个异步发送消息!";
Message message = new Message("demo01", msg.getBytes("UTF-8"));
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功了!" + sendResult);
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息队列:" + sendResult.getMessageQueue());
System.out.println("消息offset值:" + sendResult.getQueueOffset());
}
public void onException(Throwable e) {
System.out.println("消息发送失败!" + e);
}
});
// producer.shutdown();
}
}
注意: producer.shutdown()要注释掉,否则发送失败。原因是,异步发送,还未来得及发送就被关闭了。
发送成功了!SendResult [sendStatus=SEND_OK, msgId=C0640570405C18B4AAC21DCDAC360000, offsetMsgId=C064035A00002A9F0000000000015D85, messageQueue=MessageQueue [topic=demo01, brokerName=broker01, queueId=1], queueOffset=1] 消息id:C0640570405C18B4AAC21DCDAC360000 消息队列:MessageQueue [topic=demo01, brokerName=broker01, queueId=1] 消息offset值:1
2.4、消费消息
package com.sdx.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo");
consumer.setNamesrvAddr("192.100.3.90:9876");
// 订阅消息,接收的是所有消息
consumer.subscribe("demo01", "*");
// consumer.subscribe("my-topic", "add || update");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.out.println("消息:" + new String(msg.getBody(), "UTF-8"));
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("接收到消息 -> " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
测试:
消息:我的第一个消息! 接收到消息 -> [MessageExt [queueId=1, storeSize=171, queueOffset=2, sysFlag=0, bornTimestamp=1575630062890, bornHost=/192.100.5.112:51854, storeTimestamp=1575629971007, storeHost=/192.100.3.90:10911, msgId=C064035A00002A9F0000000000015E3C, commitLogOffset=89660, bodyCRC=2012479578, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=’demo01’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1575630062923, UNIQ_KEY=C064057015AC18B4AAC21DD475290000, WAIT=true}, body=[-26, -120, -111, -25, -102, -124, -25, -84, -84, -28, -72, -128, -28, -72, -86, -26, -74, -120, -26, -127, -81, 33], transactionId=’null’}]]
其它订阅方式:
//完整匹配
consumer.subscribe("demo", "delete");
//或匹配
consumer.subscribe("demo", "delete || update");
2.5、消息过滤器
RocketMQ支持根据用户自定义属性进行过滤,过滤表达式类似于SQL的where,如:a> 5 AND b =’abc’ 发送消息:
package com.sdx.rocketmq.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("demo");
producer.setNamesrvAddr("192.100.3.90:9876");
producer.start();
//发送消息
String msg = "这是一个用户的消息, id = 1003";
Message message = new Message("my-topic-filter", "delete", msg.getBytes("UTF-8"));
message.putUserProperty("sex","男");
message.putUserProperty("age","10");
SendResult sendResult = producer.send(message);
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息队列:" + sendResult.getMessageQueue());
System.out.println("消息offset值:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
接收消息:
package com.sdx.rocketmq.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerFilter {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo-consumer");
consumer.setNamesrvAddr("192.100.3.90:9876");
// 订阅消息,接收的是所有消息
// consumer.subscribe("my-topic", "*");
consumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='女' AND age>=18"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.out.println("消息:" + new String(msg.getBody(), "UTF-8"));
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("接收到消息 -> " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
测试,报错:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException:
CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at
org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.j
ava:2089)
at
org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClient
Instance.java:432)
at
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPus
hConsumerImpl.java:633)
at
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer
.java:520)
at cn.itcast.rocketmq.ConsumerFilterDemo.main(ConsumerFilterDemo.java:39)
原因是默认配置下,不支持自定义属性,需要设置开启:
#加入到broker的配置文件中
enablePropertyFilter=true
3、producer详解
3.1、顺序消息
在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的,比如在电商系统中,订 单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了。
3.1.1、生产者
package com.sdx.rocketmq.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ORDER_PRODUCER");
producer.setNamesrvAddr("192.100.3.90:9876");
producer.start();
for (int i = 0; i < 100; i++) {
int orderId = i % 10; // 模拟生成订单id
String msgStr = "order --> " + i +", id = "+ orderId;
Message message = new Message("order_topic", "ORDER_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, orderId);
System.out.println(sendResult);
}
producer.shutdown();
}
}
3.1.2、消费者
package com.sdx.rocketmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("ORDER_CONSUMER");
consumer.setNamesrvAddr("192.100.3.90:9876");
consumer.subscribe("order_topic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(Thread.currentThread().getName() + " "
+ msg.getQueueId() + " "
+ new String(msg.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
// System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
3.1.3、测试
测试结果:相同订单id的消息会落到同一个queue中,一个消费者线程会顺序消费queue,从而实现顺序消费消 息。
3.2、分布式事务消息
3.2.1、回顾什么事务
聊什么是事务,最经典的例子就是转账操作,用户A转账给用户B1000元的过程如下:
- 用户A发起转账请求,用户A账户减去1000元
- 用户B的账户增加1000元
如果,用户A账户减去1000元后,出现了故障(如网络故障),那么需要将该操作回滚,用户A账户增加1000元。
这就是事务。
3.2.2、分布式事务
随着项目越来越复杂,越来越服务化,就会导致系统间的事务问题,这个就是分布式事务问题。 分布式事务分类有这几种:
- 基于单个JVM,数据库分库分表了(跨多个数据库)。
- 基于多JVM,服务拆分了(不跨数据库)。
- 基于多JVM,服务拆分了 并且数据库分库分表了。
解决分布式事务问题的方案有很多,使用消息实现只是其中的一种。
3.2.3、原理
Half(Prepare) Message
指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次 确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
Message Status Check
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长 期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回 查。
3.3.4、执行流程
- 发送方向 MQ 服务端发送消息。
- MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半 消息,订阅方将不会接受该消息。
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
3.3.5、生产者
package com.sdx.rocketmq.transaction;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new
TransactionMQProducer("transaction_producer");
producer.setNamesrvAddr("192.100.3.90:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
// 发送消息
Message message = new Message("pay_topic", "用户A给用户B转账500元".getBytes("UTF-8"));
producer.sendMessageInTransaction(message, null);
Thread.sleep(999999);
producer.shutdown();
}
}
注意:发送消息使用的是TransactionMQProducer
3.3.6、本地事务处理
package com.sdx.rocketmq.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.HashMap;
import java.util.Map;
public class TransactionListenerImpl implements TransactionListener {
private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();
/**
* 执行具体的业务逻辑
*
* @param msg 发送的消息对象
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
System.out.println("用户A账户减500元.");
Thread.sleep(500); //模拟调用服务
// System.out.println(1/0);
System.out.println("用户B账户加500元.");
Thread.sleep(800);
STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
// 二次提交确认
// return LocalTransactionState.UNKNOW;
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
e.printStackTrace();
}
STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
// 回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
/**
* 消息回查
*
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("状态回查 ---> " + msg.getTransactionId() +" " +STATE_MAP.get(msg.getTransactionId()) );
return STATE_MAP.get(msg.getTransactionId());
}
}
3.3.7、消费者
package com.sdx.rocketmq.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("TRA_CONSUMER");
consumer.setNamesrvAddr("192.100.3.90:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("pay_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
3.3.8、测试
测试结果:返回commit状态时,消费者能够接收到消息,返回rollback状态时,消费者接受不到消息。
4、consumer详解
4.1、push和pull模式
在RocketMQ中,消费者有两种模式,一种是push模式,另一种是pull模式。 push模式:客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。 pull模式:客户端不断的轮询请求服务端,来获取新的消息。 但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。
区别:
Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒 MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。 Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历 MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开 始offset,直到取完了,再换另一个MessageQueue。
疑问:既然是采用pull方式实现,RocketMQ如何保证消息的实时性呢?
4.1.1、长轮询
RocketMQ中采用了长轮询的方式实现,什么是长轮询呢?
长轮询即是在请求的过程中,若是服务器端数据并没有更新,那么则将这个连接挂起,直到服务器推送新的数据,再返回,然后进入循环周期。
客户端像传统轮询一样从服务端请求数据,服务端会阻塞请求不会立刻返回,直到有数据或超时才返回给客
户端,然后关闭连接,客户端处理完响应信息后再向服务器发送新的请求。
4.2、消息模式
DefaultMQPushConsumer实现了自动保存offset值以及实现多个consumer的负载均衡。
//设置组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo");
通过groupname将多个consumer组合在一起,那么就会存在一个问题,消息发送到这个组后,消息怎么分配呢? 这个时候,就需要指定消息模式,分别有集群和广播模式。
-
集群模式
-
同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消息的一部分内容, 同
一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到 负载均衡的目的 。
-
-
广播模式
- 同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,也就是一个消息会
被多次分发,被多个 Consumer消费。
// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
4.3、重复消息的解决方案
造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办 法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
- 消费端处理消息的业务逻辑保持幂等性
- 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。 第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。
5、RocketMQ存储
RocketMQ中的消息数据存储,采用了零拷贝技术(使用 mmap + write 方式),文件系统采用 Linux Ext4 文件系统进行存储。
5.1、消息数据的存储
在RocketMQ中,消息数据是保存在磁盘文件中,为了保证写入的性能,RocketMQ尽可能保证顺序写入,顺序写 入的效率比随机写入的效率高很多。 RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,CommitLog是真正存储数据的文件, ConsumeQueue是索引文件,存储数据指向到物理文件的配置。
如上图所示:
-
消息主体以及元数据都存储在CommitLog当中
-
Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始
offset,log大小和MessageTag的hashCode。
-
每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。
文件位置:
5.2、同步刷盘与异步刷盘
RocketMQ 为了提高性能,会尽可能地保证 磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两 种写磁盘方式,分别是同步刷盘与异步刷盘。
-
同步刷盘
-
在返回写成功状态时,消息已经被写入磁盘 。
-
具体流程是:消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程
执行完成后唤醒等待的线程,返回消息写成功的状态 。
-
-
异步刷盘
- 在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大
- 当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
-
broker配置文件中指定刷盘方式
- flushDiskType=ASYNC_FLUSH – 异步
- flushDiskType=SYNC_FLUSH – 同步
6、重试策略
在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重 试需要分2种,分别是producer端重试和consumer端重试。
6.1、producer端重试
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失 败。
package com.sdx.rocketmq.error;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("192.100.3.90:9876");
//消息发送失败时,重试3次
producer.setRetryTimesWhenSendFailed(3);
producer.start();
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("haoke_im_topic", "SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,并且指定超时时间
SendResult sendResult = producer.send(msg, 1000);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
6.2、consumer端重试
消费者端的失败,分为2种情况,一个是exception,一个是timeout。
6.2.1、exception
消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话 费充值,当前消息的手机号被注销,无法充值)等。 消息的状态:
package org.apache.rocketmq.client.consumer.listener;
public enum ConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER;
}
可以看到,消息的状态分为成功或者失败。如果返回的状态为失败会怎么样呢? 在启动broker的日志中可以看到这样的信息:
INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h
这个表示了,如果消息消费失败,那么消息将会在1s、5s、10s后重试,一直到2h后不再重试。 其实,有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获取重试次数进行控制。
package com.sdx.rocketmq.error;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
consumer.setNamesrvAddr("192.100.3.90:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("my-test-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
if(msgs.get(0).getReconsumeTimes() >= 3){
// 重试3次后,不再进行重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
}
6.2.2、timeout
比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直 至发送成功为止! 也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败,这个时候定义为超时。
7、RocketMQ的集群
7.1、集群模式
在RocketMQ中,集群的部署模式是比较多的,有以下几种:
-
单个Master
- 这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
-
多Master模式
- 一个集群无Slave,全是Master,例如2个Master或者3个Master
- 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
-
多Master多Slave模式,异步复制
-
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级。
-
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master宕机后,消费者仍然
可以从Slave消费,此过程对应用透明,不需要人工干预。性能同多Master模式几乎一样。
-
缺点:Master宕机,磁盘损坏情况,会丢失少量消息。
-
-
多Master多Slave模式,同步双写
- 每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功。
- 优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
- 缺点:性能比异步复制模式略低,大约低10%左右。
7.2搭建2m2s集群
7.2.1 集群工作流程
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
7.2.2环境变量配置
vim /etc/profile
在profile文件的末尾加入如下命令
#set rocketmq
export ROCKETMQ_HOME=/usr/local/rocketmq/
export PATH=$PATH:$ROCKETMQ_HOME/bin
输入:wq! 保存并退出, 并使得配置立刻生效:
source /etc/profile
7.2.3创建消息存储路径
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
7.2.4broker配置文件
1)master1 服务器:192.100.3.72
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties
修改配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
brokerIP1=192.100.3.72
#nameServer地址,分号分割
namesrvAddr=192.100.3.72:9876;192.100.3.73:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#‐ ASYNC_MASTER 异步复制Master
#‐ SYNC_MASTER 同步双写Master
#‐ SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#‐ ASYNC_FLUSH 异步刷盘
#‐ SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
2)slave2 服务器:192.100.3.72
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties
修改配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
brokerIP1=192.100.3.72
#nameServer地址,分号分割
namesrvAddr=192.100.3.72:9876;192.100.3.73:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store1
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog1
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue1
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index1
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint1
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort1
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
1)master2 服务器:192.100.3.73
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties
修改配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
brokerIP1=192.100.3.73
#nameServer地址,分号分割
namesrvAddr=192.100.3.72:9876;192.100.3.73:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#‐ ASYNC_MASTER 异步复制Master
#‐ SYNC_MASTER 同步双写Master
#‐ SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#‐ ASYNC_FLUSH 异步刷盘
#‐ SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
2)slave1 服务器:192.100.3.72
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties
修改配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
brokerIP1=192.100.3.73
#nameServer地址,分号分割
namesrvAddr=192.100.3.72:9876;192.100.3.73:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store1
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog1
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue1
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index1
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint1
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort1
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
7.2.5修改启动脚本文件
1)runbroker.sh
vi /usr/local/rocketmq/bin/runbroker.sh
需要根据内存大小进行适当的对JVM参数进行调整:
#===================================================
# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m"
2)runserver.sh
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSize=128m ‐XX:MaxMetaspaceSize=320m"
7.2.6服务启动
#启动NameServe集群
nohup sh mqnamesrv &
#启动Broker集群
nohup sh mqbroker -c /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties &
nohup sh mqbroker -c /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
#启动NameServe集群
nohup sh mqnamesrv &
#启动Broker集群
nohup sh mqbroker -c/usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties &
nohup sh mqbroker -c /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
7.2.7查看日志
# 查看nameServer日志
tail ‐500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail ‐500f ~/logs/rocketmqlogs/broker.log
7.3、搭建2m2s集群(docker搭建)
下面通过docker搭建2master+2slave的集群。
#创建2个master
#nameserver1
docker create -p 9876:9876 --name rmqserver01 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /rmq/rmqserver01/logs:/opt/logs \
-v /rmq/rmqserver01/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
#nameserver2
docker create -p 9877:9876 --name rmqserver02 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /rmq/rmqserver02/logs:/opt/logs \
-v /rmq/rmqserver02/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
#创建第1个master broker
#master broker01
docker create --net host --name rmqbroker01 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /rmq/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /rmq/rmqbroker01/logs:/opt/logs \
-v /rmq/rmqbroker01/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#配置
namesrvAddr=192.100.3.90:9876;192.100.3.90:9877
brokerClusterName=ItcastCluster
brokerName=broker01
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=192.100.3.90
brokerIp2=192.100.3.90
listenPort=10911
#创建第2个master broker
#master broker02
docker create --net host --name rmqbroker02 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /rmq/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /rmq/rmqbroker02/logs:/opt/logs \
-v /rmq/rmqbroker02/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#master broker02
namesrvAddr=192.100.3.90:9876;192.100.3.90:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=192.100.3.90
brokerIp2=192.100.3.90
listenPort=10811
#创建第1个slave broker
#slave broker01
docker create --net host --name rmqbroker03 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /rmq/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /rmq/rmqbroker03/logs:/opt/logs \
-v /rmq/rmqbroker03/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#slave broker01
namesrvAddr=192.100.3.90:9876;192.100.3.90:9877
brokerClusterName=ItcastCluster
brokerName=broker01
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.100.3.90
brokerIp2=192.100.3.90
listenPort=10711
#创建第2个slave broker
#slave broker01
docker create --net host --name rmqbroker04 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /rmq/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /rmq/rmqbroker04/logs:/opt/logs \
-v /rmq/rmqbroker04/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#slave broker02
namesrvAddr=192.100.3.90:9876;192.100.3.90:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.100.3.90
brokerIp2=192.100.3.90
listenPort=10611
#启动容器
docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04