rabbitmq

ddatsh

dev #mq

基本概念

Publisher 发的消息通过 Connection 中的 Channel 到达 Broker 某个 Virtual Host

消息经过指定的 Exchange,根据 Binding 依据,分发到 0~n 个 Queue 中

Queue 中消息等待 Consumer 消费


Message:由消息头和消息体组成。消息头由一系列的可选属性组成,routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(消息持久性)等

Broker:MQ server

Connection: 客户端和Broker间的TCP连接

Channel:要为每个Connection创建Channel,通过 Channel才能执行AMQP命令,一个Connection可包含多个Channels

Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给队列

Queue:存未被消费者消费的消息

Binding:联系Exchange与Queue,Binding后生成路由表

Exchange收到Message解析Header得到Routing Key,根据Routing Key与Exchange Type将Message路由到Queue

Virtual Host:类似于权限控制组,一个virtual host里面可以 若干个Exchange和Queue,权限控制的最小力度


AMQP 消息路由

AMQP 增加 Exchange 和 Binding 角色

Exchange type

不同类型路由行为不同,即时根据类型的不同分发策略有区别

Direct和Fanout类似于单播和广播模式,Topic支持自定义匹配规则,按照规则把所有满足条件的消息路由到指定队列

direct(默认),点对点队列

根据routing key全文匹配发到queue。这种模式可不声明exchange和binder,使用默认exchange ""

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.boot.test.context.*;
import org.springframework.test.context.junit.jupiter.*;

@SpringBootTest
@ExtendWith(SpringExtension.class)
public class ProducerTest {

	@Autowired
	RabbitTemplate rabbitTemplate;

	@Test
	public void testSend() {
		
		rabbitTemplate.convertAndSend( "queueName", "message");
	}
}

Topic ,发布订阅模型

fanout

routingKey被忽略,为广播模式,消息都会被转发到与该交换器绑定的所有队列上

安装

RabbitMQ 默认启动时不开启任何插件

1
2
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management

默认用户名密码 guest/guest,只能通过本地访问

1
2
3
rabbitmqctl add_user admin Pass1234
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin "." "." ".*"

hello world

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] argv) throws Exception {

	String QUEUE_NAME = "hello";

	ConnectionFactory factory = new ConnectionFactory();
	factory.setHost("localhost");
	Connection connection = factory.newConnection();
	Channel    channel    = connection.createChannel();

	channel.queueDeclare(QUEUE_NAME, false, false, true, null);

	for (int i = 0; i < 10; ) {
		String message = "NO " + ++i;
		TimeUnit.MILLISECONDS.sleep(1000);
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
		System.out.printf("send to %s, %s\n", QUEUE_NAME, message);
	}

	channel.close();
	connection.close();
}

不同于 JMS,AMQP 允许编程方式创建绝大多数模型,Exchange、Queue 等

创建时自动忽略已存在的 Queue,也可能返回异常(如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)

queueDeclare(指定 Queue 的属性)

basicPublish

第一个参数指定 Exchange ,空串为默认 Exchange,类型 direct

第二个参数指定 routineKey

direct 类型的 Exchange,消息发到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中

消费(default)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import com.rabbitmq.client.*;

import java.io.*;
import java.util.concurrent.*;

public class Receiver {

   private final static String QUEUE_NAME = "hello";

   public static void main(String[] argv) throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
            }
         }
      };
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

basicConsume

第一个参数指定 Consumer 消费的队列,即 hello 队列

第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认

第三个参数指定 Consumer

消费(pull)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import com.rabbitmq.client.*;

import java.util.concurrent.*;

public class ReceiverByPull {
	
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare(QUEUE_NAME, false, false, false, null);

		while (true) {
			GetResponse resp = channel.basicGet(QUEUE_NAME, true);
			if (resp == null) {
				System.out.println("Get Nothing!");
				TimeUnit.MILLISECONDS.sleep(1000);
			} else {
				String message = new String(resp.getBody(), "UTF-8");
				System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
				TimeUnit.MILLISECONDS.sleep(500);
			}
		}
	}
}

Exchange & Binding

上例中的 Sender 只申明 hello 队列,然后就开始向默认 Exchange(“”) 发送routineKey也为 hello 的消息

AMQP 基本概念,消息到了 Exchange 后需要按照 Binding 提供的分发依据将消息分发到队列中

队列并不属于 Exchange,队列有自己的生命周期管理,与 Exchange 之间完全通过 Binding 关联

只要保证队列绑定到 Exchange 时使用的绑定键与消息发送时指定的 routineKey 一致就可以了

Exchange、队列和 Binding在rabbitmq声明过后,代码中不再声明3者关系,依然可以执行,只要 RabbitMQ 没有重启,这些模型将会一直生效

AMQP 模型到底是交给 Publisher 申明,还是交给 Consumer 申明,还是直接在 RabbitMQ 中预先创建,这是使用 RabbitMQ 时必须考虑的问题。没有统一结论,按照场景具体分析

默认 Exchange("")

每个队列都自动绑定到默认 Exchange,routineKey 为队列名称


rabbitmq-mock 方便本地调试

配置 mock.rabbitmq=true

1
2
3
4
5
6
7
8
9
<dependency>
  <groupId>com.github.fridujo</groupId>
  <artifactId>rabbitmq-mock</artifactId>
  <version>1.0.13</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.github.fridujo.rabbitmq.mock.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.amqp.support.converter.*;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.context.annotation.*;

@Configuration
public class AmqpConfig {
 
	@Bean
	public MessageConverter jsonMessageConverter() {

		return new Jackson2JsonMessageConverter();
	}

	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMessageConverter(jsonMessageConverter());
		template.setMandatory(true);
		return template;
	}

	@Bean
	public RabbitAdmin admin(ConnectionFactory connectionFactory) {
		RabbitAdmin admin = new RabbitAdmin(connectionFactory);
		admin.setIgnoreDeclarationExceptions(true);
		return admin;
	}

	@Bean
	@ConditionalOnProperty(prefix = "mock", name = "rabbitmq", havingValue = "true")
	ConnectionFactory connectionFactory() {

		return new CachingConnectionFactory(new MockConnectionFactory());
	}
}

多线程消费同一队列

消费一条消息往往比产生一条消息慢,为防止消息积压,一般开启多个工作线程同时消费消息。RabbitMQ 中可以创建多个 Consumer 消费同一队列

RabbitMQ 默认轮询(round-robin)分发消息,每个消费者会得到相同数量的消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {

   private static final String QUEUE_NAME = "tasks";

   private String name;

   public Sender(String name) {

      this.name = name;
   }

   public void work() throws Exception {

      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel    channel    = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      for (int i = 0; i < 10; ) {
         String message = "NO. " + ++i;
         TimeUnit.MILLISECONDS.sleep(100);
         channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
         System.out.printf("(%1$s)[===>%2$s    ] %3$s\n", name, ":" + QUEUE_NAME, message);
      }

      channel.close();
      connection.close();
   }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.*;

import java.io.*;
import java.util.concurrent.*;

public class Receiver {
   private static final String QUEUE_NAME = "tasks";

   private String name;

   private int sleepTime;

   public Receiver(String name, int sleepTime) {
      this.name = name;
      this.sleepTime = sleepTime;
   }

   public void work() throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
            }
         }
      };
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

消费速度相同

1
2
3
4
5
6
7
8
public static void main(String[] args) throws Exception {
   Receiver recv1 = new Receiver("A", 200);
   recv1.work();
   Receiver recv2 = new Receiver("B", 200);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}

出现慢速消费者

1
2
3
4
5
6
7
8
public static void main(String[] args) throws Exception {
   Receiver recv1 = new Receiver("A", 200);
   recv1.work();
   Receiver recv2 = new Receiver("B", 800);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}

B 依然分到了一半消息,需要运行很久才能处理完

Fair dispath 公平分发

默认RabbitMQ将第n(取余)个Message分发给第n个Consumer,不管Consumer是否还有unacked Message,即不公平

channel.basic_qos(prefetch_count=1)

按每个消费者的能力分配消息,联合使用 Qos 和 Acknowledge

从队列视角看,总会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 就是用来限制这批未确认消息数量

prefetchCount =1 时,只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息prefetchCount 默认= 0,即没有限制,队列会将所有消息尽快发给消费者

消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.io.*;
import java.util.concurrent.*;

public class QosAcknowledgeReceiver {

   private static final String QUEUE_NAME = "tasks";

   private String name;

   private int sleepTime;

   public QosAcknowledgeReceiver(String name, int sleepTime) {
      this.name = name;
      this.sleepTime = sleepTime;
   }

   public void work() throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      final Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
            }
            channel.basicAck(envelope.getDeliveryTag(), false);
         }
      };
      channel.basicConsume(QUEUE_NAME, false, consumer);
   }
}
1
2
3
4
5
6
7
8
public static void main(String[] args) throws Exception {
   QosAcknowledgeReceiver recv1 = new QosAcknowledgeReceiver("A", 200);
   recv1.work();
   QosAcknowledgeReceiver recv2 = new QosAcknowledgeReceiver("B", 800);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}
1
2
3
4
5
6
7
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

basicQos 特性

消息发送确认/消息接收确认(ACK)

默认 Message 被消费者正确接收,从 Queue 中移除

发送确认

消息无法路由到队列,确认消息路由失败

消息成功路由时,当需要发送的队列都发送成功后,进行确认消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.stereotype.*;

import javax.annotation.*;

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

	private final RabbitTemplate rabbitTemplate;

	public RabbitTemplateConfig(RabbitTemplate rabbitTemplate) {

		this.rabbitTemplate = rabbitTemplate;
	}

	@PostConstruct
	public void init() {

		//指定 ConfirmCallback,ReturnCallback
		rabbitTemplate.setConfirmCallback(this);
		rabbitTemplate.setReturnCallback(this);
		
	}

	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {

		System.out.println("消息唯一标识:" + correlationData);
		System.out.println("确认结果:" + ack);
		System.out.println("失败原因:" + cause);
	}

	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		System.out.println("消息主体 message : "+message);
		System.out.println("消息主体 message : "+replyCode);
		System.out.println("描述:"+replyText);
		System.out.println("消息使用的交换器 exchange : "+exchange);
		System.out.println("消息使用的路由键 routing : "+routingKey);
	}
}

还要配置

1
2
3
4
spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true

接收确认

消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可手动或自动 ACK

可在业务失败后进行一些操作,如果消息未被 ACK 则 发送到下一个消费者

如果某个服务忘记 ACK,则 RabbitMQ 不会再发送数据给它,认为该服务的处理能力有限(Prefetch)

ACK 机制还可起到 限流 作用(如在接收到某条消息时休眠几秒钟)

消息确认模式:

确认消息

默认消费者自动 ack (确认)消息,手动 ack(确认)要修改确认模式为 manual

1
2
3
4
5
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

1
2
3
4
5
6
7
8
	@Bean
	public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setMessageConverter(jsonMessageConverter());
		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		return factory;
	}
1
2
3
4
5
6
7
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "q", durable = "true"),
                                         exchange = @Exchange(value = "demo", durable = "true", type = ExchangeTypes.TOPIC),
                                         key = "test"))
	public void processMessage(Map body, Message message, Channel channel) throws Exception {

		System.out.println(JSON.toJSONString(body));
	}

basicAck 需要两个参数

否认、拒绝消息

发送一个 header 中包含 error 的消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public void processMessage(Map message, Channel channel,@Headers Map<String,Object> map) {
		System.out.println(JSON.toJSONString(message));
		if (map.containsKey("error")){
			System.out.println("错误的消息");
			try {
				channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); 
				return;
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		try {
			channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费

也可以拒绝该消息,消息会被丢弃,不会重回队列

1
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

AcknowledgeMode.AUTO ,根据方法的执行情况来决定是否确认还是拒绝(是否重新入 queue)

其他的异常,则消息会被拒绝,且 requeue = true(如果只有一个消费者监听该队列,死循环,多消费端也会造成资源的极大浪费,开发过程中一定要避免)

可以通过 setDefaultRequeueRejected(默认是 true)去设置

发送参数理解

RabbitMQ 3.0开始去掉了对 immediate 参数的支持

最终还是利用 channel.basicPublish () 方法,将 mandatory 设置为 true 来实现

confirm 机制

1
2
cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherReturns(true);

区别:(confirm 保证达到交换机,return 保证交换机到达队列) 如果消息没有到 exchange, 则 confirm 回调,ack=false 如果消息到达 exchange, 则 confirm 回调,ack=true exchangequeue 成功,则不回调 return exchangequeue 失败,则回调 return(需设置 mandatory=true, 否则不回回调,消息就丢了)

控制台发 json

properties 加 content_type=application/json

MQ 中间件 - rabbitmq-connection 以及 channel 的思考(连接池)

https://www.jianshu.com/p/24e541170ace

RabbitMQ 动态创建队列并发送消息

https://www.jianshu.com/p/4dfe6cf87549

https://www.cnblogs.com/gordonkong/default.html?page=4

https://www.jianshu.com/u/50d83346eaff

RabbitMQ 发布订阅-实现延时重试队列

RabbitMQ 高可用之镜像队列

RabbitMQ 延时消息设计

死信队列

实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃

消息会变成死信消息的场景

  1. 消息被(basic.reject() or basic.nack()) and requeue = false,即被消费者拒绝签收,且重新入队为false

    1.1 注意:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是最终还是调用nack/reject

  2. 消息过期,> TTL

  3. 队列设置了x-max-length最大消息数量且已达此值,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息