RabbitMQ 二

长夜无荒

RabbitMQ高级特性

消息可靠性传输过程

使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败

Confirm 确认模式

消息从 producer 到 exchange 则会返回一个 confirmCallback

  • pom
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
  • rabbitmq.properties
1
2
3
4
5
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=xiaobo
rabbitmq.password=xiaobo
rabbitmq.virtual-host=/
  • spring-rabbitmq-producer.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory 1. 设置 publisher-confirms="true" -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"

publisher-confirms="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<!--2. 消息可靠性投递(生产端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"> </rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>

关键publisher-confirms="true" 这个得开启

  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {

//2. 定义回调 **
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");

if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});


//3. 发送消息 接受成功
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");

//3. 发送消息 接受失败
// rabbitTemplate.convertAndSend("test_exchange_confirm1", "confirm", "message confirm....");

}
}

Return 退回模式

当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer 返回一个returnCallback

关键publisher-returns="true" 开启

  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 步骤:
* 1. 开启回退模式:publisher-returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/

@Test
public void testReturn() {

//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);

//2.设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");

System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);

//处理
}
});

for (int i = 0; i < 10; i++) {

//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
}

消费者处理

消费端确认方式(3种)

  • 自动确认:acknowledge=”none”

自动确认当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除

  • 手动确认:acknowledge=”manual”

手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息

  • pom
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
  • rabbitmq.properties
1
2
3
4
5
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=xiaobo
rabbitmq.password=xiaobo
rabbitmq.virtual-host=/
  • spring-rabbitmq-consumer.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>


<context:component-scan base-package="com.dream.xiaobo.listener" />

<!--定义监听器容器 添加 acknowledge="manual" 手动-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>
</rabbit:listener-container>
</beans>

acknowledge="manual" 手动

prefetch="1" 一批请求接受多少

  • AckListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
public class AckListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

//1.接收转换消息
System.out.println(new String(message.getBody()));

//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
// int i = 1/0;//出现错误
//3. 手动
channel.basicAck(deliveryTag,true);

}catch (Exception e) {
//e.printStackTrace();

//4.拒绝
/**
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
// 了解
//channel.basicReject(deliveryTag,true);
}

Thread.sleep(5000);

}
}
  • consumer
1
2
3
4
5
6
@Test
public void test(){
while (true){

}
}
  • 根据异常情况确认:acknowledge=”auto”,(不常用)

TTL

存活时间/过期时间,当消息到达存活时间后,还没有被消费,会被自动清除

RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间

设置方法

控制台可以直接设置

编程实现

整体队列设置过期时间

  • xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="com.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
  • producer
1
2
3
4
5
6
7
8
9
10
11
/**
* ttl信息发送
*/
@Test
public void ttlTest(){

for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("test_exchange_ttl","com.dream","rabbitmq ttl"+i);
}

}

设置单独消息过期时间

  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 单个消息的过期时间
*/
@Test
public void ttlTest2(){


MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {

// 设置消息过期时间
message.getMessageProperties().setExpiration("5000");

return message;
}
};

MessagePostProcessor messagePostProcessor2 = new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setExpiration("10000");

return message;
}
};

//发送消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor);

rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor2);

rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor);

rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor2);

}
  • 设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准

  • 队列过期后,会将队列所有消息全部移除

  • 消息过期后,只有消息在队列顶端,才会判断其是否过期

死信队列

死信队列

  • 队列消息长度到达限制

  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

  • 原队列存在消息过期设置,消息到达超时时间未被消费

模拟

  • xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">

<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />

<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />

<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>

</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- 死信队列-->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
  • producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 死信消息测试
*/
@Test
public void ddlTest(){

// 时间过期,死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx",
"test.dlx.haha","我等着过期,然后去死信队列");


// 超出队列长度,死信消息
for (int i = 0; i < 15; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx",
"test.dlx.haha","我超出队列长度,然后去死信队列");
}

// 被拒收,死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx",
"test.dlx.haha","我等着被拒收,然后去死信队列");

}

消费者

  • xml

添加<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>

  • DlxListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
public class DlxListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
//1.接收转换消息
System.out.println(new String(message.getBody()));

//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("拒绝接受");
//4.拒绝签收
/**
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,false);
// 了解
//channel.basicReject(deliveryTag,true);
}
}
}

延迟队列

延迟队列

延迟队列 = TTL + 死信队列

进入队列后,延迟一段时间在进行消费

消息可靠性保障(面试)

消息可靠性保障

  • 消息的生产者将业务数据存到数据库中

  • 发送消息给 队列Q1

  • 消息的生产者等待一定的时间后,在发送一个延迟消息给队列 Q3

  • 消息的消费方监听 Q1 队列消息,成功接收后

  • 消息的消费方会 发送 一条确认消息给 队列Q2

  • 回调检查服务监听 队列Q2 发送的确认消息

  • 回调检查服务接收到确认消息后,将消息写入到 消息的数据库表中

  • 回调检查服务同时也会监听 队列Q3延迟消息, 如果接收到消息会和数据库比对消息的唯一标识

  • 如果发现没有接收到确认消息,那么回调检查服务就会远程调用 消息生产者,重新发送消息

  • 重新执行 2-7 步骤,保证消息的可靠性传输

  • 如果发送消息和延迟消息都出现异常,定时检查服务会监控 消息库中的消息数据,如果发现不一致的消息然后远程调用消息的生产者重新发送消息。

以上介绍引用ydl文章,自我做个记录

消息幂等性处理(面试)

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同,消费多条相同的消息,得到与消费该消息一次相同的结果

消息幂等性处理

乐观锁机制 保证消息的幂等操作

以上介绍引用ydl文章,自我做个记录

正确的开始 微小的长进 然后持续 嘿 我是小博 带你一起看我目之所及的世界……

-------------本文结束 感谢您的阅读-------------

本文标题:RabbitMQ 二

文章作者:小博

发布时间:2022年10月25日 - 21:28

最后更新:2022年10月25日 - 21:29

原始链接:https://codexiaobo.github.io/posts/142858353/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。