一、ACK
ACK是acknowledge的缩写,表示已确认
二、默认情况
默认情况下,消费端取回消息后,默认会自动返回ACK确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ得到ACK确认信息就会删除消息
但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了
所以还是要修改成手动确认
三、创建消费端module
1、配置POM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
|
2、YAML
增加针对监听器的设置:
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: host: 192.168.200.100 port: 5672 username: guest password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual
|
3、主启动类
没有特殊设定:
1 2 3 4 5 6 7 8 9 10 11 12 13
| package com.atguigu.mq;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitMQConsumerMainType {
public static void main(String[] args) { SpringApplication.run(RabbitMQConsumerMainType.class, args); }
}
|
四、消费端监听器
1、创建监听器类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.stereotype.Component;
@Component public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order";
public void processMessage(String dataString, Message message, Channel channel) {
}
}
|
2、在接收消息的方法上应用注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener( // 设置绑定关系 bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息 key = {ROUTING_KEY} )) public void processMessage(String dataString, Message message, Channel channel) {
}
|
3、接收消息方法内部逻辑
- 业务处理成功:手动返回ACK信息,表示消息成功消费
- 业务处理失败:手动返回NACK信息,表示消息消费失败。此时有两种后续操作供选择:
- 把消息重新放回消息队列,RabbitMQ会重新投递这条消息,那么消费端将重新消费这条消息——从而让业务代码再执行一遍
- 不把消息放回消息队列,返回reject信息表示拒绝,那么这条消息的处理就到此为止
4、相关API
先回到PPT理解“deliveryTag:交付标签机制”
下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel接口
①basicAck()方法
- 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
- 参数列表:
参数名称 |
含义 |
long deliveryTag |
Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple |
取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
②basicNack()方法
- 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
- 参数列表:
参数名称 |
含义 |
long deliveryTag |
Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple |
取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
boolean requeue |
取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
③basicReject()方法
- 方法功能:根据指定的deliveryTag,对该消息表示拒绝
- 参数列表:
参数名称 |
含义 |
long deliveryTag |
Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean requeue |
取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
- basicNack()和basicReject()有啥区别?
- basicNack()有批量操作
- basicReject()没有批量操作
5、完整代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component @Slf4j public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order";
@RabbitListener( // 设置绑定关系 bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息 key = {ROUTING_KEY} )) public void processMessage(String dataString, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try { log.info("消费端接收到消息内容:" + dataString);
channel.basicAck(deliveryTag, false); } catch (Exception e) {
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (!redelivered) { channel.basicNack(deliveryTag, false, true); } else { channel.basicReject(deliveryTag, false); }
} }
}
|
五、要点总结
- 要点1:把消息确认模式改为手动确认
- 要点2:调用Channel对象的方法返回信息
- ACK:Acknowledgement,表示消息处理成功
- NACK:Negative Acknowledgement,表示消息处理失败
- Reject:拒绝,同样表示消息处理失败
- 要点3:后续操作
- requeue为true:重新放回队列,重新投递,再次尝试
- requeue为false:不放回队列,不重新投递
- 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据
六、流程梳理

七、多啰嗦一句
消费端如果设定消息重新放回队列,Broker重新投递消息,那么消费端就可以再次消费消息,这是一种“重试”机制,这需要消费端代码支持“幂等性”——这属于前置知识,不展开了。