rabbitmq如何保证消息不丢(RabbitMQ中间件)
RabbitMQ
配置环境
安装 erlang环境以及RabbitMQ
RabbitMQ端口号: 5672
去官网下载 https://www.rabbitmq.com
然后重启RabbitMQ服务 RabbitMQ安装教程
开放端口15672
这里 ,通过http://IP地址:15672 进行Web页面登录 ,输入账号密码(默认都是guest) ,完成页面访问 。至此 ,全部安装结束 。
导入依赖
<!-- 集成RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>配置相关信息
RabbitMQ的端口号是什么?
5672
:这是rabbitMQ的端口号;
15672 :这是那个RabbitMQ的web页面的端口号; spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=127.0.0.1 ##主机ip spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=MmHost spring.rabbitmq.username=root spring.rabbitmq.password=root spring: rabbitmq: username: root password: root addresses: 127.0.0.1:5672 cache: connection: #Cache connection mode, with default connections and multiple channels mode: channel #Multiple connections, multiple channels # mode: connection # rabbitmq server: port: 8080 spring: #给项目来个名字 application: name: rabbitmq-consumer #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: root password: root #虚拟host 可以不设置,使用server默认host virtual-host: MmHost发送消息
@Component public class SenderTest{ @Autowired private RabbitTemplate rabbitTemplate; @Test public void Send() { // 队列名称 String queueName = "ThisKey"; // 消息 String message = "Hello, Spring AMQP!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } }rabbitTemplate.convertAndSend(key, message);
接收消息
@Component public class SpringRabbitMQListener { @RabbitListener(queues = "ThisKey") public void listenSimpleQueueMsg(String msg){ System.out.println(msg); } } @Component //指定所监听的队列 @RabbitListener(queues = "ThisKey") public class SpringRabbitMQListener { //指定用来处理接收消息的方法 @RabbitHandler public void listenSimpleQueueMsg(String msg){ System.out.println(msg); } }注意:此处消息被消费后 ,对应的ThisKey中的消息就消失了 。
原文链接 去的去看看
RabbitMQ-基础使用(Spring AMQP) - 简书 (jianshu.com)
如果使用其他交换机 ,则需要进行相关配置
可以看这篇文章:SpringBoot整合RabbitMQ
1 、创建对应的配置文件
例如Direct交换机
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }如何保证消息的可靠?
ack应答
消息应答
概念
消费者完成一个任务可能需要一段时间 ,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了 ,会导致消息丢失 。RabbitMQ 一旦向消费者传递了一条消息 ,便立即将该消息标记为删除 。在这种情况下 ,突然有个消费者挂掉了 ,我们将丢失正在处理的消息 。以及后续发送给该消费这的消息 ,因为它无法接收到 。为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制 ,消息应答就是:消费者在接收到消息并且处理该消息之后 ,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了 。
各种消息模型实例
五种交换机类型
Direct Exchange直连型交换机 ,根据消息携带的路由键将消息投递给对应队列 。
大致流程 ,有一个队列绑定到一个直连交换机上 ,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X ,这个消息通过生产者发送给交换机时 ,交换机就会根据这个路由值X去寻找绑定值也是X的队列 。 Fanout Exchange扇型交换机 ,这个交换机没有路由键概念 ,就算你绑了路由键也是无视的 。 这个交换机在接收到消息后 ,会直接转发到绑定到它上面的所有队列。
Topic Exchange主题交换机 ,这个交换机其实跟直连交换机流程差不多 ,但是它的特点就是在它的路由键和绑定键之间是有规则的 。
其他两种:WorkQueue(通过监听队列名称)
基本消息队列BasicQueue即为上方的代码 ,此处不再重复 。
- 1 、WorkQueue(通过监听队列名称)
WorkQueue.png
WorkQueue与BasicQueue不同之处 ,就是WorkQueue支持一对多发布消息(不是一个消息发给多个消费者,一个消息只会被一个消费者消费) ,多个消费者可以提高消息消费速度 ,当然相同之处也是消息消费后就会从Queue中消失(后续的几种模型都是如此)。
① 模拟消息堆积
// 队列名称 String queueName = "simple.queue"; // 消息 String message = "Message_"; for (int i = 1; i <= 50; i++) { // 发送消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); }② 接收消息
此处设置两个线程处理速度不同 。 @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }处理结果是2个消费者会均分消息 。可以修改消费方的配置,以按照实际处理能力分配 ,如下:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息 ,处理完成才能获取下一个消息- 2 、Fanout (扇形交换机)
Fanout.png
① 编写Fanout配置类
创建FanoutExchange ,绑定队列Queue和交换机Exchange 。 @Configuration public class FanoutConfig { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("stone.fanout"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }② 发送消息
// 队列名称 String exchangeName = "stone.fanout"; // 消息 String message = "Hello, Fanout!"; rabbitTemplate.convertAndSend(exchangeName, "", message);③ 接收消息
@RabbitListener(queues = "fanout.queue1") public void listen1FanoutQueueMsg(String msg){ System.out.println("Listener1 get :" + msg); } @RabbitListener(queues = "fanout.queue2") public void listen2FanoutQueueMsg(String msg){ System.out.println("Listener2 get :" + msg); }不同于WorkQueue ,Fanout Exchange广播模型下 ,绑定该交换机的消费者可以获取到对应的消息(即一条消息可以通过交换机被多个消费者消费) 。
- 3 、Direct(直连交换机)
Direct.png
① 基于注解声明队列和交换机
@RabbitListener的使用
Ⅰ bindings = @QueueBinding()配置绑定关系;
Ⅱ value = @Queue(name = "direct.queue1")配置队列;
Ⅲ exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT)配置交换机;
Ⅳ key = {"talkshow", "musicshow"}配置订阅 。监听的key进行匹配 ,
rabbitTemplate.convertAndSend(exchangeName, "xxx", message);中的xxx第二个参数进行匹配
注意:type = ExchangeTypes.DIRECT是默认类型 ,可以不做配置 。 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT), key = {"talkshow", "musicshow"} )) public void listenDirectQueue1(String msg){ System.out.println("DirectQueue1 :" + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT), key = {"talkshow", "news"} )) public void listenDirectQueue2(String msg){ System.out.println("DirectQueue2 :" + msg); }② 发送消息
// 交换机名称 String exchangeName = "itcast.direct"; // 消息 String messageNews = "乌俄冲突升级 ,昔日友邦冷眼旁观!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "news", messageNews); // 消息 String messageTalks = "蜘蛛侠3英雄无归发布蓝光预告 ,主演再登SN宣传!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "talkshow", messageTalks);此时:订阅news主题的队列direct.queue1可以消费messageNews ,订阅talkshow主题的direct.queue1和direct.queue2均可以消费messageTalks 。
- 4 、Topic(主题交换机)
Topic.png
Topic类型的Exchange与Direct相比 ,都是可以根据RoutingKey把消息路由到不同的队列 。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符 。通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
① 发送消息 /** * topicExchange */ @Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "itcast.topic"; // 消息 String message = "建设更高水平法治中国"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }② 接收消息
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC), key = {"China.#"} )) public void listenTopicQueue1(String msg){ System.out.println("TopicQueue1 :" + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC), key = {"#.news"} )) public void listenTopicQueue2(String msg){ System.out.println("TopicQueue2 :" + msg); }此时 ,由于消息Topic满足两个队列的订阅规则,所以两个队列都可以消费到消息 。
RabbitMQ的应用场景
延迟队列 ,延迟消息 服务与服务之间的解耦(例如一个服务进行mysql操作的时候需要另一个服务同时进行对应操作) 异步处理 、流量削峰1 、异步处理
假设想象一下我们做一个商城项目 ,在用户支付模块中,可能会涉及到其它业务 ,比如:积分折扣 、消费券 、短信验证等功能 。我们传统的执行步骤是逐步执行 ,也就是说当用户点击支付 ----> 积分折扣 ----> 消费券 ----> 短信验证 ----->支付完成 ,用户需要等待每个业务执行完毕才能支付成功!假设我们从点击支付 -----> 支付成功消耗时间为100/ms ,后面我们每新增一个业务就会多耗时50/ms ,上述的流程大概会耗时250/ms!如果说以后业务更多的话 ,那么用户支付订单的时间会越来越长 ,这样大大影响了用户的体验!参照下图理解
我们使用消息中间件进行异步处理 ,当用户下单支付同时我们创建消息队列进行异步的处理其它业务 ,在我们支付模块中最重要的是用户支付 ,我们可以将一些不重要的业务放入消息队列执行 ,这样可以大大添加我们程序运行的速度 ,用户支付模块中也大大减少了支付时间,为用户添加了更好的体验 。其它模块与其思想一致 ,就比如说用户注册!
2 、流量削峰
假设我们有一个订单系统 ,我们的订单系统最大承受访问量是每秒1万次,如果说某天访问量过大我们的系统承受不住了 ,会对服务器造成宕机 ,这样的话我们的系统就瘫痪了 ,为了解决该问题我们可以使用中间件对流量进行消峰
未加入中间件之前 ,用户直接访问的是订单系统
加入中间件之后 ,用户直接访问的是中间件 ,通过中间件对用户进行消峰 ,好处是可以避免系统的宕机瘫痪 ,坏处是系统速度变慢 ,但是总比不能使用好
3 、应用解耦
我们以商城项目为例 ,订单系统耦合调用支付 、库存、物流系统 ,如果某天其中一个系统出现了异常就会造成订单系统故障!使用中间件后订单系统通过队列去访问支付 、库存 、物流系统就不会造成上述的问题 ,因为订单系统执行完成才会发消息给队列,接下来的任务就交给队列完成 ,队列会监督各个系统完成 ,如果完不成队列会一直监督,直到完成为止!所以说使用中间件后不会造成一个子系统出现故障而造成整个系统故障
创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!