RocketMQ
Advanced RocketMQ
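Ordered Consumption
Official description
Message ordering means that messages can be consumed in the order in which they were sent (FIFO). RocketMQ can strictly guarantee message order, either per partition (partition ordering) or globally (global ordering).
How ordered consumption works: by default, the producer sends messages to different queues (partition queues) in round-robin fashion, and the consumer pulls from multiple queues, so neither sending nor consuming guarantees order. If, however, the ordered messages are sent one after another to the same queue, and the consumer pulls from that queue in sequence, order is preserved. When only one queue takes part in sending and consuming, ordering is global; when several queues take part, ordering is per partition, meaning that messages are ordered within each queue.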
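The routing idea can be tried without a broker. The following is a minimal sketch in plain Java, not RocketMQ code: `QueueRoutingSketch`, `selectQueue`, and `route` are hypothetical names, and the "queues" are just in-memory lists. It assumes that taking the order ID modulo the queue count is an acceptable way to pick a queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class QueueRoutingSketch {

    /** Maps an order ID to a queue index; floorMod keeps the index non-negative. */
    static int selectQueue(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    /** Distributes "orderId:step" events over in-memory queues, keyed by queue index. */
    static Map<Integer, List<String>> route(long[] orderIds, String[] steps, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (int i = 0; i < orderIds.length; i++) {
            int index = selectQueue(orderIds[i], queueCount);
            queues.computeIfAbsent(index, k -> new ArrayList<>())
                  .add(orderIds[i] + ":" + steps[i]);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Steps of three orders, interleaved as they would arrive in practice.
        long[] orderIds = {1, 2, 1, 3, 2, 3};
        String[] steps = {"create", "create", "pay", "create", "pay", "pay"};
        route(orderIds, steps, 4)
            .forEach((queue, events) -> System.out.println("queue " + queue + " -> " + events));
    }
}
```

Because every step of one order lands in the same list, the steps of that order stay in send order even though different orders are interleaved. This is partition ordering; with a queue count of 1 the same code degenerates to global ordering.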
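Simulating an order workflow
| import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;

// One step (create, pay, ...) in the life of an order.
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class OrderStep implements Serializable {
    private Long orderId;
    private String desc;
}
|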
| import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RocketMQProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // Steps of three orders, interleaved in the order they occur.
        List<OrderStep> orderList = new ArrayList<>();
        orderList.add(new OrderStep(1L, "create"));
        orderList.add(new OrderStep(2L, "create"));
        orderList.add(new OrderStep(1L, "pay"));
        orderList.add(new OrderStep(3L, "create"));
        orderList.add(new OrderStep(2L, "pay"));
        orderList.add(new OrderStep(3L, "pay"));
        orderList.add(new OrderStep(2L, "complete"));
        orderList.add(new OrderStep(1L, "push"));
        orderList.add(new OrderStep(3L, "complete"));
        orderList.add(new OrderStep(1L, "complete"));

        for (OrderStep orderStep : orderList) {
            // The body is the current step only (the original sent orderList.toString()).
            Message message = new Message("topic7", "tags1",
                    orderStep.toString().getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) {
                    // arg is the order ID passed to send(); the same ID always maps to the same queue.
                    long orderId = (Long) arg;
                    int index = (int) Math.floorMod(orderId, (long) queues.size());
                    return queues.get(index);
                }
            }, orderStep.getOrderId());
            System.out.println("Result: " + sendResult);
        }
        producer.shutdown();
    }
}
|
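| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("topic7", "*");
        // An orderly listener lets only one thread at a time consume a given queue,
        // which preserves the order of messages within that queue.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
|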
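Transactional Messages
Normal transaction flow (red)
Transaction compensation flow (blue)
Transactional message states
Commit: the message is allowed into the queue and is then no different from a non-transactional message
Rollback: the message is not allowed into the queue, as if it had never been sent
Unknown (intermediate state): the half message has been sent, but its status has not yet been confirmed to the MQ a second time
Transactional messages concern only the producer, not the consumer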
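To make the three states concrete, here is a small simulation of how a broker might treat a half message depending on the state the producer reports. It is a sketch only: `HalfMessageSketch`, `TxState`, and `Action` are hypothetical names and do not come from the RocketMQ client, whose real enum is `LocalTransactionState`.

```java
import java.util.ArrayList;
import java.util.List;

final class HalfMessageSketch {

    /** Illustrative counterparts of the commit, rollback, and intermediate states. */
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** What the simulated broker does with the half message. */
    enum Action { DELIVER, DISCARD, CHECK_BACK }

    static Action resolve(TxState state) {
        switch (state) {
            case COMMIT:   return Action.DELIVER;    // becomes a normal, consumable message
            case ROLLBACK: return Action.DISCARD;    // as if it had never been sent
            default:       return Action.CHECK_BACK; // ask the producer again later
        }
    }

    /**
     * The first state comes from the local transaction, later ones from check-backs.
     * The trace ends at the first final decision or when the reported states run out.
     */
    static List<Action> run(TxState... reportedStates) {
        List<Action> trace = new ArrayList<>();
        for (TxState state : reportedStates) {
            Action action = resolve(state);
            trace.add(action);
            if (action != Action.CHECK_BACK) {
                break;
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        // Local transaction reports UNKNOWN, the first check-back reports COMMIT.
        System.out.println(run(TxState.UNKNOWN, TxState.COMMIT));
    }
}
```

The `main` method models the normal flow followed by one compensation round: an unresolved local transaction, then a check-back that commits.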
Demo
| import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.nio.charset.StandardCharsets;

public class RocketMQProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("Normal transaction executed");
                // UNKNOW leaves the half message unresolved, so the broker checks back later.
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("Compensation transaction executed");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        String msg = "Hello RocketMQ";
        Message message = new Message("topic9", "tags1", msg.getBytes(StandardCharsets.UTF_8));
        TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
        System.out.println("Result: " + sendResult);
        // The producer is not shut down here, so it can still answer the broker's check-back.
    }
}
|
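| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("topic9", "*");
        // The consumer only sees the message after the transaction has been committed.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
|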
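RocketMQ Cluster
Multiple brokers provide the service
Cluster workflow
Steps
The NameServer starts, begins listening, and waits for brokers, producers, and consumers to connect
The broker starts, connects to every NameServer listed in its configuration, and keeps a long-lived connection to each; if the broker already holds data, the NameServer records the mapping between its topics and the broker
Before sending, the producer connects to one of the NameServers and keeps a long-lived connection
When the producer sends a message: if the topic exists, the NameServer assigns its route directly; if the topic does not exist, the NameServer creates the topic-to-broker mapping and then assigns it
The producer selects a message queue from the topic on the broker
The producer establishes a long-lived connection to the broker for sending messages
The producer sends the message
The consumer follows the same steps as the producer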
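The registration and lookup steps can be pictured with a small in-memory model. This is a hedged sketch rather than how the NameServer is implemented: `RouteRegistrySketch` and its methods are hypothetical, and the round-robin queue choice is only an assumption about a reasonable default.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RouteRegistrySketch {

    // topic -> names of the brokers hosting it (stand-in for the NameServer's route table)
    private final Map<String, List<String>> topicRoutes = new HashMap<>();
    private int sendCounter = 0;

    /** A broker registers itself together with the topics it already holds. */
    void registerBroker(String brokerName, List<String> topics) {
        for (String topic : topics) {
            List<String> brokers = topicRoutes.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!brokers.contains(brokerName)) {
                brokers.add(brokerName);
            }
        }
    }

    /** A producer or consumer asks which brokers serve a topic. */
    List<String> lookup(String topic) {
        return topicRoutes.getOrDefault(topic, Collections.emptyList());
    }

    /** Picks "broker#queueId" in round-robin order across all queues of the topic. */
    String selectQueue(String topic, int queuesPerBroker) {
        List<String> brokers = lookup(topic);
        if (brokers.isEmpty()) {
            throw new IllegalStateException("no route for topic " + topic);
        }
        int slot = Math.floorMod(sendCounter++, brokers.size() * queuesPerBroker);
        return brokers.get(slot / queuesPerBroker) + "#" + (slot % queuesPerBroker);
    }
}
```

A client would call `lookup` once to learn the route and then `selectQueue` for each message; only after that does it talk to the chosen broker directly.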
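Master-slave replication modes
Synchronous replication: slightly slower than asynchronous replication, but messages reach the slave without lag
Asynchronous replication: slightly faster than synchronous replication, but data on the slave lags slightly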
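The trade-off between the two modes can be illustrated with a toy model. This sketch is not broker code: `ReplicationSketch` is a hypothetical class, the "slave" is a list in the same process, and `drain()` stands in for whatever background transfer an asynchronous master performs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ReplicationSketch {

    final List<String> master = new ArrayList<>();
    final List<String> slave = new ArrayList<>();
    private final Queue<String> pending = new ArrayDeque<>();
    private final boolean synchronous;

    ReplicationSketch(boolean synchronous) {
        this.synchronous = synchronous;
    }

    /** Returns when the write would be acknowledged to the producer. */
    void write(String message) {
        master.add(message);
        if (synchronous) {
            slave.add(message);   // copy to the slave before acknowledging: more work per write
        } else {
            pending.add(message); // acknowledge immediately, copy later
        }
    }

    /** Stands in for the background transfer of an asynchronous master. */
    void drain() {
        while (!pending.isEmpty()) {
            slave.add(pending.poll());
        }
    }

    /** Number of acknowledged messages the slave has not received yet. */
    int lag() {
        return master.size() - slave.size();
    }
}
```

In synchronous mode `lag()` is always zero after `write` returns; in asynchronous mode it stays positive until `drain()` runs, and that window is the data that could be lost if the master fails.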
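Setting up two masters and two slaves
Master-slave characteristics
The cluster keeps running when one machine goes down
The Beijing data center hosts master1 and slave2; the Shenzhen data center hosts master2 and slave1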
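The cross-placement can be checked with a few lines of code. The sketch below assumes that master1 and slave1 form one broker group (called `broker-a` here) and master2 and slave2 the other (`broker-b`); `LayoutCheckSketch` and the site names are illustrative only.

```java
import java.util.Set;
import java.util.TreeSet;

final class LayoutCheckSketch {

    // {node, site, broker group}; the grouping of nodes is an assumption, see above
    static final String[][] NODES = {
        {"master1", "beijing",  "broker-a"},
        {"slave2",  "beijing",  "broker-b"},
        {"master2", "shenzhen", "broker-b"},
        {"slave1",  "shenzhen", "broker-a"},
    };

    /** Broker groups that still have at least one live node after a whole site fails. */
    static Set<String> brokersAliveWithout(String failedSite) {
        Set<String> alive = new TreeSet<>();
        for (String[] node : NODES) {
            if (!node[1].equals(failedSite)) {
                alive.add(node[2]);
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        System.out.println(brokersAliveWithout("beijing"));
        System.out.println(brokersAliveWithout("shenzhen"));
    }
}
```

Whichever site fails, both broker groups keep one node. In a classic master-slave setup the surviving slave can still serve reads, while new writes go to the master that remains.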
Setup and configuration
To simulate this, two virtual machines with different addresses stand in for the two servers
| # nameserver
192.168.117.200 rocketmq-nameserver1
192.168.117.201 rocketmq-nameserver2
# broker
192.168.117.200 rocketmq-master1
192.168.117.200 rocketmq-slave2
192.168.117.201 rocketmq-master2
192.168.117.201 rocketmq-slave1
|
Adjust the addresses and host names to match your own environment
Restart the network service: systemctl restart network
| # Stop the firewall
systemctl stop firewalld.service
# Check the firewall state
firewall-cmd --state
# Keep the firewall from starting at boot
systemctl disable firewalld.service
|
| ROCKETMQ_HOME=/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
|
Adjust the path to match your own installation
Reload the environment variables: source /etc/profile
| mkdir /rocketmq/store
mkdir /rocketmq/store/commitlog
mkdir /rocketmq/store/consumequeue
mkdir /rocketmq/store/index
mkdir /rocketmq/store-slave
mkdir /rocketmq/store-slave/commitlog
mkdir /rocketmq/store-slave/consumequeue
mkdir /rocketmq/store-slave/index
|
| cd /rocketmq/conf/2m-2s-sync
vim broker-a.properties
|
| brokerClusterName=rocketmq-cluster
brokerName=broker-a
brokerId=0
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/rocketmq/store
storePathCommitLog=/rocketmq/store/commitlog
storePathConsumeQueue=/rocketmq/store/consumequeue
storePathIndex=/rocketmq/store/index
storeCheckpoint=/rocketmq/store/checkpoint
abortFile=/rocketmq/store/abort
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
|
| vim broker-b-s.properties
|
| brokerClusterName=rocketmq-cluster
brokerName=broker-b
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/rocketmq/store-slave
storePathCommitLog=/rocketmq/store-slave/commitlog
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
storePathIndex=/rocketmq/store-slave/index
storeCheckpoint=/rocketmq/store-slave/checkpoint
abortFile=/rocketmq/store-slave/abort
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
|
| cd /rocketmq/conf/2m-2s-sync
vim broker-b.properties
|
| brokerClusterName=rocketmq-cluster
brokerName=broker-b
brokerId=0
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/rocketmq/store
storePathCommitLog=/rocketmq/store/commitlog
storePathConsumeQueue=/rocketmq/store/consumequeue
storePathIndex=/rocketmq/store/index
storeCheckpoint=/rocketmq/store/checkpoint
abortFile=/rocketmq/store/abort
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
|
| vim broker-a-s.properties
|
| brokerClusterName=rocketmq-cluster
brokerName=broker-a
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/rocketmq/store-slave
storePathCommitLog=/rocketmq/store-slave/commitlog
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
storePathIndex=/rocketmq/store-slave/index
storeCheckpoint=/rocketmq/store-slave/checkpoint
abortFile=/rocketmq/store-slave/abort
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
|
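With four nearly identical files, it is easy to mix up `brokerId` and `brokerRole`. The following sketch checks the one rule that matters most here: masters use `brokerId=0` and slaves use a value greater than 0. `BrokerConfigCheckSketch` is a hypothetical helper, not part of RocketMQ, and it reads the configuration from a string purely to stay self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class BrokerConfigCheckSketch {

    /** Parses broker configuration text in the usual key=value properties format. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** Returns null when brokerId and brokerRole agree, otherwise a short description. */
    static String check(Properties properties) {
        String role = properties.getProperty("brokerRole", "");
        long brokerId = Long.parseLong(properties.getProperty("brokerId", "0").trim());
        boolean slave = role.trim().equals("SLAVE");
        if (slave && brokerId <= 0) {
            return "a SLAVE needs brokerId > 0";
        }
        if (!slave && brokerId != 0) {
            return "a master needs brokerId = 0";
        }
        return null;
    }

    public static void main(String[] args) {
        Properties slave = parse("brokerName=broker-a\nbrokerId=1\nbrokerRole=SLAVE\n");
        System.out.println(check(slave) == null ? "consistent" : check(slave));
    }
}
```

Running the same check against each of the four files before starting the brokers would catch a slave that was accidentally left with the master's ID.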
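- Adjust the startup memory
The default heap size is 8 GB, which is more than the virtual machines have, so it has to be reduced
| vim /rocketmq/bin/runbroker.sh
|
| JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
|
Make this change on both servers
First server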
| nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
|
Second server
| nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
|
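jps
Check which processes have started
| # Set the NameServer address as an environment variable
export NAMESRV_ADDR=rocketmq-nameserver1:9876
# Run the sample producer as a test
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# Run the sample consumer as a test
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
|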