RabbitMQ
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议
来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
适用场景
异步提升效率
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
串行
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。
以上三个任务全部完成后,返回给客户端。耗时150ms
。
并行
将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。
以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间,耗时100ms
。
引入消息队列
将不是必须的业务逻辑,异步处理。
流量削峰
流量削峰是消息队列中的常用场景,一般在秒杀活动中使用广泛。
将前端的请求写入消息队列,而非直接请求后端接口。这样后端接口就可以尽其最大能力处理请求,避免因突然大量请求而导致宕机的情况。同时也可以设置消息队列的最大容量,超过的就直接丢弃或重定向至错误页面。
安装
Mac下使用HomeBrew进行安装
Cent OS7下使用yum安装
执行以下命令后将会自动安装rabbitmq及其所需的依赖
1
| yum install rabbitmq-server -y
|
看到出现以下信息便安装完成了。

安装完成后会自动将RabbitMQ的一些脚本添加到/usr/sbin
目录中。
- 配置文件位置:
/etc/rabbitmq/rabbitmq.config
- 脚本命令位置:
/usr/sbin
- 样例配置文件位置:
/usr/share/doc/rabbitmq-server-3.3.5/rabbitmq.config.example
配置
开启来宾账户
开启后配置文件应是如下图所示:
⚠️ 删除前面的%%
注释符号后,最后面的,
也要记得删除!!
五种工作模式
1 2 3 4 5 6
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class RabbitMQUtil { private static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123"); }
public static Connection getConnection() { try { return connectionFactory.newConnection(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return null; } }
|
简易模式
特点:
- 生产者直接将消息发送至队列,消费者直接从队列中获取消息。
无交换机参与
单生产者,单消费者
(若有多个消费者,也只有一个消费者能收到消息)
生产者核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Connection connection = RabbitMQUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicPublish("","hello",false, null,"你好啊啊啊啊 ,RabbitMQ".getBytes("utf-8"));
channel.close(); connection.close();
|
消费者核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" Received Msg: '" + message + "'"); };
channel.basicConsume("hello", true,deliverCallback,consumerTag->{ });
|
工作模式(work queue
)

特点:
- 生产者直接将消息发送至队列,消费者直接从队列中获取消息。
无交换机参与
单生产者,多消费者
。
- 消费者之间默认
公平竞争
,即每个消费者拿到的消息数量相同。
生产者核心代码
1 2 3 4 5 6 7 8 9
| channel.queueDeclare("work",false,false,false,null); for (int i = 1; i <= 30; i++) { channel.basicPublish("","work",null,(i+"hello,rabbitmq!").getBytes()); }
channel.close(); connection.close();
|
消费者1核心代码
1 2 3 4 5 6 7 8 9 10 11
| channel.queueDeclare("work",false,false,false,null);
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); } };
channel.basicConsume("work",true,consumer);
|
消费者2核心代码
1 2 3 4 5 6 7 8 9 10 11
| channel.queueDeclare("work",false,false,false,null);
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); } };
channel.basicConsume("work",true,consumer);
|
广播模式(fanout
)
也叫发布/订阅模式(publish/subscribe)

特点:
- 生产者将消息发送给交换机,由交换机分发消息给每个消费者所拥有的
临时队列
。
单生产者,多消费者
,且每个消费者都有一个自己的临时队列。
- 生产者发布一条消息,每个消费者都能收到。
- 交换机type为
fanout
。
生产者核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); for (int i = 1; i <= 10; i++) { channel.basicPublish("logs","",null,("你好啊"+i).getBytes()); }
channel.close(); connection.close();
|
消费者1核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
String tempQueueName = channel.queueDeclare().getQueue();
channel.queueBind(tempQueueName,"logs","");
channel.basicConsume(tempQueueName,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("广播模式消息消费者1Received : " + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } });
|
消费者2核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
String tempQueueName = channel.queueDeclare().getQueue();
channel.queueBind(tempQueueName,"logs",""); channel.basicConsume(tempQueueName,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("广播模式消息消费者2Received : " + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } });
|
路由模式(routing
)

特点:
- 单生产者,多消费者。
- 生产者发送消息时指定routingKey,消费者绑定自己的临时队列和交换机名称时,指定routingKey(可多个)。
- 交换机type为
direct
。
生产者核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
channel.basicPublish("logs","error",null,"error message".getBytes());
channel.basicPublish("logs","warning",null,"warning message".getBytes());
channel.basicPublish("logs","info",null,"info message".getBytes());
channel.basicPublish("logs","trace",null,"trace message".getBytes());
|
消费者1核心代码:
1 2 3 4 5 6 7 8 9 10 11 12
| String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","error");
channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key为error的消费者收到消息: " + new String(body)); } });
|
消费者2核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","warning"); channel.queueBind(queue,"logs","info"); channel.queueBind(queue,"logs","error");
channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key为info,warning,error的消费者收到消息: " + new String(body)); } });
|
动态路由模式(topic
)

特点:
- 单生产者,多消费者。
- 生产者发送消息时指定routingKey,消费者绑定自己的临时队列和交换机名称时,指定routingKey(可多个)。
- 交换机type为
topic
。
这里的routingKey与路由模式的routingKey并不相同!!
形如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”, “user.*.*“, “#.update”
*
(星号) 仅能替代1个
单词。#
(哈希) 能够替代0或多个
单词。
生产者核心代码
1 2 3 4 5
| channel.exchangeDeclare("logs", BuiltinExchangeType.TOPIC);
channel.basicPublish("logs","user.save",null,"topic模式的消息1".getBytes()); channel.basicPublish("logs","userService.update.save",null,"topic模式的消息2".getBytes());
|
消费者1核心代码
1 2 3 4 5 6 7 8 9 10 11
| String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","*.save");
channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key为*.save的消费者收到了消息: " + new String(body)); } });
|
消费者2核心代码
1 2 3 4 5 6 7 8 9
| final String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","userService.update.*"); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key为userService.update.*的消费者收到了消息: " + new String(body)); } });
|
SpringBoot整合RabbitMQ
导入依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置rabbitmq
1 2 3 4 5 6 7 8 9
| spring.application.name=rabbitmq-springboot
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/ems spring.rabbitmq.username=admin spring.rabbitmq.password=123
|
简易模式
生产者代码
1 2 3 4 5 6 7 8 9 10 11
| @Autowired private RabbitTemplate rabbitTemplate;
@Test void testSimple() { rabbitTemplate.convertAndSend("hello","hello world !"); }
|
消费者代码
1 2 3 4 5 6 7 8 9 10
| @Component
@RabbitListener(queuesToDeclare = {@Queue("hello")}) public class HelloConsumer {
@RabbitHandler public void receive(String message){ System.out.println("消费者1" + message); } }
|
输出结果
工作模式
生产者核心代码
1 2 3 4 5 6 7 8 9 10
|
@Test void testWorkModel() { for (int i = 1; i <= 10; i++) { rabbitTemplate.convertAndSend("work","message"+i); } }
|
输出结果
广播模式
生产者核心代码
1 2 3 4 5 6 7
|
@Test void testFanoutModel(){ rabbitTemplate.convertAndSend("fanout-model","","这里是广播消息"); }
|
消费者核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @RabbitListener(bindings = { @QueueBinding(value = @Queue(), exchange = @Exchange(value = "fanout-model", type = "fanout")) }) public void fanout1(String message) { System.out.println("fanout模式消费者1 Received: " + message); }
@RabbitListener(bindings = { @QueueBinding(value = @Queue(), exchange = @Exchange(value = "fanout-model", type = "fanout")) }) public void fanout2(String message) { System.out.println("fanout模式消费者2 Received: " + message); }
@RabbitListener(bindings = { @QueueBinding(value = @Queue(), exchange = @Exchange(value = "fanout-model", type = "fanout")) }) public void fanout3(String message) { System.out.println("fanout模式消费者3 Received: " + message); }
|
输出结果
路由模式
生产者核心代码
1 2 3 4 5 6 7 8 9 10
|
@Test void testRoutingModel(){ rabbitTemplate.convertAndSend("routing-model","error","error message"); rabbitTemplate.convertAndSend("routing-model","info","info message"); rabbitTemplate.convertAndSend("routing-model","warning","warning message"); rabbitTemplate.convertAndSend("routing-model","trace","trace message"); }
|
消费者核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @RabbitListener(bindings = { @QueueBinding(value = @Queue(),exchange = @Exchange(value = "routing-model",type = "direct"),key = "error") }) public void routingConsumer1(String message){ System.out.println("routingKey为error的消费者 Received: " + message); }
@RabbitListener(bindings = { @QueueBinding(value = @Queue(),exchange = @Exchange(value = "routing-model",type = "direct"),key = {"error","info","warning"}) }) public void routingConsumer2(String message){ System.out.println("routingKey为error,info,warning的消费者 Received: " + message); }
|
输出结果
⚠️由于没有2个消费者都没有绑定routingKey为trace
的交换机,所以消息会丢失。
动态路由模式
生产者核心代码
1 2 3 4 5 6 7 8 9
|
@Test void testTopicModel(){ rabbitTemplate.convertAndSend("topic-model","userService.save.test","消息1"); rabbitTemplate.convertAndSend("topic-model","userService.update.save","消息2"); rabbitTemplate.convertAndSend("topic-model","userService.update.save.all","消息3"); }
|
消费者核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RabbitListener(bindings = { @QueueBinding(value = @Queue(),exchange = @Exchange(value = "topic-model",type = "topic"),key = "userService.save.*") }) public void topicConsumer1(String message){ System.out.println("路由key为userService.save.* Received: " + message); }
@RabbitListener(bindings = { @QueueBinding(value = @Queue(),exchange = @Exchange(value = "topic-model",type = "topic"),key = "userService.update.#") }) public void topicConsumer2(String message){ System.out.println("路由key为userService.update.# Received: " + message); }
|
输出结果