RabbitMQ高级特性
消息可靠性传输过程
使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败
Confirm 确认模式
消息从 producer 到 exchange 则会返回一个 confirmCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency>
|
1 2 3 4 5
| rabbitmq.host=localhost rabbitmq.port=5672 rabbitmq.username=xiaobo rabbitmq.password=xiaobo rabbitmq.virtual-host=/
|
- spring-rabbitmq-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true" /> <rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue> <rabbit:direct-exchange name="test_exchange_confirm"> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm"> </rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> </beans>
|
关键publisher-confirms="true"
这个得开启
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testConfirm() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm方法被执行了....");
if (ack) { System.out.println("接收成功消息" + cause); } else { System.out.println("接收失败消息" + cause); } } });
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
} }
|
Return 退回模式
当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer 返回一个returnCallback
关键publisher-returns="true"
开启
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
@Test public void testReturn() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 执行了....");
System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey);
} });
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm...."); } }
|
消费者处理
消费端确认方式(3种)
自动确认当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除
- 手动确认:acknowledge=”manual”
手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency>
|
1 2 3 4 5
| rabbitmq.host=localhost rabbitmq.port=5672 rabbitmq.username=xiaobo rabbitmq.password=xiaobo rabbitmq.virtual-host=/
|
- spring-rabbitmq-consumer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.dream.xiaobo.listener" />
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/> <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/> </rabbit:listener-container> </beans>
|
acknowledge="manual"
手动
prefetch="1"
一批请求接受多少
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Component public class AckListener implements ChannelAwareMessageListener {
@Override public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑...");
channel.basicAck(deliveryTag,true);
}catch (Exception e) {
channel.basicNack(deliveryTag,true,true); }
Thread.sleep(5000);
} }
|
1 2 3 4 5 6
| @Test public void test(){ while (true){
} }
|
- 根据异常情况确认:acknowledge=”auto”,(不常用)
TTL
存活时间/过期时间,当消息到达存活时间后,还没有被消费,会被自动清除
RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间
设置方法
控制台可以直接设置
编程实现
整体队列设置过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" > <rabbit:bindings> <rabbit:binding pattern="com.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
|
1 2 3 4 5 6 7 8 9 10 11
|
@Test public void ttlTest(){
for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("test_exchange_ttl","com.dream","rabbitmq ttl"+i); }
}
|
设置单独消息过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
@Test public void ttlTest2(){
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message; } };
MessagePostProcessor messagePostProcessor2 = new MessagePostProcessor() {
@Override public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message; } };
rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor);
rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor2);
rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor);
rabbitTemplate.convertAndSend("test_exchange_ttl", "com.dream", "rabbitmq ttl....",messagePostProcessor2);
}
|
死信队列
模拟
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> <entry key="x-max-length" value="10" value-type="java.lang.Integer" /> </rabbit:queue-arguments>
</rabbit:queue> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Test public void ddlTest(){
rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha","我等着过期,然后去死信队列");
for (int i = 0; i < 15; i++) { rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha","我超出队列长度,然后去死信队列"); }
rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha","我等着被拒收,然后去死信队列");
}
|
消费者
添加<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Component public class DlxListener implements ChannelAwareMessageListener {
@Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag();
try { System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑..."); int i = 3/0; channel.basicAck(deliveryTag,true); } catch (Exception e) { System.out.println("拒绝接受");
channel.basicNack(deliveryTag,true,false); } } }
|
延迟队列
延迟队列 = TTL + 死信队列
进入队列后,延迟一段时间在进行消费
消息可靠性保障(面试)
消息的生产者将业务数据存到数据库中
发送消息给 队列Q1
消息的生产者等待一定的时间后,在发送一个延迟消息给队列 Q3
消息的消费方监听 Q1 队列消息,成功接收后
消息的消费方会 发送 一条确认消息给 队列Q2
回调检查服务监听 队列Q2 发送的确认消息
回调检查服务接收到确认消息后,将消息写入到 消息的数据库表中
回调检查服务同时也会监听 队列Q3延迟消息, 如果接收到消息会和数据库比对消息的唯一标识
如果发现没有接收到确认消息,那么回调检查服务就会远程调用 消息生产者,重新发送消息
重新执行 2-7 步骤,保证消息的可靠性传输
如果发送消息和延迟消息都出现异常,定时检查服务会监控 消息库中的消息数据,如果发现不一致的消息然后远程调用消息的生产者重新发送消息。
以上介绍引用ydl文章,自我做个记录
消息幂等性处理(面试)
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同,消费多条相同的消息,得到与消费该消息一次相同的结果
乐观锁机制 保证消息的幂等操作
以上介绍引用ydl文章,自我做个记录
正确的开始 微小的长进 然后持续 嘿 我是小博 带你一起看我目之所及的世界……