rabbitmq发送对象(SpringCloud(六) – RabbitMQ安装,三种消息发送模式,消息发送确认,消息消费确认(自动,手动))
1 、安装erlang语言环境
1.1 创建 erlang安装目录
mkdir erlang1.2 上传解压压缩包
上传到: /root/ 解压缩# tar -zxvf otp_src_22.0.tar.gz1.3 进入解压缩目录 ,指定目录并安装
进入解压目录 ,指定安装目录# ./configure --prefix=/usr/local/kh96/erlang 安装# make install 添加环境变量# echo export PATH=$PATH:/usr/local/kh96/erlang/bin >> /etc/profile 刷新环境变量# source /etc/profile1.4 测试环境
进入erlang环境#erl 退出# halt().2 、安装RabbitMQ
2.1上传解压压缩包
第一步xx.tar.xz->xx.tar # /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz 第二步#tar -xvf rabbitmq-server-generic-unix-3.7.15.tar2.2 添加环境变量
添加环境变量# echo export PATH=$PATH:/usr/local/kh96/rabbitmq/rabbitmq_server-3.7.15/sbin >> /etc/profile 刷新环境变量# source /etc/profile2.3 启动
启动# rabbitmq-server -detached 查看状态# rabbitmqctl status 查看防火墙状态# firewall-cmd --state (建议不开)2.4 开启云服务端口
RabbitMQ 服务端口: 5672 RabbitMQ 监控平台端口: 15672 开启web插件允许监控平台访问 # rabbitmq-plugins enable rabbitmq_management2.5 远程 访问 15672
公网ip:15672 Username: guest Password: guest 提示这个这个账号只允许本地访问 ,所以需要添加用户2.6 添加用户
显示所有用户# rabbitmqctl list_users 查看guest用户权限# rabbitmqctl list_user_permissions guest 添加admin用户及密码# rabbitmqctl add_user admin admin 设置限权# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 授予admin用户administrator角色# rabbitmqctl set_user_tags admin administrator 查看admin用户权限# rabbitmqctl list_user_permissions admin 删除用户guest# rabbitmqctl delete_user guest 停止RabbitMQ# rabbitmqctl stop2.7 登录成功
Username: admin Password: admin3 、SpringBoot整合
3.0 项目准备
3.0.1 jar包 <!--rabbitmq依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 3.0.2 配置信息 # 端口 server: port: 8104 # RabbitMQ配置 spring: rabbitmq: host: x.xxx.xx.xx #服务器公网ip port: 5672 username: admin password: admin 3.0.3 常量类 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQ 常量类 ,系统的所有队列名 ,交换机名 ,路由键名等 ,统一进行配置管理 */ public class RabbitMQConstant { //========================== 直连模式 /** * Direct直连模式 队列名 */ public static final String RABBITMQ_DIRECT_QUEUE_NAME_KH96 ="rabbitmq_direct_queue_name_kh96"; /** * Direct直连模式 交换机名 */ public static final String RABBITMQ_DIRECT_EXCHANGE_KH96 ="rabbitmq_direct_exchange_kh96"; /** * Direct直连模式 路由键 */ public static final String RABBITMQ_DIRECT_ROUTING_KEY_KH96 ="rabbitmq_direct_routing_key_kh96"; //========================== 扇形模式 /** * Fanout 扇形模式 队列名one */ public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE ="rabbitmq_fanout_queue_name_kh96_one"; /** * Fanout 扇形模式 队列名two */ public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO ="rabbitmq_fanout_queue_name_kh96_two"; /** * Fanout 扇形模式 交换机名 */ public static final String RABBITMQ_FANOUT_EXCHANGE_KH96 ="rabbitmq_fanout_exchange_kh96"; //========================== 主题模式 // -- 队列 /** * Topic 主题模式 队列名one */ public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE ="rabbitmq_topic_queue_name_kh96_one"; /** * Topic 主题模式 队列名two */ public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO ="rabbitmq_topic_queue_name_kh96_two"; /** * Topic 主题模式 队列名Three */ public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE ="rabbitmq_topic_queue_name_kh96_three"; //-- 交换机 /** * Topic 主题模式 交换机名 */ public static final String RABBITMQ_TOPIC_EXCHANGE_KH96 ="rabbitmq_topic_exchange_kh96"; //-- 路由键 /** * Topic 主题模式 -路由键-唯一匹配规则 */ public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY="rabbitmq_topic_routing_key_kh96.only"; /** * Topic 主题模式 -路由键-单词匹配规则 * 单个词 */ public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD="rabbitmq_topic_routing_key_kh96.*"; /** * Topic 主题模式 -路由键-模糊匹配规则 # 0 或 多个词 */ public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE="rabbitmq_topic_routing_key_kh96.#"; } 3.0.4 手动操作队列关系在测试的时候 ,一定要注意交换机和队列的绑定关系 ,只要绑定过的关系就会一直存在需要手动删除;如果测试结果不正常的时候 ,看一些交换机和队列与键值的绑定关系;
选择队列: 删除队列:3.1 Direct 直连模式
3.1.0 核心构造方法:Queue 核心构造方法:Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) name参数:name – the name of the queue. 指定创建的消息队列的名字 ,参数必传 ,即创建队列必须要有队列名 。 durable参数:durable – true if we are declaring a durable queue (the queue will survive a server restart) 指定创建的消息队列是否需要持久化,默认是true ,如果是true ,该队列支持持久化,自动持久化到磁盘 ,RabbitMQ服务重启 ,队列仍然是可用的(存活的) 。 exclusive参数:true if we are declaring an exclusive queue (the queue will only be used by the declarers connection) 指定创建的消息队列是否是排他队列 ,默认是false ,如果是true ,该队列是排他队列 ,只有创建当前队列的连接才可以使用 ,连接一旦断开 ,队列会自动删除 。 autoDelete参数:true if the server should delete the queue when it is no longer in use 指定创建的消息队列是否是自动删除队列 ,默认是false ,如果是true ,该队列是自动删除队列 ,一旦没有消息生产者或者消费者使用当前队列,会被自动删除 。 3.1.1 配置类 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: Direct直连模式 ,自动配置类 ,自动创建队列,交换机 ,并将队列绑定到交换机 ,指定唯一路由 */ @Configuration public class RabbitMQDirectConfig { //创建 直连队列 @Bean public Queue directQueue(){ //创建 直连队列 return new Queue(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96,true); } //创建 直连交换机 @Bean public DirectExchange directExchange(){ // 创建支持持久化的直连交换机 ,指定交换机的名称 return new DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96); } //将直连队列和直连交换机 进行绑定 ,并指定绑定的唯一路由键 @Bean public Binding directBinding(){ // 将直连队列和直连交换机进行绑定 ,并指定绑定的唯一路由键 return BindingBuilder.bind(directQueue()) .to(directExchange()) .with(RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96); } } 3.1.2 消息生产者 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: Direct 直连模式 消息生产者 */ @Slf4j @Component public class RabbitMQDirectProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * @author : huayu * @date : 1/11/2022 * @param : [directMsg, directExchange, directRoutingKey] * @return : void * @description : 使用直连模式 ,发送消息到直连交换机 ,通过交换机绑定的唯一路由键 ,将消息发送到绑定的队列中 */ public void sendDirectMsg2DirectExchange(String directExchange,String directRoutingKey,String directMsg){ log.info("++++++ direct模式消息生产者 ,发送直连消息:{} ,到交换机:{} ,路由键:{} ++++++",directMsg,directExchange,directRoutingKey); rabbitTemplate.convertAndSend(directExchange,directRoutingKey,directMsg); } } 3.1.3 消费者 3.1.3.1 消费者One /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: Direct 直连模式消费者 One */ @Slf4j @Component //指定接听的 消息队列 名字 @RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96) public class RabbitMQDirectConsumerOne { /** * @author : huayu * @date : 1/11/2022 * @param : [directMsgJson] * @return : void * @description : Direct 直连模式消费者One,消费信息 */ //指定消息队列中的消息 ,交给对应的方法处理 @RabbitHandler public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){ log.info("***** Direct直连模式,消费者One,消费消息:{} ******",directMsgJson); // TODO 核心业务逻辑处理 } // @RabbitHandler //自动根据队列中的消息类型 ,自动区分方法 // public void consumeOtherDirectMsgFromDirectQueue(List<String> directMsgJson){ // log.info("***** Direct直连模式 ,消费者Two,消费消息:{} ******",directMsgJson); // // // TODO 核心业务逻辑处理 // // } } 3.1.3.2 消费者Two /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQDirectConsumerTwo */ @Slf4j @Component //指定监听的消息队列 名字 @RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96) public class RabbitMQDirectConsumerTwo { /** * @author : huayu * @date : 1/11/2022 * @param : [directMsgJson] * @return : void * @description : Direct 直连模式消费者 Two,消费信息 */ //指定消息队列中的消息,交给对应的方法处理 @RabbitHandler public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){ log.info("***** Direct直连模式 ,消费者Two,消费消息:{} ******",directMsgJson); // TODO 核心业务逻辑处理 } } 3.1.4 请求测试方法 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: 测试 RabbitMQ 消息队列的操作入口 */ @Slf4j @RestController public class RabbitMQController { @Autowired private RabbitMQDirectProducer rabbitMQDirectProducer; /** * @author : Administrator * @date : 2022/11/1 * @param : [directMsg] * @return : com.kgc.sct.util.RequestResult<java.lang.String> * @description : 测试direct直连模式 ,发送和消费消息 */ @GetMapping("/direct") public RequestResult<String> testRabbitMQDirect(@RequestParam String directMsg){ log.info("direct直连模式 ,发送消息"); //模拟发送5条直连消息 Stream.of(11,22,33,44,55).forEach(directNo ->{ //模拟创建消息对象 Map<String,Object> directMap =new HashMap<>(); directMap.put("directNo",directNo); directMap.put("directData",directMsg); directMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); //调用直连模式消息生产者 ,发送消息 rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96 ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96 ,JSON.toJSONString(directMap)); return ResultBuildUtil.success("使用直连模式 。发送消息成功"); } } 3.1.5 请求测试发起请求
3.1.5.1 一个消费者消费者One消费了队列中的所有信息(只有一个队列);
3.1.5.2 两个消费者消费者One和消费者Two依次消费了队列中的所有信息(只有一个队列);
3.2 Fanout 扇形模式
3.2.1 配置类 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: Fanout扇形模式 ,自动配置类 ,自动创建队列 ,交换机 ,并将队列绑定到交换机 */ @Configuration public class RabbitMQFanoutConfig { //创建 扇形队列One @Bean public Queue fanoutQueueOne(){ return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE); } //创建 扇形队列Two @Bean public Queue fanoutQueueTwo(){ return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO); } // 创建扇形交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96); } //绑定队列到扇形交换机 ,不需要 指定 路由键 @Bean public Binding fanoutBindingQueueOne(){ //绑定队列到扇形交换机 ,不需要路由键 ,消息是广播发送 ,会给多有绑定的队列群发信息消息(根本没有提供with方法) return BindingBuilder.bind(fanoutQueueOne()) .to(fanoutExchange()); } @Bean public Binding fanoutBindingQueueTwo(){ //绑定队列到扇形交换机,不需要路由键 ,消息是广播发送 ,会给多有绑定的队列群发信息消息(根本没有提供with方法) return BindingBuilder.bind(fanoutQueueTwo()) .to(fanoutExchange()); } } 3.2.2 消息生产者 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQFanoutProducer */ @Slf4j @Component public class RabbitMQFanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * @author : huayu * @date : 1/11/2022 * @param : [fanoutExchange, fanoutRoutingKey, fanoutMsg] * @return : void * @description : 使用扇形模式,发送消息到扇形交换机 ,将消息发送到绑定的队列中 */ public void sendFanoutMsg2FanoutExchange(String fanoutExchange,String fanoutRoutingKey,String fanoutMsg){ log.info("++++++ Fanout模式消息生产者 ,发送广播消息:{} ,到交换机:{} ,路由键:{} ++++++", fanoutMsg, fanoutExchange, fanoutRoutingKey); rabbitTemplate.convertAndSend(fanoutExchange, fanoutRoutingKey, fanoutMsg); } } 3.2.3 消费者 3.2.3.1 消费者One /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQFanoutConsumerOne */ @Slf4j @Component @RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE) public class RabbitMQFanoutConsumerOne { @RabbitHandler public void fanoutConsumeOneFanoutMsgFromFanoutQueueOne(String fanoutMsgJson){ log.info("****** Fanout扇形模式 ,消费One,消费队列One,消息:{} ******",fanoutMsgJson); // TODO 核心业务逻辑处理 } } 3.2.3.2 消费者Two /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQFanoutConsumerTwo */ @Slf4j @Component //@RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO) public class RabbitMQFanoutConsumerTwo { // @RabbitHandler public void fanoutConsumeTwoFanoutMsgFromFanoutQueueTwo(String fanoutMsgJson){ log.info("****** Fanout扇形模式 ,消费Two,消费队列Two,消息:{} ******",fanoutMsgJson); // TODO 核心业务逻辑处理 } } 3.2.4 请求测试方法 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: 测试 RabbitMQ 消息队列的操作入口 */ @Slf4j @RestController public class RabbitMQController { @Autowired private RabbitMQFanoutProducer rabbitMQFanoutProducer; /** * @author : huayu * @date : 1/11/2022 * @param : [fanoutMsg] * @return : com.kgc.scd.uitl.RequestResult<java.lang.String> * @description : 测试扇形(广播)模式 ,发送和消费信息 */ @GetMapping("/fanout") public RequestResult<String> testRabbitMQFanout(@RequestParam String fanoutMsg){ log.info("------- fanout 扇形模式 ,发送消息 -------"); //模拟发送5条直连消息 Stream.of(66,77,88,99,96).forEach(directNo ->{ //模拟创建消息对象 Map<String,Object> fanoutMap =new HashMap<>(); fanoutMap.put("directNo",directNo); fanoutMap.put("directData",fanoutMsg); fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); //调用扇形模式消息生产者 ,发送消息 rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96 ,null ,JSON.toJSONString(fanoutMap)); }); return ResultBuildUtil.success("使用扇形模式 。发送消息成功"); } } 3.2.5 请求测试 3.2.5.1 一个消费者消费者One消费了队列One中的所有信息;
3.2.5.2 两个消费者消费者One消费了队列One中的所有信息;
消费者Two消费了队列Two中的所有信息;
3.3 Topic 主题模式
3.3.1 配置类 /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: Topic 主题模式 ,自动配置类 */ @Configuration public class RabbitMQTopicConfig { //======== 队列 //Topic 主题模式 队列One @Bean public Queue topicQueueOne(){ return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE,true); } //Topic 主题模式 队列Two @Bean public Queue topicQueueTwo(){ return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO,true); } //Topic 主题模式 队列Three @Bean public Queue topicQueueThree(){ return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE,true); } //======= 交换机 //Topic 主题模式 交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96); } //======= 绑定 // 队列One 绑定 Topic主题模式交换机 和 路由键-唯一匹配规则 @Bean public Binding topicBindingQueueOne(){ return BindingBuilder.bind(topicQueueOne()) .to(topicExchange()) .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY); } // 队列Two 绑定 Topic主题模式交换机 和 路由键-单个单词词匹配规则 @Bean public Binding topicBindingQueueTwo(){ return BindingBuilder.bind(topicQueueTwo()) .to(topicExchange()) .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD); } // 队列Two 绑定 Topic主题模式交换机 和 路由键-模糊匹配规则 @Bean public Binding topicBindingQueueThree(){ return BindingBuilder.bind(topicQueueThree()) .to(topicExchange()) .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE); } } 3.3.2 消息生产者 /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQ 主题模式消息生产者 */ @Slf4j @Component public class RabbitMQTopicProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * @author : huayu * @date : 1/11/2022 * @param : [topicExchange, topicRoutingKey, topicMsg] * @return : void * @description : 使用主题模式 ,发送消息到主题交换机 ,主题交换机会根据发送消息的路由键 ,根据匹配规则将消息投递到匹配的队列中 */ public void sendTopicMsg2TopicExchange(String topicExchange,String topicRoutingKey,String topicMsg){ log.info("++++++ direct模式消息生产者 ,发送直连消息:{} ,到交换机:{},路由键:{} ++++++",topicMsg,topicExchange,topicRoutingKey); rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey,topicMsg); } } 3.3.3 消费者 3.3.3.1 消费者One /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQTopicConsumerOne */ @Slf4j @Component @RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE) public class RabbitMQTopicConsumerOne { @RabbitHandler public void consumeTopicMsgFromTopicQueue(String topicMapJson){ log.info("****** Topic 主题模式 ,消费One,消费队列One,消息:{} ******",topicMapJson); // TODO 核心业务逻辑处理 } } 3.3.3.2 消费者Two /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQTopicConsumerTwo */ @Slf4j @Component @RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO) public class RabbitMQTopicConsumerTwo { @RabbitHandler public void consumeTopicMsgFromTopicQueue(String topicMapJson){ log.info("****** Topic 主题模式 ,消费 Two,消费队列 Two,消息:{} ******",topicMapJson); // TODO 核心业务逻辑处理 } } 3.3.3.3 消费者Three /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQTopicConsumerThree */ @Slf4j @Component @RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE) public class RabbitMQTopicConsumerThree { @RabbitHandler public void consumeTopicMsgFromTopicQueue(String topicMapJson){ log.info("****** Topic 主题模式 ,消费 Three,消费队列 Three,消息:{} ******",topicMapJson); // TODO 核心业务逻辑处理 } } 3.3.4 请求测试方法 /** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: 测试 RabbitMQ 消息队列的操作入口 */ @Slf4j @RestController public class RabbitMQController { @Autowired private RabbitMQTopicProducer rabbitMQTopicProducer; @GetMapping("/topic") public RequestResult<String> testRabbitMQTopic(@RequestParam String topicMsg){ log.info("------- topic 主题模式 ,发送消息 -------"); //模拟发送5条直连消息 Stream.of(95,96,97,98,99).forEach(directNo ->{ //模拟创建消息对象 Map<String,Object> fanoutMap =new HashMap<>(); fanoutMap.put("directNo",directNo); fanoutMap.put("directData",topicMsg); fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); //调用主题模式消息生产者 ,发送消息 //场景1:使用唯一路由键 rabbitmq_topic_routing_key_kh96.only , 发送消息 rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96 ,RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY ,JSON.toJSONString(fanoutMap)); //场景2:使用单词匹配路由键 rabbitmq_topic_routing_key_kh96.* ,发送消息 // rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96 // ,"rabbitmq_topic_routing_key_kh96.abc" // ,JSON.toJSONString(fanoutMap)); //场景3:0 或多词匹配 rabbitmq_topic_routing_key_kh96.# ,发送消息 // rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96 // ,"rabbitmq_topic_routing_key_kh96.abc.def" // ,JSON.toJSONString(fanoutMap)); }); return ResultBuildUtil.success("使用主题模式 。发送消息成功"); } } 3.3.5 请求测试 3.3.5.1 场景1:使用唯一路由键 发送消息路由键名: rabbitmq_topic_routing_key_kh96.only 发起请求: 请求结果:队列One,Two,Three都接收到了信息 ,所以对应的消费者One,Two,Three都消费了信息;
3.3.5.2 场景2:使用单词匹配路由键 发送消息路由键名: rabbitmq_topic_routing_key_kh96.abc 发起请求: 请求结果:队列Two,Three都接收到了信息 ,所以对应的消费者Two,Three都消费了信息;
3.3.5.3 场景3:0 或多词匹配 发送消息路由键名: rabbitmq_topic_routing_key_kh96.abc.def 发起请求: 请求结果:只有队列Three接收到了信息 ,所以只有对应的消费者Three消费了信息;
3.3.6 主题模式小结当生产者发送消息到交换机 ,指定的路由键一般都是使用句点(.)作为分隔符 ,分割多个单词 。
比如:词1.词...所谓单词:是由一个或多个单词组成,多个单词组成的路由键 ,就代表某种主题的关键信息 ,路由键长度最多不能超过256字节 。
匹配规则格式:* 或者 #
*代表单个单词 。 比如 队列绑定主题交换机的 路由键:KH96.* ,代表发送消息的路由键是以KH96开头,后面只能跟一个单词 ,如:KH96.aaa,KH96.bbb等 。 再比如:绑定路由键为:KH96.*.KGC,代表发送消息路由键是以KH96开头 ,中间可以带一个单词 ,结尾 ,如:KH96.aa.KGC,KH96.bb.KGC 。 #代表0或多个单词 ,比如 队列绑定主题交换机的 路由键:KH96.# ,代表发送消息的路由键是以KH96开头 ,后面只能跟0个或者多个单词,如:KH96 ,KH96.aaa ,KH96.aaa.bbb。 再比如:绑定路由键为:KH96.#.KGC,代表发送消息路由键是以KH96开头 ,中间可以带一个或多个单词 ,结尾 ,如KH96.KGC,KH96.aa.KGC ,KH96.aa.bb.KGC 。备注:
如果主题交换机 ,队列绑定的路由键使用的不是模糊匹配符,主题交换机跟直连交换机一致 。 如果单独使用# ,代表所有队列都可以收消息 ,主题交换机跟扇形交换机一致。提醒:
主题模式下 ,队列绑定的路由键 ,是允许为多个的 。
如果路由键被更换 ,之前的路由键是不会删除 ,仍然会绑定到当前队列上 。
如果有多个路由键匹配 ,规则为:如果其中一个没有匹配到 ,会自动匹配其他路由键 ,如果需要删除历史路由键 ,需要在RabbitMQ控制台删除 。
3.4 消息 发送确认 - 交换机 ,队列 确认
3.4.1 配置信息 # RabbitMQ配置 spring: rabbitmq: # 打开发送消息确认配置 publisher-confirms: true # 发送消息到交换机确认 ,默认false publisher-returns: true # 发送消息到队列确认,默认是false 3.4.2 消息发送确认配置类 触发机制 ConfirmCallback 函数式接口中的唯一抽象方法 confirm : 是否有交换机都会触发; 标识:true ,发送到交换机正常; 标识:false ,发送到交换机失败,进行特殊处理; ReturnCallback 函数式接口中的唯一抽象方法 returnedMessage :交换机存在且队列不存在才会触发; 触发:发送到队列失败 ,进行特殊处理; /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQ 消息确认机制: 发送确认 */ @Slf4j @Configuration public class RabbitMQSendMsgAck { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ //发送确认 ,消息是通过rabbitTemplate发的 ,所以要重置rabbitTemplate才可以实现 RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //开启触发回调处理方法 ,不论消息推送结果是什么 ,都会强制触发回调方法 rabbitTemplate.setMandatory(true); //指定消息发送到RabbitMQ的broker节点 ,是否正确到达交换机确然 //是否有交换机都会触发 rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) ->{ log.info("###### 发送消息确认回调,数据:{} ######",correlationData); log.info("###### 发送消息确认回调,标识:{} ######",ack); log.info("###### 发送消息确认回调,原因:{} ######\n",cause); //TODO 如果没有到交换机 ,ack返回的是false ,可能是交换机被删除 ,就需要进行特殊处理的业务 ,比如给负责人发送信息或邮件 }); //消息是否正确到达交换机上绑定的 目标队列 //交换机存在且队列不存在才会触发 rabbitTemplate.setReturnCallback( ( message, replyCode, replyText,exchange,routingKey) ->{ log.info("###### 发送消息返回回调,数据:{} ######",message); log.info("###### 发送消息返回回调,返回码:{} ######",replyCode); log.info("###### 发送消息返回回调,返回说明:{} ######",replyText); log.info("###### 发送消息返回回调,交换机:{} ######",exchange); log.info("###### 发送消息返回回调,路由键:{} ######\n",routingKey); //TODO 如果没有到目标队列 ,就需要进行特殊处理的业务 ,比如给负责人发送信息或邮件 }); return rabbitTemplate; } } 3.4.3 交换机 /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: Ack 测试交换机,没有绑定队列 */ @Configuration public class RabbitMQAckConfig { //ack 测试交换机 ,没有绑定队列 @Bean public DirectExchange directExchange(){ return new DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96); } } 3.4.4 请求方法 /** * @author : huayu * @date : 2/11/2022 * @param : [topicMsg] * @return : com.kgc.scd.uitl.RequestResult<java.lang.String> * @description : 直连模式测试 Ack 不存在交换机 和 存在交换机 */ @GetMapping("/sendMsgAck") public RequestResult<String> RabbitMQSendMsgAck(@RequestParam String ackMsg){ log.info("------- 直连 模式 测试Ack ,发送消息 -------"); //模拟发送直连消息 //调用直连模式消息生产者,发送消息 //测试1: 不存在的 交换机 rabbitMQDirectProducer.sendDirectMsg2DirectExchange("test_noExchange" ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96 ,JSON.toJSONString(ackMsg)); return ResultBuildUtil.success("使用直连模式 测试Ack 。交换机不存在"); //测试2: 存在的交换机 ,但是没有绑定 队列 // rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96 // ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96 // ,JSON.toJSONString(ackMsg)); // return ResultBuildUtil.success("使用直连模式 测试Ack 。交换机 没有绑定队列"); } 3.2.5 请求测试 3.2.5.1 交换机不存在 发起请求: 请求结果:交换机不存在 ,
触发了ConfirmCallback 函数式接口中的唯一抽象方法 confirm ,
返回标识 false,发送到交换机失败 ,
原因 ,该交换机不存在;
注意:如果没有到交换机 ,ack返回的是false ,可能是交换机被删除 ,就需要进行特殊处理的业务 ,比如给负责人发送信息或邮件;
3.2.5.2 交换机存在 ,但是没有绑定 队列 发起请求: 请求结果:交换机存在 ,
触发了ConfirmCallback 函数式接口中的唯一抽象方法 confirm ,
返回标识 true,发送到交换机成功;
没有绑定队列,
触发了ReturnCallback 函数式接口中的唯一抽象方法 returnedMessage ,
返回说明 NO_ROUT,发送到队列失败;
注意:如果没有到目标队列 ,就需要进行特殊处理的业务,比如给负责人发送信息或邮件;
3.2.5.3 交换机存在 ,且绑定了队列 发起请求 请求结果:交换机存在 ,且绑定了队列 ,
触发了ConfirmCallback 函数式接口中的唯一抽象方法 confirm ,
返回标识 true,发送到交换机成功;
没有触发ReturnCallback 函数式接口中的唯一抽象方法 returnedMessage ,
说明发送到队列成功;
3.5 消息确认
3.5.1 自动确认 3.5.1.1 配置信息 # RabbitMQ配置 spring: rabbitmq: # 消费消息确认配置-自动 listener: simple: retry: enabled: true # 开启消费消息失败重试机制 max-attempts: 5 # 指定重试的次数 max-interval: 10000 # 最大重试间隔时间 ,单位毫秒,每次重试的间隔时间 ,不能比当前设置的值大 ,如果计算间隔时间是6s ,最大时间时间5s,会用5秒 initial-interval: 1000 # 重试间隔初始时间 ,单位毫秒 multiplier: 2 #乘子;重试的间隔时间 * 乘子 ,就是下一次重试的时间间隔市场 ,即:1s,2s,4s,8s,16... 3.5.1.2 消费者 模拟异常注意:测试时为了让消费者One一定接收到消息,所以注释掉消费者Two,这样才可以保证消费者One接收消息 ,然后触发异常 ,重试的效果;
/** * Created On : 1/11/2022. * <p> * Author : huayu * <p> * Description: Direct 直连模式消费者 One */ @Slf4j @Component //指定接听的 消息队列 名字 @RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96) public class RabbitMQDirectConsumerOne { /** * @author : huayu * @date : 1/11/2022 * @param : [directMsgJson] * @return : void * @description : Direct 直连模式消费者One,消费信息 */ //指定消息队列中的消息,交给对应的方法处理 @RabbitHandler public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){ log.info("***** Direct直连模式 ,消费者One,消费消息:{} ******",directMsgJson); // TODO 核心业务逻辑处理 //默认自动确认 ,模拟消费端消费消息 ,处理异常 ,自动重试 int a = 10 / 0; } } 3.5.1.3 请求方法 /** * @author : huayu * @date : 3/11/2022 * @param : [ackMsg] * @return : com.kgc.scd.uitl.RequestResult<java.lang.String> * @description : 测试 消费者自动 重试 */ @GetMapping("/consumeAckAuto") public RequestResult<String> testRabbitMQConsumeAckAuto(@RequestParam String ackMsg){ log.info("------- 直连 模式 测试Ack 自动 重试 ,发送消息 -------"); //模拟发送直连消息 //消费消息失败重试机制 rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96 ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96 ,JSON.toJSONString(ackMsg)); return ResultBuildUtil.success("使用直连模式 消费确认-自动消费成功"); } 3.5.1.4 请求测试发起请求:
请求结果:
一共重试了五次
间隔时间为1 ,2 ,4 ,8
(如果还有一次应该为10 ,因为最后一次计算时间16大于最大间隔时间10 ,按最大间隔时间10重试);
3.4.2 手动确认注意:
手动确认需要先将自动确认的配置注释掉; 使用手动确认 ,不能再用@RabbitListener 监听 ,手动确认相关队列,需要我们手动配置消费者; 3.4.2.1 消费消息手动确认的监听器获取消息消费的唯一标识 message.getMessageProperties().getDeliveryTag();
执行业务处理
每个消费者在同一个时间点 ,最多处理一个message ,默认是0(全部) channel.basicQos(1); 获取message的消息内容 message 获取消息对应的目标队列,可以实现一些灵活判断处理message.getMessageProperties().getConsumerQueue() 比如根据不同的目标队列进行不同的处理 在消息处理的时候如果出错会被捕获(消息确认失败) 消息确认channel.basicAck(deliveryTag,false);消息确认失败处理
根据条件判断设置是否重回队列 ,是否支持批量处理 channel.basicNack(deliveryTag,true,false); /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: 消费端 消费消息手动确认的监听器 ,注意它也是一个消费者 ,并可以通过 消息监听容器工厂 ,动态配置多个 */ @Slf4j @Component public class RabbitMQConsumerManualAckListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws IOException { //获取消息消费的唯一标识 ,rabbitMQ在推送消息时 ,会给每个消息携带一个唯一标识 ,值是一个递增的正整数 long deliveryTag = message.getMessageProperties().getDeliveryTag(); log.info("====== 消费消息的唯一标识:{} ======",deliveryTag); //执行手动确认业务处理 try{ //给每个消费者在同一个时间点 ,最多处理一个message ,默认是0(全部) ,换句话说 ,在接收到消费者的 ack 确认前 ,不会分发新的消息给当前的消费者 //在接收当前消息的ack确认前是不会发送新的消息给它 channel.basicQos(1); //获取message的消息内容 ,发送的消息的json字符串 log.info("====== 消息队列中完整消息内容:{} ======",message); //获取发送的实际内容,发送消息的json字符串 log.info("====== 发送的实际内容:{} ======",new String(message.getBody(),"utf-8")); //获取消息对应的目标队列,可以实现一些灵活判断 //TODO 比如根据目标队列不同 ,可以做不同的处理 log.info("====== 消息的来源队列:{} =======",message.getMessageProperties().getConsumerQueue()); //模拟错误 ,当 deliveryTag 为1的时候 ,进入 报错 ,捕获异常 ,然后(如果设置了重回队列)将消息重回队列 //if(deliveryTag == 1){ // int num = 1/0; //} //消费消息的手动确认 ,消息确认成功-basicAck //第一个参数deliveryTag ,消息的唯一标识 //第二个参数multiple ,消息是否支持批量确认 ,如果是true ,代表可以一次性确认标识小于等于当前标识的所有消息 //如果是false ,只会确认当前消息 channel.basicAck(deliveryTag,false); }catch (Exception e){ //说明消费消息处理失败 ,如果不进行确认(自动确认 ,投递成功即确认 ,消费是否正常 ,不关心) ,消息就会丢失 //消息处理失败确认,代表消息没有正确消费,注意:此种方式一次只能确认一个消息 //第一给参数是消息的唯一标识 , //第二个参数是代表是否重回队列 ,如果是true,重新将该消息放入队列 ,再次消费 //注意:第二个参数要谨慎 ,必须要结合具体业务场景 ,根据业务判断是否需要重回队列 ,一旦处理不当 ,机会导致消息循环入队 ,消息挤压 //不重回队列 require = false // channel.basicReject(deliveryTag,false); //重回队列 require = true channel.basicReject(deliveryTag,true); //消息处理失败确认 ,代表消息没有正确消费 ,注意 ,此种方式支持批量 //第一个参数是消息的唯一标识 , //第二个参数是代表是否支持批量确认 //第三给参数代表是否重回队列 //不重回队列 require = false // channel.basicNack(deliveryTag,true,false); //重回队列 require = true // channel.basicNack(deliveryTag,false,true); //TODO 手动消费异常处理 log.error("====== 消费消息失败 ,异常信息:{} ======",e.getMessage()); } } } 3.4.2.2 消费消息手动确认配置类 配置消费者的数量 setConcurrentConsumers(2); 最大并发消费者数量 setMaxConcurrentConsumers(5); 消费消息确认机制为手动 setAcknowledgeMode(AcknowledgeMode.MANUAL); 设置监听消息队列的名称 ,支持多个队列setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96); 设置消息手动确认监听器 setMessageListener(rabbitMQConsumerManualAckListener); /** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQ 消费消息手动确认配置类 */ @Configuration public class RabbitMQConsumeManualAckConfig { @Autowired private RabbitMQConsumerManualAckListener rabbitMQConsumerManualAckListener; /** * @author : huayu * @date : 2/11/2022 * @param : [connectionFactory] * @return : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer * @description : 自定义消息监听器工程对象 */ @Bean public SimpleMessageListenerContainer simpleBrokerMessageHandler(ConnectionFactory connectionFactory){ //初始化消息监听容器的工程对象 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //初始化并发消费者的数量,比如是2 ,代表同时会有 两个消费者 消费消息 // ,投递标识可能会相同 container.setConcurrentConsumers(2); //设置最大的并发消费者数量 ,数量不能低于初始化并发消费者数量 //可以动态的设定当前容器的消费者数量,可以实现动态增加和减少消费者的算法在 SimpleMessageListenerContainer类中实现 container.setMaxConcurrentConsumers(5); //底层动态实现消费者数量的增加减少原理 // 有consumer已连续十个周期(consecutiveActiveTrigger)处于活动状态 ,并且自启动后最后一个consumer运行至少经过了10秒钟 ,则将启动新的consumer 。 // private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000; // 停止消费者算法的时间间隔 // 有consumer已连续10个周期(consecutiveIdleTrigger)连续空闲状态 ,并且上一个consumer至少在60秒之前停止 ,那么该consumer将停止 // private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000; // 默认连续活动10个周期 // private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10; // 默认连续空闲10个周期 // private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10; //默认的消费消息确认机制是自动 ,需要改为手动 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置监听消息队列的名称 ,支持多个队列(队列名1 ,队列名2...) ,注意前提是指定的队列必须是存在的 //监听 直连模式的 RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96 队列 container.setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96); //监听 扇形模式的 //RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE 队列 //和 RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO 队列 // container.setQueueNames(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE // ,RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO); //指定消息确认的处理类 ,会同时产生多个消费者 ,参数是上面设置的 , //注意之前使用直连模式 ,消息消费者,要注释掉 ,防止同类型的监听器 ,处理同一队列 //如果不是被当前消息确认的处理类消费(使用注解@RabbitListener),会导致消息不执行手动处理 container.setMessageListener(rabbitMQConsumerManualAckListener); // 返回消息监听容器工厂对象 return container; } } 3.4.2.3 请求方法 //====================== /** * @author : huayu * @date : 3/11/2022 * @param : [ackMsg] * @return : com.kgc.scd.uitl.RequestResult<java.lang.String> * @description : 测试 消费者手动确认 */ @GetMapping("/consumeAckManual") public RequestResult<String> testRabbitMQConsumeAckManual(@RequestParam String ackMsg){ log.info("------- 测试Ack 手动 确认 ,发送消息 -------"); //消息手动确认 //模拟发送直连消息 //测试1 ,2 rabbitMQDirectProducer.sendDirectMsg2DirectExchange( RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96 ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96 ,JSON.toJSONString(ackMsg)); return ResultBuildUtil.success("使用直连模式 手动消费确认-消息确认成功"); //测试3 //模拟发送扇形消息 // rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange( // RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96 // ,null // ,JSON.toJSONString(ackMsg)); // // // return ResultBuildUtil.success("使用扇形模式 手动消费确认-消息确认成功"); } 3.4.2.4 请求测试 3.4.2.4.1 模拟发送直连消息并成功确认发送请求:
请求结果:
3.4.2.4.2 模拟发送直连消息 ,抛出异常 ,重回队列发送请求:
代码重点:
请求结果:
3.4.2.4.3 模拟发送扇形消息并成功确认发送请求:
请求结果:
创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!