首页IT科技rabbitmq安装目录(RabbitMQ)

rabbitmq安装目录(RabbitMQ)

时间2025-05-02 12:48:12分类IT科技浏览3034
导读:RabbitMQ 一、MQ...

RabbitMQ

一            、MQ

1.同步调用的优缺点

同步调用的优点:

时效性较强            ,可以立即得到结果

同步调用的问题:

耦合度高 性能和吞吐能力下降 有额外的资源消耗 有级联失败问题

2.异步调用

异步调用常见实现就是事件驱动模式

好处:

吞吐量提升:无需等待订阅者处理完成               ,响应更快速

故障隔离:服务没有直接调用      ,不存在级联失败问题

调用间没有阻塞         ,不会造成无效的资源占用

耦合度极低               ,每个服务都可以灵活插拔         ,可替换

流量削峰:不管发布事件的流量波动多大      ,都由Broker接收               ,订阅者可以按照自己的速度去处理事件

缺点:

架构复杂了           ,业务没有明显的流程线   ,不好管理 需要依赖于Broker的可靠               、安全      、性能

3.MQ实现

MQ                ,中文是消息队列(MessageQueue)             ,字面来看就是存放消息的队列            。也就是事件驱动架构中的Broker               。

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java Scala&Java 协议支持 AMQP,XMPP              ,SMTP               ,STOMP OpenWire,STOMP   ,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫秒以内 消息可靠性 高 一般 高 一般

追求可用性:Kafka         、 RocketMQ               、RabbitMQ

追求可靠性:RabbitMQ         、RocketMQ

追求吞吐能力:RocketMQ      、Kafka

追求消息低延迟:RabbitMQ               、Kafka

二           、Linux安装RabbitMQ

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

上传到虚拟机中后            ,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management

注意5672是MQ消息通信的端口               ,15672是ui端口

三   、RabbitMQ

1.种类

大致分为两种

1.**无交换机的 **基本消息队列 和 工作消息队列 2.有交换机的发布订阅      ,路由         ,主题

2.基于SpringAMQP使用

2.1基础消息队列

先使用测试类生成队列

public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数               ,分别是:主机名                、端口号             、vhost、用户名              、密码 factory.setHost("116.62.32.68"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息      。         。               。         。"); } }

引入依赖

<!--AMQP依赖         ,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

生产者编写yaml文件的MQ地址      ,端口号               ,用户名密码

spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码

编写生产者代码

@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { //类似于redisTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //队列名 String queueName = "simple.queue"; //消息 String message = "hello,SpringAmqp"; //发送消息到队列 rabbitTemplate.convertAndSend(queueName,message); } }

消费者编写yaml文件的MQ地址           ,端口号   ,用户名密码

同上

编写消费者代码

@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } } 2.2工作消息队列

消费者:

//工作队列 @RabbitListener(queues = "simple.queue") public void listenWorkQueue1Message(String msg) throws InterruptedException { System.out.println("spring 消费者1接收到消息:【" + msg + "】"); } @RabbitListener(queues = "simple.queue") public void listenSimpleQueue2Message(String msg) throws InterruptedException { System.out.println("spring 消费者2接收到消息:【" + msg + "】"); } logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码 listener: direct: prefetch: 1 # 消息预期                ,之前把所有消息预期导致平分消息             ,现在是处理一个取一个 2.3广播Fanout

多了一个交换机,交换机决定将生产者的消息发送给哪个队列              ,这决定就是交换机种类不同规则不同               ,因此分为三种   ,广播               、路由和主题      。

广播:只要队列绑定了交换机            ,就发送消息到队列

1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)

//3.广播(绑定队列               ,同时绑定交换机) @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String message){ System.out.println("广播方式队列1接受消息:"+message); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue2"), exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue2(String message){ System.out.println("广播方式队列2接受消息:"+message); }

2.生产者

//广播类型交换机 @Test public void testFanoutExchangeQueue(){ //交换机名称 String exchangeName = "fanout.exange"; //消息 String message = "广播消息"; //发送      ,第二个参数是队列的key,在路由类型中可以使用到 rabbitTemplate.convertAndSend(exchangeName,"",message); } 2.4路由Route

和广播相比         ,在队列中多了一个key属性               。

交换机通过消息的key来对应路由到相同key的队列上           。

1.消费者:

/** 4.路由 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDirectQueue1(String message){ System.out.println("路由红   、蓝队列接受消息:"+message); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT), key = {"red","green"} )) public void listenDirectQueue2(String message){ System.out.println("路由红            、绿队列接受消息:"+message); }

2.生产者

//路由类型交换机 @Test public void testDirectExchangeQueue(){ //交换机名称 String exchangeName = "direct.exchange"; //消息 String message = "路由消息"; //发送               ,第二个参数是队列的key,在路由类型中可以使用到 rabbitTemplate.convertAndSend(exchangeName,"red",message); } 2.5主题Topic

key不再是一组单词         ,而是一个规则

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

1.消费者

/** * 主题 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String message){ System.out.println("中国队列接受消息:"+message); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String message){ System.out.println("新闻队列接受消息:"+message); }

2.生产者

//主题类型交换机 @Test public void testTopicExchangeQueue(){ //交换机名称 String exchangeName = "topic.exchange"; //消息 String message = "china.lala消息"; //发送      ,第二个参数是队列的key,在路由类型中可以使用到 rabbitTemplate.convertAndSend(exchangeName,"china.news",message); }

3.消息转换器

Spring会把你发送的消息序列化为字节发送给MQ               ,接收消息的时候           ,还会把字节反序列化为Java对象   。

默认情况下Spring采用的序列化方式是JDK序列化                。众所周知   ,JDK序列化存在下列问题:

因此需要配置一个Json转换器

步骤 在publisher和consumer两个服务中都引入依赖: <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency> 配置消息转换器             。在启动类中添加一个Bean即可: @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
如何制作个人博客网站(打造精致个人博客,用WordPress采集尽显魅力) win11安装助手无法使用(如何修复错误0xC1900101?Win11安装助手错误代码0xc1900101的原因以及解决方法)