Rocket MQ MQ MQ全称为Message Queue, 消息队列(MQ)是一种用来保存消息数据的队列,是应用程序对应用程序的通信方法
消息 服务器间的业务请求
队列 数据结构的一种,特征为 先进先出
MQ作用 传统项目
MQ项目
优点
提高系统容错性和可维护性
提升用户体验和系统吞吐量
提高系统稳定性
缺点
MQ产品
java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
erlang语言实现,万级数据吞吐量,处理速度us级,主从架构
scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
RocketMQ RocketMQ是阿里开源的一款中间件产品
工作流程
producer 消息发送发
consumer 消息接受方
broker 接受消息,提供消息,消息持久化,高可用
NameServer 寻找路径,映射注册的服务器
Register 将服务器都注册到命名服务器
Message 发送的消息
Topic 消息中包含的主题
消息中包含的标签,类似副主题
注册到命名服务器中服务器一段时间向
Listener 用于监听推送消息
生产者推送消息 消费者拉取消息
执行过程 首先服务器都注册到命名服务器,并且设置一个心跳机制来判断是否宕机,生产者首先发送消息到命名服务器,命名服务器通过Broker IPS进行判断映射,这样生产者就可以拿到Broker信息并向Broker服务器发送请求,消费者接受请求,向命名服务器发送请求,也是一样的过程,这里但是有个问题,怎么判断消息队列中有没有信息,这时有两种方式,一种是每过一段时间向队列中拉取消息,还有一种方式建立一个长链接监听器来进行监听推送消息。
Rocket安装(windows版)
ROCKETMQ_HOME
MQ解压路径\MQ文件夹名
1 2 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876
删除C:\Users\”当前系统用户名”\store下的所有文件
NAMESRV_ADDR
localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
下载地址
https://github.com/apache/rocketmq-externals
我的有个报错,需要手动导入一个包,放到maven库里
D:\Maven\IDEA jar\com\github\eirslett\yarn\1.22.10\yarn-v1.22.10
运行jar包即可查看控制台
java -jar jar包名
Rocket安装(Linux版) 消息发送 单生产者单消费者(OneToOne) 基于发送和基于接受
1 2 3 4 5 6 7 8 <dependencies > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.8.0</version > </dependency > </dependencies >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class RocketMQProducer { public static void main (String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("group1" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); String msg = "Hello RocketMQ" ; Message message = new Message("topic1" ,"tags1" ,msg.getBytes("utf-8" )); SendResult sendResult = producer.send(message); System.out.println("结果为:" + sendResult); producer.shutdown(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class RocketMQConsumer { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1" ); consumer.setNamesrvAddr("localhost:9876" ); consumer.subscribe("topic1" ,"*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { byte [] body = messageExt.getBody(); System.out.println(new String(body)); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
单生产者多消费者(OneToMany) 负载均衡模式(默认)和广播模式
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.8.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class RocketMQProducer { public static void main (String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("group1" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); for (int i = 0 ; i < 10 ; i++) { String msg = "Hello RocketMQ" + i; Message message = new Message("topic3" ,"tags1" ,msg.getBytes("utf-8" )); SendResult sendResult = producer.send(message); System.out.println("结果为:" + sendResult); } producer.shutdown(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class RocketMQConsumer { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2" ); consumer.setNamesrvAddr("localhost:9876" ); consumer.subscribe("topic3" ,"*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { byte [] body = messageExt.getBody(); System.out.println(new String(body)); System.out.println("------------------" ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
广播模式
: 每个消费者都消费同样的消息
负载均衡模式
: 消费者共同消费
多生产者多消费者(ManyToMany) 多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
消息类别 同步信息 即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
1 2 SendResult sendResult = producer.send(message);
异步信息 即时性较弱,但需要有回执的消息,例如订单中的某些信息
1 2 3 4 5 6 7 8 9 10 11 12 13 producer.send(message, new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.println(sendResult); } @Override public void onException (Throwable throwable) { System.out.println(throwable); } });
单向信息 不需要有回执的消息,例如日志类消息
1 2 producer.sendOneway(message);
延时信息 消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
1 2 3 message.setDelayTimeLevel(3 ); SendResult sendResult = producer.send(message);
级别对应的时间
1 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ;
批量信息 批量发送消息能显著提高传递小消息的性能
1 2 3 4 5 6 7 8 9 10 11 List<Message> messageList = new ArrayList<>(); Message message1 = new Message("topic4" ,"tags1" ,msg.getBytes("utf-8" )); Message message2 = new Message("topic4" , "tags2" , msg.getBytes("utf-8" )); Message message3 = new Message("topic4" , "tags3" , msg.getBytes("utf-8" )); messageList.add(message1); messageList.add(message2); messageList.add(message3); SendResult sendResult = producer.send(messageList);
批量消息有相同的topic
相同的waitStoreMsgOK
不能是延时消息
消息内容总长度不超过4M
消息内容总长度:
信息过滤 分类过滤 按照tag过滤信息
1 2 consumer.subscribe("topic5" ,"tags1 || tags2" );
SQL过滤(语法过滤/属性过滤/SQL过滤) 首先修改配置文件broker.conf
1 enablePropertyFilter=true
cmd中输入
1 mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
重新启动broker
1 message.putUserProperty("age" ,"20" );
语法
‘>’ >= < <= BETWEEN =
= <> IN
IS NULL 或者 IS NOT NULL
逻辑符号
AND OR NOT
常量类型
123 3.1415;
‘abc’,必须用单引号包裹起来;
TRUE 或 FALSE
Spring Boot整合RocketMQ
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.0.3</version > </dependency >
1 2 3 4 5 6 rocketmq: name-server: localhost:9876 producer: group: producer-group1 server: port: 8081
1 2 3 4 5 6 7 8 9 10 11 @Data @NoArgsConstructor @AllArgsConstructor @ToString public class User implements Serializable { private Integer id; private String username; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @RestController @RequestMapping("/demo") public class ProducerController { @Autowired private RocketMQTemplate template; @RequestMapping("/producer") public String producersMessage () { User user = new User(1 , "xiaobo" ); template.convertAndSend("topic10" ,user); return JSON.toJSONString(user); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Service @RocketMQMessageListener(topic = "10",consumerGroup = "producer-group1", selectorType = SelectorType.SQL92,selectorExpression = "tag > 92", messageModel = MessageModel.BROADCASTING) public class ProducerListener implements RocketMQListener <User > { @Override public void onMessage (User user) { System.out.println(user); } }
topic
主题
consumerGroup
分组
selectorType
过滤类型
selectorExpression
过滤语法
messageModel
方法模式
信息发送形式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 template.asyncSend("topic10" , user, new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.println(sendResult); } @Override public void onException (Throwable throwable) { System.out.println(throwable); } });
1 2 template.sendOneWay("topic10" ,user);
1 2 template.syncSend("topic10" , MessageBuilder.withPayload(user).build(),1000 ,2 );
1 2 3 4 5 6 7 8 9 10 List<User> messageList = new ArrayList<>(); User user1 = new User(1 , "xiaobo" ); User user2 = new User(2 , "wangyibo" ); User user3 = new User(3 , "wangyangnan" ); messageList.add(user1); messageList.add(user2); messageList.add(user3); template.syncSend("topic10" ,messageList,1000 );
正确的开始,微小的长进,然后持续,嘿,我是小博,带你一起看我目之所及的世界……