RocketMQ(一)

正道生物

Rocket MQ

MQ

MQ全称为Message Queue, 消息队列(MQ)是一种用来保存消息数据的队列,是应用程序对应用程序的通信方法

消息

服务器间的业务请求

队列

数据结构的一种,特征为 先进先出

MQ作用

传统项目

传统的项目

MQ项目

MQ的项目

优点

  • 应用解耦(异步消息发送)

MQ解耦

提高系统容错性和可维护性

  • 异步提速

MQ异步提速

提升用户体验和系统吞吐量

  • 削峰填谷

MQ削峰填谷

提高系统稳定性

缺点

  • 系统可用性降低

  • 系统复杂度提高

  • 一致性问题

MQ产品

  • ActiveMQ

java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

  • RocketMQ

java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强

  • RabbitMQ

erlang语言实现,万级数据吞吐量,处理速度us级,主从架构

  • Kafka

scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

RocketMQ

RocketMQ是阿里开源的一款中间件产品

工作流程

RocketMQ工作流程

  • 生产者

producer 消息发送发

  • 消费者

consumer 消息接受方

  • 消息服务器

broker 接受消息,提供消息,消息持久化,高可用

  • 命名服务器

NameServer 寻找路径,映射注册的服务器

  • 注册

Register 将服务器都注册到命名服务器

  • 消息

Message 发送的消息

  • 主题

Topic 消息中包含的主题

  • 标签

消息中包含的标签,类似副主题

  • 心跳

注册到命名服务器中服务器一段时间向

  • 监听器

Listener 用于监听推送消息

  • 拉取消费、推动消费

生产者推送消息 消费者拉取消息

执行过程

首先服务器都注册到命名服务器,并且设置一个心跳机制来判断是否宕机,生产者首先发送消息到命名服务器,命名服务器通过Broker IPS进行判断映射,这样生产者就可以拿到Broker信息并向Broker服务器发送请求,消费者接受请求,向命名服务器发送请求,也是一样的过程,这里但是有个问题,怎么判断消息队列中有没有信息,这时有两种方式,一种是每过一段时间向队列中拉取消息,还有一种方式建立一个长链接监听器来进行监听推送消息。

Rocket安装(windows版)

  • 配置环境变量

ROCKETMQ_HOME

MQ解压路径\MQ文件夹名

  • 配置conf
1
2
autoCreateTopicEnable=true 
autoCreateSubscriptionGroup=true
  • 启动NAMESERVER

start mqnamesrv.cmd

  • 启动BROKER

start mqbroker.cmd -n 127.0.0.1:9876

  • 注意:闪退回命令行

删除C:\Users\”当前系统用户名”\store下的所有文件

  • 配置环境变量

NAMESRV_ADDR

localhost:9876

  • 测试生产者发送消息,bin目录下

tools.cmd org.apache.rocketmq.example.quickstart.Producer

  • 测试消费者接收消息, bin目录下

tools.cmd org.apache.rocketmq.example.quickstart.Consumer

  • 控制台

下载地址

https://github.com/apache/rocketmq-externals

  • 打包

我的有个报错,需要手动导入一个包,放到maven库里

D:\Maven\IDEA jar\com\github\eirslett\yarn\1.22.10\yarn-v1.22.10

运行jar包即可查看控制台

java -jar jar包名

Rocket安装(Linux版)

消息发送

单生产者单消费者(OneToOne)

基于发送和基于接受

  • pom
1
2
3
4
5
6
7
8
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>

</dependencies>
  • Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RocketMQProducer {

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {

// 谁来发? 实例化一个默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("group1");

// 发给谁? 发给命名服务器
producer.setNamesrvAddr("localhost:9876");
// 怎么发? 生产者跑起来
producer.start();

// 发什么? 模拟一个消息
String msg = "Hello RocketMQ";

// 形参:topic: 主题 tags: 标签 byte[]: 传输的数据,传输时都是字节数组
Message message = new Message("topic1","tags1",msg.getBytes("utf-8"));

// 发送
SendResult sendResult = producer.send(message);

System.out.println("结果为:" + sendResult);

// 关闭资源
producer.shutdown();
}
}
  • consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RocketMQConsumer {

public static void main(String[] args) throws MQClientException {
// 谁来接收 实例化一个默认的Consumer 监听器监听的方式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 每隔几秒发一次请求的方式
// DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();

// 接受哪里的消息
consumer.setNamesrvAddr("localhost:9876");

// 监听哪个消息队列
consumer.subscribe("topic1","*");

// 注册监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

// 这里写业务
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
System.out.println(new String(body));
}
// ConsumeConcurrentlyStatus 一个枚举类
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动
consumer.start();

// 不要关闭资源连接,因为在监听
}
}

单生产者多消费者(OneToMany)

负载均衡模式(默认)和广播模式

  • pom
1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RocketMQProducer {

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {

// 谁来发? 实例化一个默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("group1");

// 发给谁? 发给命名服务器
producer.setNamesrvAddr("localhost:9876");

// 怎么发? 生产者跑起来
producer.start();

for (int i = 0; i < 10; i++) {

// 发什么? 模拟一个消息
String msg = "Hello RocketMQ" + i;

// 形参:topic: 主题 tags: 标签 byte[]: 传输的数据,传输时都是字节数组
Message message = new Message("topic3","tags1",msg.getBytes("utf-8"));

// 同步发送
SendResult sendResult = producer.send(message);

System.out.println("结果为:" + sendResult);
}

// 关闭资源
producer.shutdown();

}
}
  • consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class RocketMQConsumer {

public static void main(String[] args) throws MQClientException {

// 谁来接收 实例化一个默认的Consumer 监听器监听的方式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
// 每隔几秒发一次请求的方式
// DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();

// 接受哪里的消息
consumer.setNamesrvAddr("localhost:9876");

// 监听哪个消息队列
consumer.subscribe("topic3","*");

// 发送模式 BROADCASTING 广播模式
// consumer.setMessageModel(MessageModel.BROADCASTING);

// 发送模式 CLUSTERING 负载均衡模式
// consumer.setMessageModel(MessageModel.CLUSTERING);

// 注册监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

// 这里写业务
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
System.out.println(new String(body));
System.out.println("------------------");
}
// ConsumeConcurrentlyStatus 一个枚举类
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动
consumer.start();

// 不要关闭资源连接,因为在监听

}

}

广播模式: 每个消费者都消费同样的消息

负载均衡模式: 消费者共同消费

多生产者多消费者(ManyToMany)

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

消息类别

同步信息

即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

1
2
//        同步发送
SendResult sendResult = producer.send(message);

异步信息

即时性较弱,但需要有回执的消息,例如订单中的某些信息

1
2
3
4
5
6
7
8
9
10
11
12
13
//        异步发送
producer.send(message, new SendCallback() {

@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});

单向信息

不需要有回执的消息,例如日志类消息

1
2
//            单向发送消息
producer.sendOneway(message);

延时信息

消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

1
2
3
//            延时发送
message.setDelayTimeLevel(3);
SendResult sendResult = producer.send(message);

级别对应的时间

1
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

批量信息

批量发送消息能显著提高传递小消息的性能

1
2
3
4
5
6
7
8
9
10
11
//            批量发送
List<Message> messageList = new ArrayList<>();

Message message1 = new Message("topic4","tags1",msg.getBytes("utf-8"));
Message message2 = new Message("topic4", "tags2", msg.getBytes("utf-8"));
Message message3 = new Message("topic4", "tags3", msg.getBytes("utf-8"));
messageList.add(message1);
messageList.add(message2);
messageList.add(message3);

SendResult sendResult = producer.send(messageList);

批量消息有相同的topic

相同的waitStoreMsgOK

不能是延时消息

消息内容总长度不超过4M

消息内容总长度:

  • topic(字符串字节数)

  • body (字节数组长度)

  • 消息追加的属性(key与value对应字符串字节数)

  • 日志 固定20字节

信息过滤

分类过滤

按照tag过滤信息

1
2
//        监听哪个消息队列 制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic5","tags1 || tags2");

SQL过滤(语法过滤/属性过滤/SQL过滤)

首先修改配置文件broker.conf

1
enablePropertyFilter=true

cmd中输入

1
mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

重新启动broker

1
message.putUserProperty("age","20");
语法
  • 数值比较

‘>’ >= < <= BETWEEN =

  • 字符比较

= <> IN

  • IS NULL 或者 IS NOT NULL

  • 逻辑符号

AND OR NOT

常量类型

  • 数值

123 3.1415;

  • 字符

‘abc’,必须用单引号包裹起来;

  • NULL 特殊的常量

  • 布尔值

TRUE 或 FALSE

Spring Boot整合RocketMQ

  • pom
1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
  • application.yml
1
2
3
4
5
6
rocketmq:
name-server: localhost:9876
producer:
group: producer-group1
server:
port: 8081
  • 实体类
1
2
3
4
5
6
7
8
9
10
11
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {

private Integer id;

private String username;

}
  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
@RequestMapping("/demo")
public class ProducerController {

// rocketmq模板
@Autowired
private RocketMQTemplate template;

@RequestMapping("/producer")
public String producersMessage() {
// 实例化User对象
User user = new User(1, "xiaobo");

// 转换发送信息
template.convertAndSend("topic10",user);

return JSON.toJSONString(user);
}
}
  • consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
//@RocketMQMessageListener(topic = "topic10",consumerGroup = "producer-group1",
// selectorType = SelectorType.TAG,selectorExpression = "tag = 1 || tag = 2")
//topic 主题 consumerGroup 分组 selectorType 过滤类型 selectorExpression 过滤语法 messageModel 方法模式
@RocketMQMessageListener(topic = "10",consumerGroup = "producer-group1",
selectorType = SelectorType.SQL92,selectorExpression = "tag > 92",
messageModel = MessageModel.BROADCASTING)
public class ProducerListener implements RocketMQListener<User> {

@Override
public void onMessage(User user) {
System.out.println(user);
}

}

topic 主题

consumerGroup 分组

selectorType 过滤类型

selectorExpression 过滤语法

messageModel 方法模式

信息发送形式

  • 异步发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//        异步发送
template.asyncSend("topic10", user, new SendCallback() {

// 成功
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}

// 失败
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
  • 单向发送
1
2
//        单向发送
template.sendOneWay("topic10",user);
  • 延时发送
1
2
//        延时发送
template.syncSend("topic10", MessageBuilder.withPayload(user).build(),1000,2);
  • 批量发送
1
2
3
4
5
6
7
8
9
10
//        批量发送
List<User> messageList = new ArrayList<>();
User user1 = new User(1, "xiaobo");
User user2 = new User(2, "wangyibo");
User user3 = new User(3, "wangyangnan");
messageList.add(user1);
messageList.add(user2);
messageList.add(user3);

template.syncSend("topic10",messageList,1000);

正确的开始,微小的长进,然后持续,嘿,我是小博,带你一起看我目之所及的世界……

-------------本文结束 感谢您的阅读-------------

本文标题:RocketMQ(一)

文章作者:小博

发布时间:2022年10月18日 - 20:21

最后更新:2022年10月18日 - 20:22

原始链接:https://codexiaobo.github.io/posts/442872112/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。