rabbitmq
dev
#mq
基本概念
Publisher 发的消息通过 Connection 中的 Channel 到达 Broker 某个 Virtual Host
消息经过指定的 Exchange,根据 Binding 依据,分发到 0~n 个 Queue 中
Queue 中消息等待 Consumer 消费
Message:由消息头和消息体组成。消息头由一系列的可选属性组成,routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(消息持久性)等
Broker:MQ server
Connection: 客户端和Broker间的TCP连接
Channel:要为每个Connection创建Channel,通过 Channel才能执行AMQP命令,一个Connection可包含多个Channels
Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给队列
Queue:存未被消费者消费的消息
Binding:联系Exchange与Queue,Binding后生成路由表
Exchange收到Message解析Header得到Routing Key,根据Routing Key与Exchange Type将Message路由到Queue
Virtual Host:类似于权限控制组,一个virtual host里面可以 若干个Exchange和Queue,权限控制的最小力度
AMQP 消息路由
AMQP 增加 Exchange 和 Binding 角色
Exchange type
不同类型路由行为不同,即时根据类型的不同分发策略有区别
Direct和Fanout类似于单播和广播模式
,Topic支持自定义匹配规则,按照规则把所有满足条件的消息路由到指定队列
direct(默认),点对点队列
根据routing key全文匹配
发到queue。这种模式可不声明exchange和binder,使用默认exchange ""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.boot.test.context.*;
import org.springframework.test.context.junit.jupiter.*;
@SpringBootTest
@ExtendWith(SpringExtension.class)
public class ProducerTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
rabbitTemplate.convertAndSend( "queueName", "message");
}
}
|
Topic ,发布订阅模型
fanout
routingKey被忽略,为广播模式
,消息都会被转发到与该交换器绑定的所有队列上
安装
RabbitMQ 默认启动时不开启任何插件
1
2
|
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management
|
默认用户名密码 guest/guest,只能通过本地访问
1
2
3
|
rabbitmqctl add_user admin Pass1234
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin "." "." ".*"
|
hello world
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public static void main(String[] argv) throws Exception {
String QUEUE_NAME = "hello";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
for (int i = 0; i < 10; ) {
String message = "NO " + ++i;
TimeUnit.MILLISECONDS.sleep(1000);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.printf("send to %s, %s\n", QUEUE_NAME, message);
}
channel.close();
connection.close();
}
|
不同于 JMS,AMQP 允许编程方式创建绝大多数模型,Exchange、Queue 等
创建时自动忽略已存在的 Queue,也可能返回异常(如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)
queueDeclare(指定 Queue 的属性)
- durable:队列本身是否需要持久化。 false,RabbitMQ 重启后该 Queue 消失
- exclusive:确保该队列只对申明它的连接可见,注意是连接而不是 Channel。当相应连接关闭时,该队列自动删除
- autoDelete:true 时队列会在所有 Consumer 都断开连接时自动删除(不管是否是 durable)。队列被第一个 Consumer 连接前不会被删除
basicPublish
第一个参数指定 Exchange ,空串为默认 Exchange,类型 direct
第二个参数指定 routineKey
direct 类型的 Exchange,消息发到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中
消费(default)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import com.rabbitmq.client.*;
import java.io.*;
import java.util.concurrent.*;
public class Receiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
|
basicConsume
第一个参数指定 Consumer 消费的队列,即 hello 队列
第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认
第三个参数指定 Consumer
消费(pull)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import com.rabbitmq.client.*;
import java.util.concurrent.*;
public class ReceiverByPull {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
while (true) {
GetResponse resp = channel.basicGet(QUEUE_NAME, true);
if (resp == null) {
System.out.println("Get Nothing!");
TimeUnit.MILLISECONDS.sleep(1000);
} else {
String message = new String(resp.getBody(), "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
TimeUnit.MILLISECONDS.sleep(500);
}
}
}
}
|
Exchange & Binding
上例中的 Sender 只申明 hello 队列,然后就开始向默认 Exchange(“”) 发送routineKey也为 hello 的消息
AMQP 基本概念,消息到了 Exchange 后需要按照 Binding 提供的分发依据将消息分发到队列中
队列并不属于 Exchange,队列有自己的生命周期管理,与 Exchange 之间完全通过 Binding 关联
只要保证队列绑定到 Exchange 时使用的绑定键与消息发送时指定的 routineKey 一致就可以了
Exchange、队列和 Binding在rabbitmq声明过后,代码中不再声明3者关系,依然可以执行,只要 RabbitMQ 没有重启,这些模型将会一直生效
AMQP 模型到底是交给 Publisher 申明,还是交给 Consumer 申明,还是直接在 RabbitMQ 中预先创建,这是使用 RabbitMQ 时必须考虑的问题。没有统一结论,按照场景具体分析
默认 Exchange("")
每个队列都自动绑定到默认 Exchange,routineKey 为队列名称
rabbitmq-mock 方便本地调试
配置 mock.rabbitmq=true
1
2
3
4
5
6
7
8
9
|
<dependency>
<groupId>com.github.fridujo</groupId>
<artifactId>rabbitmq-mock</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import com.github.fridujo.rabbitmq.mock.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.amqp.support.converter.*;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.context.annotation.*;
@Configuration
public class AmqpConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
template.setMandatory(true);
return template;
}
@Bean
public RabbitAdmin admin(ConnectionFactory connectionFactory) {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.setIgnoreDeclarationExceptions(true);
return admin;
}
@Bean
@ConditionalOnProperty(prefix = "mock", name = "rabbitmq", havingValue = "true")
ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(new MockConnectionFactory());
}
}
|
多线程消费同一队列
消费一条消息往往比产生一条消息慢,为防止消息积压,一般开启多个工作线程同时消费消息。RabbitMQ 中可以创建多个 Consumer 消费同一队列
RabbitMQ 默认轮询(round-robin)分发消息,每个消费者会得到相同数量的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
private static final String QUEUE_NAME = "tasks";
private String name;
public Sender(String name) {
this.name = name;
}
public void work() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10; ) {
String message = "NO. " + ++i;
TimeUnit.MILLISECONDS.sleep(100);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.printf("(%1$s)[===>%2$s ] %3$s\n", name, ":" + QUEUE_NAME, message);
}
channel.close();
connection.close();
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import com.rabbitmq.client.*;
import java.io.*;
import java.util.concurrent.*;
public class Receiver {
private static final String QUEUE_NAME = "tasks";
private String name;
private int sleepTime;
public Receiver(String name, int sleepTime) {
this.name = name;
this.sleepTime = sleepTime;
}
public void work() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
|
消费速度相同
1
2
3
4
5
6
7
8
|
public static void main(String[] args) throws Exception {
Receiver recv1 = new Receiver("A", 200);
recv1.work();
Receiver recv2 = new Receiver("B", 200);
recv2.work();
Sender sender = new Sender("S");
sender.work();
}
|
出现慢速消费者
1
2
3
4
5
6
7
8
|
public static void main(String[] args) throws Exception {
Receiver recv1 = new Receiver("A", 200);
recv1.work();
Receiver recv2 = new Receiver("B", 800);
recv2.work();
Sender sender = new Sender("S");
sender.work();
}
|
B 依然分到了一半消息,需要运行很久才能处理完
Fair dispath 公平分发
默认RabbitMQ将第n(取余)个Message分发给第n个Consumer,不管Consumer是否还有unacked Message,即不公平
channel.basic_qos(prefetch_count=1)
按每个消费者的能力分配消息
,联合使用 Qos 和 Acknowledge
从队列视角看,总会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 就是用来限制这批未确认消息数量
prefetchCount =1 时,只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息prefetchCount 默认= 0,即没有限制,队列会将所有消息尽快发给消费者
消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import java.io.*;
import java.util.concurrent.*;
public class QosAcknowledgeReceiver {
private static final String QUEUE_NAME = "tasks";
private String name;
private int sleepTime;
public QosAcknowledgeReceiver(String name, int sleepTime) {
this.name = name;
this.sleepTime = sleepTime;
}
public void work() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
|
1
2
3
4
5
6
7
8
|
public static void main(String[] args) throws Exception {
QosAcknowledgeReceiver recv1 = new QosAcknowledgeReceiver("A", 200);
recv1.work();
QosAcknowledgeReceiver recv2 = new QosAcknowledgeReceiver("B", 800);
recv2.work();
Sender sender = new Sender("S");
sender.work();
}
|
1
2
3
4
5
6
7
|
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
|
basicQos 特性
- basicQos 中 prefetchSize 参数通过消息的总字节数来限制队列推送消息的速度
- prefetchSize 与 prefetchCount 可以同时设置,达到任何一个限制,则队列暂停推送消息
- global 参数表示前两个参数的作用域,true 表示限制是针对信道的,false 表示限制是针对消费者的
- 可以对同一个信道同时设置 global 为 true 和 false 的 Qos,表示队列要考虑每个消费者的限制,同时还要考虑整个信道的限制
消息发送确认/消息接收确认(ACK)
默认 Message 被消费者正确接收,从 Queue 中移除
发送确认
消息无法路由到队列,确认消息路由失败
消息成功路由时,当需要发送的队列都发送成功后,进行确认消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.stereotype.*;
import javax.annotation.*;
@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
private final RabbitTemplate rabbitTemplate;
public RabbitTemplateConfig(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@PostConstruct
public void init() {
//指定 ConfirmCallback,ReturnCallback
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息唯一标识:" + correlationData);
System.out.println("确认结果:" + ack);
System.out.println("失败原因:" + cause);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主体 message : "+message);
System.out.println("消息主体 message : "+replyCode);
System.out.println("描述:"+replyText);
System.out.println("消息使用的交换器 exchange : "+exchange);
System.out.println("消息使用的路由键 routing : "+routingKey);
}
}
|
还要配置
1
2
3
4
|
spring:
rabbitmq:
publisher-confirms: true
publisher-returns: true
|
接收确认
消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可手动或自动 ACK
-
自动
(消息发给消费者后立即确认,丢失消息的可能):
消费端消费逻辑抛出异常(没有处理成功这条消息),相当于丢失了消息
消息已被处理,但后续代码抛出异常,Spring 事务回滚,同样造成实际意义的消息丢失
-
手动
( 消费者调用 ack、nack、reject 进行确认)
可在业务失败后进行一些操作,如果消息未被 ACK 则 发送到下一个消费者
如果某个服务忘记 ACK,则 RabbitMQ 不会再发送数据给它,认为该服务的处理能力有限(Prefetch)
ACK 机制还可起到 限流
作用(如在接收到某条消息时休眠几秒钟)
消息确认模式:
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认
确认消息
默认消费者自动 ack (确认)消息,手动 ack(确认)要修改确认模式为 manual
1
2
3
4
5
|
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
|
或
1
2
3
4
5
6
7
8
|
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
|
1
2
3
4
5
6
7
|
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "q", durable = "true"),
exchange = @Exchange(value = "demo", durable = "true", type = ExchangeTypes.TOPIC),
key = "test"))
public void processMessage(Map body, Message message, Channel channel) throws Exception {
System.out.println(JSON.toJSONString(body));
}
|
basicAck 需要两个参数
- deliveryTag(唯一标识 ID):RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,单调递增正整数,delivery tag 的范围仅限于 Channel
- multiple:为了减少网络流量,手动确认可以被批处理,true 时可一次性确认 delivery_tag 小于等于传入值的所有消息
否认、拒绝消息
发送一个 header 中包含 error 的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public void processMessage(Map message, Channel channel,@Headers Map<String,Object> map) {
System.out.println(JSON.toJSONString(message));
if (map.containsKey("error")){
System.out.println("错误的消息");
try {
channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);
return;
} catch (IOException e) {
e.printStackTrace();
}
}
try {
channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);
} catch (IOException e) {
e.printStackTrace();
}
}
|
此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费
也可以拒绝该消息,消息会被丢弃,不会重回队列
1
|
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //拒绝消息
|
AcknowledgeMode.AUTO ,根据方法的执行情况来决定是否确认还是拒绝(是否重新入 queue)
- 消息成功被消费(消费的过程中没有抛异常),自动确认
- AmqpRejectAndDontRequeueException ,消息会被拒绝,且 requeue = false(不重新入队列)
- ImmediateAcknowledgeAmqpException,消费者会被确认
其他的异常,则消息会被拒绝,且 requeue = true(如果只有一个消费者监听该队列,死循环,多消费端也会造成资源的极大浪费,开发过程中一定要避免)
可以通过 setDefaultRequeueRejected(默认是 true)去设置
发送参数理解
RabbitMQ 3.0开始去掉了对 immediate 参数的支持
-
mandatory
交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式
true: 调用 Basic.Return 命令将消息返回给生产者
false:把消息直接丢弃
-
immediate:true ,如果该消息关联的队列上有消费者,则立即投递,否则这条消息不存入队列;如果与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者
-
props:消息属性集,持久化、优先级、投递模式、过期时间等
最终还是利用 channel.basicPublish () 方法,将 mandatory 设置为 true 来实现
confirm 机制
1
2
|
cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherReturns(true);
|
区别:(confirm 保证达到交换机,return 保证交换机到达队列)
如果消息没有到 exchange
, 则 confirm
回调,ack=false
如果消息到达 exchange
, 则 confirm
回调,ack=true
exchange
到 queue
成功,则不回调 return
exchange
到 queue
失败,则回调 return
(需设置 mandatory=true
, 否则不回回调,消息就丢了)
- 注意:设置
PublisherReturns
状态为 true
,那么需要设置 rabbitTemplate.setMandatory(true);
具体如何保证消息不丢失,请参考 RabbitMQ 的消息不丢失机制
控制台发 json
properties 加 content_type=application/json
MQ 中间件 - rabbitmq-connection 以及 channel 的思考(连接池)
https://www.jianshu.com/p/24e541170ace
RabbitMQ 动态创建队列并发送消息
https://www.jianshu.com/p/4dfe6cf87549
https://www.cnblogs.com/gordonkong/default.html?page=4
https://www.jianshu.com/u/50d83346eaff
死信队列
实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃
消息会变成死信消息的场景
-
消息被(basic.reject() or basic.nack()) and requeue = false
,即被消费者拒绝签收,且重新入队为false
1.1 注意:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是最终还是调用nack
/reject
-
消息过期,> TTL
-
队列设置了x-max-length
最大消息数量且已达此值,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息