
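RocketMQ
Advanced RocketMQ
Ordered Consumption
Official description
Message ordering means that messages can be consumed in the order they were sent (FIFO). RocketMQ can strictly guarantee message order, as either partition ordering or global ordering.
How ordered consumption works: by default, the producer sends messages round-robin across different queues (partition queues), and the consumer pulls from multiple queues, so neither sending nor consuming preserves order. However, if the messages that must stay in order are all sent, one after another, to the same queue, and the consumer pulls from that queue sequentially, order is preserved. When only one queue takes part in sending and consuming, ordering is global; when several queues take part, ordering is partitioned, meaning that messages are ordered within each queue.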
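The difference between round-robin and same-queue sending can be seen without a broker. The following is a minimal sketch in plain Java, not RocketMQ client code: the class name, method names, and the "orderId:step" message format are made up for illustration, and each queue is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: how the choice of queue affects per-order ordering (no RocketMQ client involved). */
final class QueueSelectionSketch {

    /** Spreads messages over the queues in round-robin fashion, ignoring the order ID. */
    static List<List<String>> roundRobin(long[] orderIds, String[] steps, int queueCount) {
        List<List<String>> queues = emptyQueues(queueCount);
        for (int i = 0; i < orderIds.length; i++) {
            queues.get(i % queueCount).add(orderIds[i] + ":" + steps[i]);
        }
        return queues;
    }

    /** Sends every message of one order to the same queue (order ID modulo queue count). */
    static List<List<String>> byOrderId(long[] orderIds, String[] steps, int queueCount) {
        List<List<String>> queues = emptyQueues(queueCount);
        for (int i = 0; i < orderIds.length; i++) {
            int index = (int) (orderIds[i] % queueCount);
            queues.get(index).add(orderIds[i] + ":" + steps[i]);
        }
        return queues;
    }

    private static List<List<String>> emptyQueues(int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        return queues;
    }

    public static void main(String[] args) {
        long[] ids = {1, 1, 2, 2};
        String[] steps = {"create", "pay", "create", "pay"};
        // Round robin splits the two steps of order 1 across both queues.
        System.out.println(roundRobin(ids, steps, 2));
        // Keyed selection keeps both steps of an order in one queue, in send order.
        System.out.println(byOrderId(ids, steps, 2));
    }
}
```

With keyed selection, a consumer that reads one queue from front to back sees "create" before "pay" for every order in that queue, which is the partition ordering described above.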

Simulating an order workflow
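import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

// One step in the life cycle of an order (create, pay, ...).
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class OrderStep implements Serializable {
    private Long orderId;
    private String desc;
}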
 
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketMQProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // Steps of three orders, interleaved the way they might arrive in practice
        List<OrderStep> orderList = new ArrayList<>();
        orderList.add(new OrderStep(1L, "create"));
        orderList.add(new OrderStep(2L, "create"));
        orderList.add(new OrderStep(1L, "pay"));
        orderList.add(new OrderStep(3L, "create"));
        orderList.add(new OrderStep(2L, "pay"));
        orderList.add(new OrderStep(3L, "pay"));
        orderList.add(new OrderStep(2L, "complete"));
        orderList.add(new OrderStep(1L, "push"));
        orderList.add(new OrderStep(3L, "complete"));
        orderList.add(new OrderStep(1L, "complete"));

        for (OrderStep orderStep : orderList) {
            // The body is the single step, not the whole list
            Message message = new Message("topic7", "tags1", orderStep.toString().getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // arg is the order ID passed as the third argument of send()
                    long orderId = (Long) arg;
                    // The same order ID always maps to the same queue
                    int index = (int) (orderId % mqs.size());
                    return mqs.get(index);
                }
            }, orderStep.getOrderId());
            System.out.println("Result: " + sendResult);
        }

        producer.shutdown();
    }
}
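Taking the order ID modulo the queue count gives a valid index only while the value being divided is non-negative. The sketch below is an optional hardening idea rather than part of the original demo: it contrasts int truncation of a large ID with `Math.floorMod` on the long value. The class and method names are hypothetical.

```java
/** Sketch: mapping an order ID to a queue index without going negative. */
final class QueueIndexSketch {

    /** Truncates the ID to an int first; a large long can turn into a negative int. */
    static int truncatingIndex(long orderId, int queueCount) {
        return Long.valueOf(orderId).intValue() % queueCount;
    }

    /** floorMod on the long value always lands in [0, queueCount). */
    static int floorModIndex(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        long bigId = 2147483649L; // 2^31 + 1, does not fit in an int
        System.out.println(truncatingIndex(bigId, 4)); // -3, would fail as a list index
        System.out.println(floorModIndex(bigId, 4));   // 1
    }
}
```

For the small IDs used in the demo both variants return the same index, so this only matters once IDs can exceed the int range or be negative.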
 
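import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("topic7", "*");

        // MessageListenerOrderly consumes each queue with one thread at a time
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body, StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }
}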
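An orderly listener lets several queues be consumed in parallel while each single queue is handled by only one thread at a time. The sketch below imitates that idea with a plain thread pool and in-memory lists; it is an illustration under that assumption, not the client's actual locking code, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: queues run in parallel, but each queue is drained by exactly one task. */
final class OrderlyConsumeSketch {

    /** Consumes every queue in its own task and returns what was seen, keyed by queue index. */
    static Map<Integer, List<String>> consume(List<List<String>> queues) {
        Map<Integer, List<String>> consumed = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, queues.size()));
        for (int q = 0; q < queues.size(); q++) {
            final int queueIndex = q;
            pool.execute(() -> {
                // Only this task touches this queue, so its messages are seen in stored order.
                List<String> seen = new ArrayList<>();
                for (String msg : queues.get(queueIndex)) {
                    seen.add(msg);
                }
                consumed.put(queueIndex, seen);
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<List<String>> queues = Arrays.asList(
                Arrays.asList("2:create", "2:pay", "2:complete"),
                Arrays.asList("1:create", "1:pay", "1:push", "1:complete"));
        System.out.println(consume(queues));
    }
}
```

The relative timing between the two queues is not defined, but inside each queue the steps always come out in the order they were stored.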
 
Transactional Messages
Normal transaction flow (red)
Transaction compensation flow (blue)

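Transactional message states
Commit: the message is allowed into the queue and is no different from a non-transactional message
Rollback: the message is not allowed into the queue and is treated as if it had never been sent
Intermediate (unknown): the half message has been sent, but the second-phase confirmation to the MQ has not been made
Transactional messages concern only the producer, not the consumer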
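The three states can be read as a small decision procedure: a commit makes the half message visible, a rollback drops it, and an unknown state causes the broker to ask the producer again. The following is a simplified sketch of that logic in plain Java; the enum, the method, and the `maxChecks` parameter are hypothetical and do not reproduce the broker's real implementation.

```java
import java.util.function.Supplier;

/** Sketch: how the reported state decides the fate of a half message. */
final class TransactionFlowSketch {

    /** Hypothetical stand-in for the three outcomes a producer can report. */
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /**
     * Returns true if the half message ends up visible to consumers.
     *
     * @param afterLocalTx state reported right after the local transaction ran
     * @param checkBack    asked again while the state is still UNKNOWN (compensation flow)
     * @param maxChecks    number of check-backs attempted before giving up
     */
    static boolean delivered(TxState afterLocalTx, Supplier<TxState> checkBack, int maxChecks) {
        TxState state = afterLocalTx;
        for (int i = 0; state == TxState.UNKNOWN && i < maxChecks; i++) {
            state = checkBack.get();
        }
        // Only an explicit commit releases the message; in this sketch anything else drops it.
        return state == TxState.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(delivered(TxState.COMMIT, () -> TxState.ROLLBACK, 3));  // true
        System.out.println(delivered(TxState.UNKNOWN, () -> TxState.COMMIT, 3));   // true
        System.out.println(delivered(TxState.UNKNOWN, () -> TxState.UNKNOWN, 3));  // false
    }
}
```

The consumer never appears in this logic, which matches the note that transactional messages are a producer-side concern.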
Demo
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMQProducer {
    public static void main(String[] args) throws MQClientException {

        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("Normal transaction executed");
                // UNKNOW leaves the half message pending, so the broker checks back later.
                // Return COMMIT_MESSAGE or ROLLBACK_MESSAGE here to settle it immediately.
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("Compensation transaction executed");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();
        String msg = "Hello RocketMQ";
        Message message = new Message("topic9", "tags1", msg.getBytes(StandardCharsets.UTF_8));
        TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);

        System.out.println("Result: " + sendResult);
        // The producer is deliberately not shut down: it must stay alive to answer the check-back.
    }
}
 
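import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("topic9", "*");

        // Only committed transactional messages reach this listener
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body, StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }
}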
 
RocketMQ Cluster
Multiple brokers provide the service
Cluster workflow

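Process
The NameServer starts, begins listening, and waits for brokers, producers, and consumers to connect

The broker starts, connects to every NameServer listed in its configuration, and keeps long-lived connections; if the broker already holds data, the NameServers record the topic-to-broker mapping

Before sending, the producer connects to one of the NameServers and keeps a long-lived connection

When the producer sends a message, an existing topic is assigned directly by the NameServer; if the topic does not exist, the NameServer creates the topic-to-broker mapping and then assigns it

The producer picks one message queue of the topic on the broker

The producer opens a long-lived connection to the broker for sending messages

The producer sends the message

The consumer follows the same steps as the producer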
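The role the NameServer plays in the process above is essentially that of a routing table. The sketch below models only that table in plain Java, assuming an in-memory map from topic to broker names; the class and its methods are hypothetical and leave out heartbeats, connections, and queue metadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: the NameServer as a topic-to-broker routing table. */
final class NameServerSketch {
    private final Map<String, List<String>> topicToBrokers = new HashMap<>();

    /** A broker reports the topics it already holds when it connects. */
    void registerBroker(String brokerName, List<String> topics) {
        for (String topic : topics) {
            List<String> brokers = topicToBrokers.computeIfAbsent(topic, t -> new ArrayList<>());
            if (!brokers.contains(brokerName)) {
                brokers.add(brokerName);
            }
        }
    }

    /** A producer or consumer asks which brokers serve a topic. */
    List<String> lookup(String topic) {
        List<String> brokers = topicToBrokers.getOrDefault(topic, Collections.<String>emptyList());
        return Collections.unmodifiableList(brokers);
    }

    public static void main(String[] args) {
        NameServerSketch nameServer = new NameServerSketch();
        nameServer.registerBroker("broker-a", Arrays.asList("topic7"));
        nameServer.registerBroker("broker-b", Arrays.asList("topic7", "topic9"));
        System.out.println(nameServer.lookup("topic7")); // [broker-a, broker-b]
        System.out.println(nameServer.lookup("topic9")); // [broker-b]
    }
}
```

After the lookup, the client talks to the chosen broker directly; the routing table is not on the message path.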
 
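Master-slave replication modes
Synchronous replication: slightly lower performance than asynchronous replication, but no message delay
Asynchronous replication: slightly higher performance than synchronous replication, but the data lags slightly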
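The trade-off between the two modes comes down to when the producer is acknowledged. The following toy model, with hypothetical names and in-memory lists standing in for the master and slave stores, shows why the synchronous mode has no lag and the asynchronous mode can have some.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch: synchronous versus asynchronous master-to-slave replication. */
final class ReplicationSketch {
    private final boolean sync;
    private final List<String> master = new ArrayList<>();
    private final List<String> slave = new ArrayList<>();
    private final Deque<String> pending = new ArrayDeque<>();

    ReplicationSketch(boolean sync) {
        this.sync = sync;
    }

    /** Stores a message on the master and returns at the point the producer would be acknowledged. */
    void send(String msg) {
        master.add(msg);
        if (sync) {
            slave.add(msg);    // wait for the slave copy before acknowledging
        } else {
            pending.add(msg);  // acknowledge now, copy later
        }
    }

    /** Background copy step of the asynchronous mode. */
    void replicatePending() {
        while (!pending.isEmpty()) {
            slave.add(pending.poll());
        }
    }

    /** Messages the slave does not have yet; these are at risk if the master fails now. */
    int slaveLag() {
        return master.size() - slave.size();
    }

    public static void main(String[] args) {
        ReplicationSketch async = new ReplicationSketch(false);
        async.send("m1");
        async.send("m2");
        System.out.println(async.slaveLag()); // 2
        async.replicatePending();
        System.out.println(async.slaveLag()); // 0
    }
}
```

In the synchronous mode every `send` pays for the extra copy, which is where the slightly lower throughput comes from.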
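Setting up two masters and two slaves
Master-slave characteristics
The service keeps running when one machine goes down
The Beijing data center hosts master1 and slave2; the Shenzhen data center hosts master2 and slave1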
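The point of the crossed placement is that losing either site still leaves one node of every broker group. The sketch below checks that property for the layout above and for a hypothetical "paired" layout in which each master sits next to its own slave. Node names follow the master1/slave2 pattern used here, with the trailing digit taken as the group; the class and method names are made up.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch: does every broker group keep a node when one site is lost? */
final class LayoutSketch {

    static boolean survivesLossOf(String lostSite, Map<String, List<String>> siteToNodes, Set<String> groups) {
        Set<String> remainingGroups = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : siteToNodes.entrySet()) {
            if (entry.getKey().equals(lostSite)) {
                continue; // every node at the lost site is gone
            }
            for (String node : entry.getValue()) {
                // "master1" and "slave1" belong to group "1"
                remainingGroups.add(node.substring(node.length() - 1));
            }
        }
        return remainingGroups.containsAll(groups);
    }

    /** The layout described above: each slave lives at the other site. */
    static Map<String, List<String>> crossedLayout() {
        Map<String, List<String>> layout = new LinkedHashMap<>();
        layout.put("Beijing", Arrays.asList("master1", "slave2"));
        layout.put("Shenzhen", Arrays.asList("master2", "slave1"));
        return layout;
    }

    /** Hypothetical counter-example: each slave lives next to its own master. */
    static Map<String, List<String>> pairedLayout() {
        Map<String, List<String>> layout = new LinkedHashMap<>();
        layout.put("Beijing", Arrays.asList("master1", "slave1"));
        layout.put("Shenzhen", Arrays.asList("master2", "slave2"));
        return layout;
    }

    public static void main(String[] args) {
        Set<String> groups = new HashSet<>(Arrays.asList("1", "2"));
        System.out.println(survivesLossOf("Beijing", crossedLayout(), groups)); // true
        System.out.println(survivesLossOf("Beijing", pairedLayout(), groups));  // false
    }
}
```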
Configuration
To simulate this, two virtual machines with different addresses stand in for the two servers
# nameserver
192.168.117.200 rocketmq-nameserver1
192.168.117.201 rocketmq-nameserver2
# broker
192.168.117.200 rocketmq-master1
192.168.117.200 rocketmq-slave2
192.168.117.201 rocketmq-master2
192.168.117.201 rocketmq-slave1
 
Adjust the addresses and host names to match your own environment
Restart networking: systemctl restart network
# Stop the firewall
systemctl stop firewalld.service
# Check the firewall state
firewall-cmd --state
# Keep the firewall from starting at boot
systemctl disable firewalld.service
 
ROCKETMQ_HOME=/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
 
Adjust the path to match your own installation
Reload the environment variables: source /etc/profile
mkdir /rocketmq/store
mkdir /rocketmq/store/commitlog
mkdir /rocketmq/store/consumequeue
mkdir /rocketmq/store/index

mkdir /rocketmq/store-slave
mkdir /rocketmq/store-slave/commitlog
mkdir /rocketmq/store-slave/consumequeue
mkdir /rocketmq/store-slave/index
 
cd /rocketmq/conf/2m-2s-sync
vim broker-a.properties
 
# Cluster name
brokerClusterName=rocketmq-cluster
# Broker name; the master and slave of one group share it
brokerName=broker-a
# 0 means master, a value greater than 0 means slave
brokerId=0
# NameServer addresses, separated by semicolons
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
# Port the broker listens on
listenPort=10911
# Hour of day at which expired files are deleted (04 = 4 a.m.)
deleteWhen=04
# File retention time in hours
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
# Storage paths
storePathRootDir=/rocketmq/store
storePathCommitLog=/rocketmq/store/commitlog
storePathConsumeQueue=/rocketmq/store/consumequeue
storePathIndex=/rocketmq/store/index
storeCheckpoint=/rocketmq/store/checkpoint
abortFile=/rocketmq/store/abort
maxMessageSize=65536
# Master with synchronous replication to its slave
brokerRole=SYNC_MASTER
# Synchronous disk flush
flushDiskType=SYNC_FLUSH
 
vim broker-b-s.properties
 
# Cluster name
brokerClusterName=rocketmq-cluster
# Same broker name as the master this slave follows
brokerName=broker-b
# A value greater than 0 means slave
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
# Different port from the master running on the same machine
listenPort=11011
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
# Separate storage paths for the slave
storePathRootDir=/rocketmq/store-slave
storePathCommitLog=/rocketmq/store-slave/commitlog
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
storePathIndex=/rocketmq/store-slave/index
storeCheckpoint=/rocketmq/store-slave/checkpoint
abortFile=/rocketmq/store-slave/abort
maxMessageSize=65536
brokerRole=SLAVE
# Asynchronous disk flush
flushDiskType=ASYNC_FLUSH
 
cd /rocketmq/conf/2m-2s-sync
vim broker-b.properties
 
# Cluster name
brokerClusterName=rocketmq-cluster
# Broker name; the master and slave of one group share it
brokerName=broker-b
# 0 means master, a value greater than 0 means slave
brokerId=0
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
# Port the broker listens on
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
# Storage paths
storePathRootDir=/rocketmq/store
storePathCommitLog=/rocketmq/store/commitlog
storePathConsumeQueue=/rocketmq/store/consumequeue
storePathIndex=/rocketmq/store/index
storeCheckpoint=/rocketmq/store/checkpoint
abortFile=/rocketmq/store/abort
maxMessageSize=65536
# Master with synchronous replication to its slave
brokerRole=SYNC_MASTER
# Synchronous disk flush
flushDiskType=SYNC_FLUSH
 
vim broker-a-s.properties
 
# Cluster name
brokerClusterName=rocketmq-cluster
# Same broker name as the master this slave follows
brokerName=broker-a
# A value greater than 0 means slave
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
# Different port from the master running on the same machine
listenPort=11011
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
# Separate storage paths for the slave
storePathRootDir=/rocketmq/store-slave
storePathCommitLog=/rocketmq/store-slave/commitlog
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
storePathIndex=/rocketmq/store-slave/index
storeCheckpoint=/rocketmq/store-slave/checkpoint
abortFile=/rocketmq/store-slave/abort
maxMessageSize=65536
brokerRole=SLAVE
# Asynchronous disk flush
flushDiskType=ASYNC_FLUSH
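Because the four files differ in only a few keys, it is easy to mix up an ID, a port, or a store path. The following is a small consistency check written with `java.util.Properties`, offered as an optional sketch: the class, the method names, and the two rules it checks are this example's own and are not a RocketMQ tool.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Sketch: sanity checks for a pair of broker property files. */
final class BrokerConfigSketch {

    /** Parses the text of a .properties file. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        return props;
    }

    /** brokerId 0 must pair with a master role, any other brokerId with SLAVE. */
    static boolean roleMatchesId(Properties props) {
        long id = Long.parseLong(props.getProperty("brokerId").trim());
        String role = props.getProperty("brokerRole").trim();
        return id == 0 ? role.endsWith("MASTER") : role.equals("SLAVE");
    }

    /** Two brokers on one machine must not share a port or a store directory. */
    static boolean canShareHost(Properties a, Properties b) {
        boolean portsDiffer = !a.getProperty("listenPort").trim().equals(b.getProperty("listenPort").trim());
        boolean storesDiffer = !a.getProperty("storePathRootDir").trim().equals(b.getProperty("storePathRootDir").trim());
        return portsDiffer && storesDiffer;
    }

    public static void main(String[] args) {
        Properties master = parse("brokerId=0\nbrokerRole=SYNC_MASTER\nlistenPort=10911\nstorePathRootDir=/rocketmq/store\n");
        Properties slave = parse("brokerId=1\nbrokerRole=SLAVE\nlistenPort=11011\nstorePathRootDir=/rocketmq/store-slave\n");
        System.out.println(roleMatchesId(master) && roleMatchesId(slave)); // true
        System.out.println(canShareHost(master, slave));                   // true
    }
}
```

In the setup above, the pair that shares a machine is one group's master and the other group's slave, which is why the slaves use port 11011 and the store-slave directories.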
 
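- Adjust the startup memory
The default heap size is 8 GB, which is more than the virtual machines have been allocated, so it has to be lowered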
vim /rocketmq/bin/runbroker.sh
 
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
 
Make this change on both servers
First server
nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
 
Second server
nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
 
Run jps to see which processes have started
# Set the NameServer address as an environment variable
export NAMESRV_ADDR=rocketmq-nameserver1:9876
# Test with the sample producer
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# Test with the sample consumer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
 
