RabbitMQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

适用场景

异步提升效率

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式

串行

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。

以上三个任务全部完成后,返回给客户端。耗时150ms

img

并行

将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。

以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间,耗时100ms

img

引入消息队列

将不是必须的业务逻辑,异步处理。

1543774-20181230161605799-363702530

流量削峰

流量削峰是消息队列中的常用场景,一般在秒杀活动中使用广泛。

1543774-20181230161744411-963536299

将前端的请求写入消息队列,而非直接请求后端接口。这样后端接口就可以尽其最大能力处理请求,避免因突然大量请求而导致宕机的情况。同时也可以设置消息队列的最大容量,超过的就直接丢弃或重定向至错误页面。

安装

Mac下使用HomeBrew进行安装

1
brew install rabbitmq

Cent OS7下使用yum安装

执行以下命令后将会自动安装rabbitmq及其所需的依赖

1
yum install rabbitmq-server -y
image-20201117191910638

看到出现以下信息便安装完成了。

image-20201117195715069

安装完成后会自动将RabbitMQ的一些脚本添加到/usr/sbin目录中。

  • 配置文件位置:/etc/rabbitmq/rabbitmq.config
  • 脚本命令位置:/usr/sbin
  • 样例配置文件位置:/usr/share/doc/rabbitmq-server-3.3.5/rabbitmq.config.example
image-20201117200133692 image-20201117201739814 image-20201117201947285

配置

开启来宾账户

image-20201117204654984

开启后配置文件应是如下图所示:

⚠️ 删除前面的%%注释符号后,最后面的,也要记得删除!!

image-20201117204804641

五种工作模式

1
2
3
4
5
6
<!-- 普通maven项目引入rabitmq依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// RabbitMQ工具类   
public class RabbitMQUtil {
private static ConnectionFactory connectionFactory;
// 类加载时初始化 只执行一次
static {
// 初始化连接工厂对象
connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("127.0.0.1");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机地址
connectionFactory.setVirtualHost("/ems");
// 设置用户名和密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123");
}

/**
* 获取连接的方法
* @return Connection对象
*/
public static Connection getConnection() {
try {
// 返回连接对象
return connectionFactory.newConnection();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}

简易模式

image-20201119195007087

特点:

  • 生产者直接将消息发送至队列,消费者直接从队列中获取消息。无交换机参与
  • 单生产者,单消费者(若有多个消费者,也只有一个消费者能收到消息)

生产者核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello",false,false,false,null);
// 生产消息
// 参数1:交换机名称,该模式无交换机参与,所以使用空字符串即可。
// 参数2:路由key, 该模式下即队列的名称,生产者将消息发布至该队列中
// 参数3:为true则mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则将消息返回给发送者;
// 参数4:消息的属性字段
// 参数5:要发布的消息主体
channel.basicPublish("","hello",false, null,"你好啊啊啊啊 ,RabbitMQ".getBytes("utf-8"));

// 关闭资源
channel.close();
connection.close();

消费者核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 队列声明
// 参数1: 队列名称,若没有则会自动生成一个
// 参数2: 队列是否持久化,false的话rabbitmq重启后队列内的内容将全部丢失
// 参数3: 是否是独占型队列,
// 参数4: 是否自动删除队列,true的话队列里没有内容且没有消费者监听该队列,服务器将删除这个队列。
// 参数5: 队列的一些额外的配置
channel.queueDeclare("hello",false,false,false,null);

// 设置消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 输出接收到的信息并输出
System.out.println(" Received Msg: '" + message + "'");
};
// 消费消息
channel.basicConsume("hello", true,deliverCallback,consumerTag->{ });

工作模式(work queue)

work queue

特点:

  • 生产者直接将消息发送至队列,消费者直接从队列中获取消息。无交换机参与
  • 单生产者,多消费者
  • 消费者之间默认公平竞争,即每个消费者拿到的消息数量相同。

生产者核心代码

1
2
3
4
5
6
7
8
9
// 声明队列
channel.queueDeclare("work",false,false,false,null);
for (int i = 1; i <= 30; i++) {
// 发送消息
channel.basicPublish("","work",null,(i+"hello,rabbitmq!").getBytes());
}
// 关闭资源
channel.close();
connection.close();

消费者1核心代码

1
2
3
4
5
6
7
8
9
10
11
 // 声明队列
channel.queueDeclare("work",false,false,false,null);
// 消息回调
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
};
// 消费消息
channel.basicConsume("work",true,consumer);

消费者2核心代码

1
2
3
4
5
6
7
8
9
10
11
 // 声明队列
channel.queueDeclare("work",false,false,false,null);
// 消息回调
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
};
// 消费消息
channel.basicConsume("work",true,consumer);

广播模式(fanout)

也叫发布/订阅模式(publish/subscribe)

fanout

特点:

  • 生产者将消息发送给交换机,由交换机分发消息给每个消费者所拥有的临时队列
  • 单生产者,多消费者,且每个消费者都有一个自己的临时队列。
  • 生产者发布一条消息,每个消费者都能收到。
  • 交换机type为fanout

生产者核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
 // 创建连接
Connection connection = RabbitMQUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明交换机名称及类型fanout
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
for (int i = 1; i <= 10; i++) {
// 生产消息
channel.basicPublish("logs","",null,("你好啊"+i).getBytes());
}
// 关闭资源
channel.close();
connection.close();

消费者1核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 声明交换机
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 获取临时队列 并返回临时队列名称
String tempQueueName = channel.queueDeclare().getQueue();
// 绑定队列和交换机 参数3为routingkey,此模式下会被忽略。
channel.queueBind(tempQueueName,"logs","");
// 消费消息 并关闭自动确认,需消费者手动确认
channel.basicConsume(tempQueueName,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("广播模式消息消费者1Received : " + new String(body));
// 手动确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});

消费者2核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
// 声明交换机
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 获取临时队列 并返回临时队列名称
String tempQueueName = channel.queueDeclare().getQueue();
// 绑定队列和交换机
channel.queueBind(tempQueueName,"logs","");
channel.basicConsume(tempQueueName,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("广播模式消息消费者2Received : " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});

路由模式(routing)

routing

特点:

  • 单生产者,多消费者。
  • 生产者发送消息时指定routingKey,消费者绑定自己的临时队列和交换机名称时,指定routingKey(可多个)。
  • 交换机type为direct

生产者核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 声明交换机名称及其类型为direct
channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
// 生产消息
// 参数1: 交换机名称
// 参数2: routingKey
// 参数3: 消息属性设置
// 参数4: 要发送的消息主体
// 发送rouintKey为error的消息
channel.basicPublish("logs","error",null,"error message".getBytes());
// 发送rouintKey为warning的消息
channel.basicPublish("logs","warning",null,"warning message".getBytes());
// 发送rouintKey为info的消息
channel.basicPublish("logs","info",null,"info message".getBytes());
// 若发布 没有与该路由key绑定的队列 的消息, 则其他路由key的消费者都无法消费该消息
channel.basicPublish("logs","trace",null,"trace message".getBytes());

消费者1核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
// 获取临时队列的名称
String queue = channel.queueDeclare().getQueue();
// 绑定队列及其路由Key
channel.queueBind(queue,"logs","error");

// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为error的消费者收到消息: " + new String(body));
}
});

消费者2核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 获取临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定多个路由key:info,warning,error
channel.queueBind(queue,"logs","warning");
channel.queueBind(queue,"logs","info");
channel.queueBind(queue,"logs","error");

// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为info,warning,error的消费者收到消息: " + new String(body));
}
});

动态路由模式(topic)

topic

特点:

  • 单生产者,多消费者。
  • 生产者发送消息时指定routingKey,消费者绑定自己的临时队列和交换机名称时,指定routingKey(可多个)。
  • 交换机type为topic
这里的routingKey与路由模式的routingKey并不相同!!

形如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”, “user.*.*“, “#.update”

  • * (星号) 仅能替代1个单词。
  • #(哈希) 能够替代0或多个单词。

生产者核心代码

1
2
3
4
5
// 声明交换机名称及其类型topic
channel.exchangeDeclare("logs", BuiltinExchangeType.TOPIC);
// 生产消息
channel.basicPublish("logs","user.save",null,"topic模式的消息1".getBytes());
channel.basicPublish("logs","userService.update.save",null,"topic模式的消息2".getBytes());

消费者1核心代码

1
2
3
4
5
6
7
8
9
10
11
// 获取临时队列名称
String queue = channel.queueDeclare().getQueue();
// 队列绑定交换机和动态路由Key
channel.queueBind(queue,"logs","*.save");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为*.save的消费者收到了消息: " + new String(body));
}
});

消费者2核心代码

1
2
3
4
5
6
7
8
9
final String queue = channel.queueDeclare().getQueue();
// 队列绑定交换机和动态路由Key
channel.queueBind(queue,"logs","userService.update.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为userService.update.*的消费者收到了消息: " + new String(body));
}
});

SpringBoot整合RabbitMQ

导入依赖

1
2
3
4
5
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置rabbitmq

1
2
3
4
5
6
7
8
9
# 应用名
spring.application.name=rabbitmq-springboot

# 配置RabbitMQ
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/ems
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

简易模式

生产者代码

1
2
3
4
5
6
7
8
9
10
11
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 简易模式
*/
@Test
void testSimple() {
// 发送消息 参数1: 队列名称 参数2: 要发送的消息
rabbitTemplate.convertAndSend("hello","hello world !");
}

消费者代码

1
2
3
4
5
6
7
8
9
10
@Component
// 消费者监听器 声明一个名称为hello的队列 默认是持久化 非独占 不自动删除的队列
@RabbitListener(queuesToDeclare = {@Queue("hello")})
public class HelloConsumer {

@RabbitHandler
public void receive(String message){
System.out.println("消费者1" + message);
}
}

输出结果

image-20201121194553228

工作模式

生产者核心代码

1
2
3
4
5
6
7
8
9
10
/**
* 测试第二种work模型 默认版: 公平竞争
*/
@Test
void testWorkModel() {
for (int i = 1; i <= 10; i++) {
// 发送10条消息
rabbitTemplate.convertAndSend("work","message"+i);
}
}

输出结果

image-20201121194717874

广播模式

生产者核心代码

1
2
3
4
5
6
7
/**
* 第三种fanout模式 广播模式
*/
@Test
void testFanoutModel(){
rabbitTemplate.convertAndSend("fanout-model","","这里是广播消息");
}

消费者核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(), exchange = @Exchange(value = "fanout-model", type = "fanout"))
})
public void fanout1(String message) {
System.out.println("fanout模式消费者1 Received: " + message);
}


@RabbitListener(bindings = {
@QueueBinding(value = @Queue(), exchange = @Exchange(value = "fanout-model", type = "fanout"))
})
public void fanout2(String message) {
System.out.println("fanout模式消费者2 Received: " + message);
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue(), exchange = @Exchange(value = "fanout-model", type = "fanout"))
})
public void fanout3(String message) {
System.out.println("fanout模式消费者3 Received: " + message);
}

输出结果

image-20201121195143069

路由模式

生产者核心代码

1
2
3
4
5
6
7
8
9
10
/**
* 第四种touting模型 路由模型
*/
@Test
void testRoutingModel(){
rabbitTemplate.convertAndSend("routing-model","error","error message");
rabbitTemplate.convertAndSend("routing-model","info","info message");
rabbitTemplate.convertAndSend("routing-model","warning","warning message");
rabbitTemplate.convertAndSend("routing-model","trace","trace message");
}

消费者核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(),exchange = @Exchange(value = "routing-model",type = "direct"),key = "error")
})
public void routingConsumer1(String message){
System.out.println("routingKey为error的消费者 Received: " + message);
}



@RabbitListener(bindings = {
@QueueBinding(value = @Queue(),exchange = @Exchange(value = "routing-model",type = "direct"),key = {"error","info","warning"})
})
public void routingConsumer2(String message){
System.out.println("routingKey为error,info,warning的消费者 Received: " + message);
}

输出结果

image-20201121195345038

⚠️由于没有2个消费者都没有绑定routingKey为trace的交换机,所以消息会丢失。

动态路由模式

生产者核心代码

1
2
3
4
5
6
7
8
9
/**
* 测试第五种topic模式 动态路由模式
*/
@Test
void testTopicModel(){
rabbitTemplate.convertAndSend("topic-model","userService.save.test","消息1");
rabbitTemplate.convertAndSend("topic-model","userService.update.save","消息2");
rabbitTemplate.convertAndSend("topic-model","userService.update.save.all","消息3");
}

消费者核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(),exchange = @Exchange(value = "topic-model",type = "topic"),key = "userService.save.*")
})
public void topicConsumer1(String message){
System.out.println("路由key为userService.save.* Received: " + message);
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue(),exchange = @Exchange(value = "topic-model",type = "topic"),key = "userService.update.#")
})
public void topicConsumer2(String message){
System.out.println("路由key为userService.update.# Received: " + message);
}

输出结果

image-20201121200000365