rabbitmq安装目录(RabbitMQ)
RabbitMQ
一 、MQ
1.同步调用的优缺点
同步调用的优点:
时效性较强 ,可以立即得到结果同步调用的问题:
耦合度高 性能和吞吐能力下降 有额外的资源消耗 有级联失败问题2.异步调用
异步调用常见实现就是事件驱动模式
好处:
吞吐量提升:无需等待订阅者处理完成 ,响应更快速
故障隔离:服务没有直接调用 ,不存在级联失败问题
调用间没有阻塞 ,不会造成无效的资源占用
耦合度极低 ,每个服务都可以灵活插拔 ,可替换
流量削峰:不管发布事件的流量波动多大 ,都由Broker接收 ,订阅者可以按照自己的速度去处理事件
缺点:
架构复杂了 ,业务没有明显的流程线 ,不好管理 需要依赖于Broker的可靠 、安全 、性能3.MQ实现
MQ ,中文是消息队列(MessageQueue) ,字面来看就是存放消息的队列 。也就是事件驱动架构中的Broker 。
几种常见MQ的对比:
RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java Scala&Java 协议支持 AMQP,XMPP ,SMTP ,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫秒以内 消息可靠性 高 一般 高 一般追求可用性:Kafka 、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ 、RocketMQ
追求吞吐能力:RocketMQ 、Kafka
追求消息低延迟:RabbitMQ 、Kafka
二 、Linux安装RabbitMQ
1.1.下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机中后 ,使用命令加载镜像即可:
docker load -i mq.tar1.2.安装MQ
执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management注意5672是MQ消息通信的端口 ,15672是ui端口
三 、RabbitMQ
1.种类
大致分为两种
1.**无交换机的 **基本消息队列 和 工作消息队列 2.有交换机的发布订阅 ,路由 ,主题2.基于SpringAMQP使用
2.1基础消息队列先使用测试类生成队列
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数 ,分别是:主机名 、端口号 、vhost、用户名 、密码 factory.setHost("116.62.32.68"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息 。 。 。 。"); } }引入依赖
<!--AMQP依赖 ,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>生产者编写yaml文件的MQ地址 ,端口号 ,用户名密码
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码编写生产者代码
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { //类似于redisTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //队列名 String queueName = "simple.queue"; //消息 String message = "hello,SpringAmqp"; //发送消息到队列 rabbitTemplate.convertAndSend(queueName,message); } }消费者编写yaml文件的MQ地址 ,端口号 ,用户名密码
同上
编写消费者代码
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } } 2.2工作消息队列消费者:
//工作队列 @RabbitListener(queues = "simple.queue") public void listenWorkQueue1Message(String msg) throws InterruptedException { System.out.println("spring 消费者1接收到消息:【" + msg + "】"); } @RabbitListener(queues = "simple.queue") public void listenSimpleQueue2Message(String msg) throws InterruptedException { System.out.println("spring 消费者2接收到消息:【" + msg + "】"); } logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码 listener: direct: prefetch: 1 # 消息预期 ,之前把所有消息预期导致平分消息 ,现在是处理一个取一个 2.3广播Fanout多了一个交换机,交换机决定将生产者的消息发送给哪个队列 ,这决定就是交换机种类不同规则不同 ,因此分为三种,广播 、路由和主题 。
广播:只要队列绑定了交换机 ,就发送消息到队列
1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)
//3.广播(绑定队列 ,同时绑定交换机) @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String message){ System.out.println("广播方式队列1接受消息:"+message); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue2"), exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue2(String message){ System.out.println("广播方式队列2接受消息:"+message); }2.生产者
//广播类型交换机 @Test public void testFanoutExchangeQueue(){ //交换机名称 String exchangeName = "fanout.exange"; //消息 String message = "广播消息"; //发送 ,第二个参数是队列的key,在路由类型中可以使用到 rabbitTemplate.convertAndSend(exchangeName,"",message); } 2.4路由Route和广播相比 ,在队列中多了一个key属性 。
交换机通过消息的key来对应路由到相同key的队列上 。
1.消费者:
/** 4.路由 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDirectQueue1(String message){ System.out.println("路由红、蓝队列接受消息:"+message); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT), key = {"red","green"} )) public void listenDirectQueue2(String message){ System.out.println("路由红 、绿队列接受消息:"+message); }2.生产者
//路由类型交换机 @Test public void testDirectExchangeQueue(){ //交换机名称 String exchangeName = "direct.exchange"; //消息 String message = "路由消息"; //发送 ,第二个参数是队列的key,在路由类型中可以使用到 rabbitTemplate.convertAndSend(exchangeName,"red",message); } 2.5主题Topickey不再是一组单词 ,而是一个规则
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
1.消费者
/** * 主题 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String message){ System.out.println("中国队列接受消息:"+message); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String message){ System.out.println("新闻队列接受消息:"+message); }2.生产者
//主题类型交换机 @Test public void testTopicExchangeQueue(){ //交换机名称 String exchangeName = "topic.exchange"; //消息 String message = "china.lala消息"; //发送 ,第二个参数是队列的key,在路由类型中可以使用到 rabbitTemplate.convertAndSend(exchangeName,"china.news",message); }3.消息转换器
Spring会把你发送的消息序列化为字节发送给MQ ,接收消息的时候 ,还会把字节反序列化为Java对象 。
默认情况下Spring采用的序列化方式是JDK序列化 。众所周知 ,JDK序列化存在下列问题:
因此需要配置一个Json转换器
步骤 在publisher和consumer两个服务中都引入依赖: <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency> 配置消息转换器 。在启动类中添加一个Bean即可: @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!