首页IT科技mq面试必备(RabbitMQ个人实践)

mq面试必备(RabbitMQ个人实践)

时间2025-06-14 23:26:22分类IT科技浏览4045
导读:前言 MQ(Message Queue)就是消息队列,其有点有很多:解耦、异步、削峰等等,本文来聊一下RabbitMQ的一些概念以及使用。...

前言

MQ(Message Queue)就是消息队列            ,其有点有很多:解耦            、异步                  、削峰等等                  ,本文来聊一下RabbitMQ的一些概念以及使用            。

RabbitMq

案例

Springboot整合RabbitMQ简单案例

基本概念

Exchange:消息交换机      ,它指定消息按什么规则      ,路由到哪个队列                  。 Queue:消息队列载体                  ,每个消息都会被投入到一个或多个队列      。 Binding:绑定            ,它的作用就是把exchange和queue按照路由规则绑定起来            。 Routing Key:路由关键字      ,exchange根据这个关键字进行消息投递                  。 Producer:消息生产者                  ,就是投递消息的程序      。 Consumer:消息消费者            ,就是接受消息的程序      。

发布消息到RabbitMQ需要经过两步:

producer → exchange exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

工作流程

了解了RabbitMQ的一些概念,我们来捋捋使用RabbitMQ的流程:

创建Exchange 创建Queue 将Queue绑定进Exchange中(此处会设置routing key) 生产者发布消息 消费者订阅消息

交换机(Exchange)

交换机可以绑定队列                  ,绑定时可以给队列指定路由(Routing key)和参数(Arguments)

所有的消息发送都是经过交换机转发到队列的                  ,而不是直接到队列中

交换机类型:

direct

根据确定的路由(routing key)转发消息到队列中(一条消息可以发到多个队列,只要路由相同)

fanout

路由无效            ,只要和该交换机绑定的队列                  ,都能接收到消息

topic

允许路由使用*和#来进行模糊匹配

*表示一个单词

表示任意数量(零个或多个)单词

例如:如果队列的路由为com.# 那么往交换机发消息是      ,路由填com.ccc 队列就可以收到消息

headers

忽略路由            ,由参数(Arguments)来确定转发的队列

消息过期时间TTL

有两种方式设置TTL                  ,创建队列时设置整个队列的TTL或者在发送消息时单独设置每条消息的TTL      ,消息存活时间取两者的最小值                  。

创建队列时设置

是消息的存活时间      ,不是队列的存活时间                  ,别搞混了            。

@Bean public Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 设置队列中的消息5秒过期 return new Queue("queueName",true, false, false, args); }

发送消息时设置

public void makeOrder(String userid,String productid,int num){ String exchangeName = "ttl_exchange"; String routingKey = "ttlmessage"; //给消息设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ public Message postProcessMessage(Message message){ // 设置消息5秒过期 message.getMessageProperties().setExpiration("5000"); return message; } } rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor); }

死信队列

死信队列也是一个正常队列            ,只是当绑定了死信队列的队列满足相应条件      ,就会将满足条件的消息转移到死信队列中      。

进入死信队列的条件:

消息被拒绝 消息过期(超时) 队列达到最大长度

死信队列的配置:

按照正常步骤定义一个队列(交换机      、队列            、绑定)

给需要绑定死信队列的队列添加x-dead-letter-exchange(死信队列的交换机)和x-dead-letter-routing-key(死信队列的路由)参数

@Bean public Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "死信队列交换机名称"); args.put("x-dead-letter-routing-key", "死信队列路由"); return new Queue("queueName",true, false, false, args); }

如何保证MQ消息正确送达与消费

可靠性生产和推送

步骤:

发送消息前数据库保存MQ消息发送日志 MQ消息发送后使用回调更新日志状态

实现:

上面我们讲了                  ,发布消息到RabbitMQ需要经过两步:

producer → exchange

exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

所以            ,发布消息的确认也分两个部分,以下是确认步骤:

修改MQ应答机制(yml)

spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 # 发送消息确认                  ,producer -> exchange publisher-confirm-type: correlated # 发送消息确认                  ,exchange -> queue publisher-returns: true

新增mq的回调方法

/** * PostConstruct注解好多人以为是Spring提供的                  。其实是Java自己的注解            。 * Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。 * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次                  。 * PostConstruct在构造函数之后执行            ,init()方法之前执行                  。 * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法) */ @PostConstruct private void regCallBack() { // producer -> exchange 成功或失败都会触发此回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 这个id是在消息发送的时候传入的 String id = correlationData.getId(); // 如果ack为true代表消息被mq成功接收 if (!ack) { // 应答失败                  ,修改日志状态 System.out.println("exchange 应答失败      ,做失败处理!"); } else { // 应答成功            ,修改日志状态 System.out.println("exchange 成功处理"); } } }); // 这个回调只有exchange -> queue 失败时才会触发 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("exchange -> queue 发送失败"); } }); }

修改MQ发送消息的方法                  ,增加日志id的传递

String correlationId = "这是日志id"; rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 消费者需要correlationId才做这个处理 message.getMessageProperties().setCorrelationId(correlationId); return message; } }, new CorrelationData(correlationId)); // 如果消费者不需要获取correlationId      ,则用下面这种即可 rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));

可靠性消费

步骤:

开启手动应答 监听器增加手动应答逻辑

实现:

开启手动应答

spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 listener: simple: acknowledge-mode: manual # 将自动应答ack模式改成手动应答

acknowledge-mode有三种类型:

nome:不进行ack      ,rabbitmq默认消费者正确处理所有请求 munual:手动确认 auto:自动确认消息(默认类型)。如果消费者抛出异常                  ,则消息重回队列            。

监听器增加手动应答逻辑

@RabbitListener(queues = {"队列名字"}) public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{ // 需要producer做相应处理            ,consumer才能拿到correlationId String correlationId = messages.getMessageProperties().getCorrelationId(); System.out.println("消息为:" + orderMsg); long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString()); try { // 消费成功      ,进行确认 channel.basicAck(tag, false); } catch (IOException e) { // 消费失败                  ,重发 // requeue代表是否重发            ,为false则直接将消息丢弃,有死信就进入死信队列 channel.basicNack(tag, false, true); } }

总结

本文介绍了RabbitMQ的一些概念和简单使用                  ,有不少东西其实是没有讲清楚的                  ,比如publisher-confirm-type和acknowledge-mode的几种类型的区别等等                  。主要是在官方文档找不到相关的细致描述,查文档的能力还是有待提高      。            。                  。

参考资料

RabbitMq 技术文档 - 腾讯云开发者社区-腾讯云 (tencent.com)

Spring AMQP

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
vue生命周期简单描述(Vue–》详解Vue组件生命周期的三个阶段) pppoe是什么协议(pppsetup命令 – 设置PPP连线)