RocketMQ(二)

纸飞机

Rocket MQ

Rocket MQ进阶

顺序消费

官方介绍

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的

MQ的顺序消费

模拟一个订单业务

  • 实体类
1
2
3
4
5
6
7
8
9
10
11
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class OrderStep implements Serializable {

private Long orderId;

private String desc;

}
  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public class RocketMQProducer {

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {

// 谁来发? 实例化一个默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("group1");

// 发给谁? 发给命名服务器
producer.setNamesrvAddr("localhost:9876");
// 怎么发? 生产者跑起来
producer.start();


/**
* 生成模拟订单数据
*/
// 发什么? 模拟一个消息
List<OrderStep> orderList = new ArrayList<OrderStep>();

OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(2L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(3L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(2L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(3L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(2L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(3L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);


for (final OrderStep orderStep : orderList) {

// 形参:topic: 主题 tags: 标签 byte[]: 传输的数据,传输时都是字节数组
Message message = new Message("topic7","tags1",orderList.toString().getBytes("utf-8"));
// 发送
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {

// 拿到队列中信息的Id
int orderId = orderStep.getOrderId().intValue();

// 拿到整个队列长度
int size = list.size();

// 进行一个简单算法运算 取余
int i = orderId % size;

// 拿到顺序的id
MessageQueue messageQueue = list.get(i);

return messageQueue;
}
}, null);
System.out.println("结果为:" + sendResult);
}

// 关闭资源
producer.shutdown();

}
}
  • consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RocketMQConsumer {

public static void main(String[] args) throws MQClientException {



// 谁来接收 实例化一个默认的Consumer 监听器监听的方式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 每隔几秒发一次请求的方式
// DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();

// 接受哪里的消息
consumer.setNamesrvAddr("localhost:9876");

// 监听哪个消息队列
consumer.subscribe("topic7","*");

// 一个线程只监听一个queue
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

// 这里写业务
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
// ConsumeOrderlyStatus 一个枚举类
return ConsumeOrderlyStatus.SUCCESS;
}
});


// 启动
consumer.start();

// 不要关闭资源连接,因为在监听
}
}

事务信息

正常事务过程(红色)

事务补偿过程(蓝色)

RocketMQ事务流程

事务信息状态

  • 提交状态

允许进入队列,此消息与非事务消息无区别

  • 回滚状态

不允许进入队列,此消息等同于未发送过

  • 中间状态

完成了half消息的发送,未对MQ进行二次状态确认

  • 注意

事务消息仅与生产者有关,与消费者无关

演示

  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class RocketMQProducer {

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {

// 谁来发? 实例化一个事务生产者
TransactionMQProducer producer = new TransactionMQProducer("group1");

// 发给谁? 发给命名服务器
producer.setNamesrvAddr("localhost:9876");

// 事务监听
producer.setTransactionListener(new TransactionListener() {
// 正常事务情况下
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

System.out.println("正常事务执行");
// 成功情况
// return LocalTransactionState.COMMIT_MESSAGE;
// 失败回滚情况
// return LocalTransactionState.ROLLBACK_MESSAGE;
// 未知,等待情况
return LocalTransactionState.UNKNOW;
}

// 事务补偿情况下
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("补偿事务执行");
// 成功情况
return LocalTransactionState.COMMIT_MESSAGE;
// 失败回滚情况
// return LocalTransactionState.ROLLBACK_MESSAGE;
// 未知,等待情况
// return LocalTransactionState.UNKNOW;
}
});

// 怎么发? 生产者跑起来
producer.start();

String msg = "Hello RocketMQ";
// 形参:topic: 主题 tags: 标签 byte[]: 传输的数据,传输时都是字节数组
Message message = new Message("topic9","tags1",msg.getBytes("utf-8"));

TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);

// 发送
System.out.println("结果为:" + sendResult);

// 关闭资源
// producer.shutdown();

}
}
  • consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class RocketMQConsumer {

public static void main(String[] args) throws MQClientException {



// 谁来接收 实例化一个默认的Consumer 监听器监听的方式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 每隔几秒发一次请求的方式
// DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();

// 接受哪里的消息
consumer.setNamesrvAddr("localhost:9876");

// 监听哪个消息队列
consumer.subscribe("topic9","*");

// 一个线程只监听一个queue
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

// 这里写业务
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
// ConsumeOrderlyStatus 一个枚举类
return ConsumeOrderlyStatus.SUCCESS;
}
});


// 启动
consumer.start();

// 不要关闭资源连接,因为在监听

}

}

RocketMQ集群

多个broker提供服务

集群流程

MQ集群

过程

  • NameServer启动,开启监听,等待broker、producer与consumer连接

  • broker启动,根据配置信息,连接所有的NameServer,并保持长连接,如果broker中有现存数据, NameServer将保存topic与broker关系

  • producer发信息,连接某个NameServer,并建立长连接

  • producer发消息,若果topic存在,由NameServer直接分配,如果topic不存在,由NameServer创建topic与broker关系,并分配

  • producer在broker的topic选择一个消息队列

  • producer与broker建立长连接,用于发送消息

  • producer发送消息

  • consumer与producer相同

master和slave的同步方式

  • 同步方式同步

较异步方式性能略低,消息无延迟

  • 异步方式同步

较同步方式性能略高,数据略有延迟

两主两从搭建

主从特点

保证一台机器宕机能正常运行

北京机房放着master1和slave2 深圳机房放着master2和slave1

搭建配置

模拟一下,这里起了两台虚拟机地址不同,模仿两台服务器

  • 配置服务器环境
1
vim /etc/hosts
1
2
3
4
5
6
7
8
# nameserver
192.168.117.200 rocketmq-nameserver1
192.168.117.201 rocketmq-nameserver2
# broker
192.168.117.200 rocketmq-master1
192.168.117.200 rocketmq-slave2
192.168.117.201 rocketmq-master2
192.168.117.201 rocketmq-slave1

这里地址名字自己对应

重启网卡 systemctl restart network

  • 关闭防火墙
1
2
3
4
5
6
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service
  • 配置环境变量
1
vim /etc/profile
1
2
3
4
#set rocketmq
ROCKETMQ_HOME=/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH

这里路径自己对应

更新环境变量 source /etc/profile

  • 创建集群服务器的数据存储目录
1
2
3
4
5
6
7
8
9
10
11
#master 数据存储目录
mkdir /rocketmq/store
mkdir /rocketmq/store/commitlog
mkdir /rocketmq/store/consumequeue
mkdir /rocketmq/store/index

#slave 数据存储目录
mkdir /rocketmq/store-slave
mkdir /rocketmq/store-slave/commitlog
mkdir /rocketmq/store-slave/consumequeue
mkdir /rocketmq/store-slave/index
  • 修改第一台服务器配置
1
2
3
cd /rocketmq/conf/2m-2s-sync

vim broker-a.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
  • 修改配置文件
1
vim broker-b-s.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store-slave
#commitLog 存储路径
storePathCommitLog=/rocketmq/store-slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store-slave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store-slave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store-slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
  • 修改第二台服务器配置
1
2
3
cd /rocketmq/conf/2m-2s-sync

vim broker-b.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
  • 修改配置文件
1
vim broker-a-s.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store-slave
#commitLog 存储路径
storePathCommitLog=/rocketmq/store-slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store-slave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store-slave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store-slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
  • 修改启动内存
    因为默认8g,虚拟机没有分配那么大的内存,所以要进行修改
1
vim /rocketmq/bin/runbroker.sh
1
2
# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

两台服务器都要修改

  • 启动服务器

第一台服务器

1
2
3
4
5
nohup sh mqnamesrv &

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &

第二台服务器

1
2
3
4
5
nohup sh mqnamesrv &

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &

jps 查看启动了哪些

  • 模拟数据进行测试
1
2
3
4
5
export NAMESRV_ADDR=rocketmq-nameserver1:9876 添加环境变量

sh tools.sh org.apache.rocketmq.example.quickstart.Producer 测试生成生产者

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer 测试生成消费者
  • 运行rocketmq控制台jar即可进行查看

RocketMQ仪表板集群

正确的开始 微小的长进 然后持续 嘿 我是小博 带你一起看我目之所及的世界……

-------------本文结束 感谢您的阅读-------------

本文标题:RocketMQ(二)

文章作者:小博

发布时间:2022年10月21日 - 21:14

最后更新:2022年10月21日 - 21:15

原始链接:https://codexiaobo.github.io/posts/3042191954/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。