rabbbitmq

发布于
mq

rabbitmq 记录

安装相关

RabbitMQ 默认启动时不开启任何插件

rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management

默认用户 guest/guest,只能通过本地访问

rabbitmqctl add_user admin Pass1234
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin "." "." ".*"

概念

Publisher 发送的消息通过 Connection 中的 Channel 到达 Broker 的某个 Virtual Host

消息经过指定的 Exchange,根据 Binding 提供的分发依据,分发到 0~n 个 Queue 中

Queue 中的消息等待 Consumer 消费

https://www.rabbitmq.com/tutorials/amqp-concepts.html

https://www.rabbitmq.com/amqp-0-9-1-quickref.html


hello world

发送

import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {

	private static final String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare(QUEUE_NAME, false, false, false, null);

		for (int i = 0; i < 10;) {
			String message = "NO. " + ++i;
			TimeUnit.MILLISECONDS.sleep(1000);
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
			System.out.printf("(%1$s)[===>%2$s    ] %3$s\n", "Sender", ":" + QUEUE_NAME, message);
		}

		channel.close();
		connection.close();
	}
}

不同于 JMS,AMQP 允许编程方式创建绝大多数模型,Exchange、Queue 等

创建时自动忽略已存在的 Queue,也可能返回异常(如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)

queueDeclare(指定 Queue 的属性)

  • durable:队列本身是否需要持久化。 false,RabbitMQ 重启后该 Queue 消失
  • exclusive:确保该队列只对申明它的连接可见,注意是连接而不是 Channel。当相应连接关闭时,该队列自动删除
  • autoDelete:true 时队列会在所有 Consumer 都断开连接时自动删除(不管是否是 durable)。队列被第一个 Consumer 连接前不会被删除

basicPublish

第一个参数指定 Exchange ,空串为默认 Exchange,类型 direct

第二个参数指定 routineKey

direct 类型的 Exchange,消息发到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中

消费(default)

import com.rabbitmq.client.*;

import java.io.*;
import java.util.concurrent.*;

public class Receiver {

   private final static String QUEUE_NAME = "hello";

   public static void main(String[] argv) throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
            }
         }
      };
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

basicConsume

第一个参数指定 Consumer 消费的队列,即 hello 队列

第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认

第三个参数指定 Consumer

消费(pull)

import com.rabbitmq.client.*;

import java.util.concurrent.*;

public class ReceiverByPull {
	
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare(QUEUE_NAME, false, false, false, null);

		while (true) {
			GetResponse resp = channel.basicGet(QUEUE_NAME, true);
			if (resp == null) {
				System.out.println("Get Nothing!");
				TimeUnit.MILLISECONDS.sleep(1000);
			} else {
				String message = new String(resp.getBody(), "UTF-8");
				System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
				TimeUnit.MILLISECONDS.sleep(500);
			}
		}
	}
}

Exchange & Binding

上例中的 Sender 只申明 hello 队列,然后就开始向默认 Exchange(“”) 发送routineKey也为 hello 的消息

AMQP 基本概念,消息到了 Exchange 后需要按照 Binding 提供的分发依据将消息分发到队列中

队列并不属于 Exchange,队列有自己的生命周期管理,与 Exchange 之间完全通过 Binding 关联

只要保证队列绑定到 Exchange 时使用的绑定键与消息发送时指定的 routineKey 一致就可以了

Exchange、队列和 Binding在rabbitmq声明过后,代码中不再声明3者关系,依然可以执行,只要 RabbitMQ 没有重启,这些模型将会一直生效

AMQP 模型到底是交给 Publisher 申明,还是交给 Consumer 申明,还是直接在 RabbitMQ 中预先创建,这是使用 RabbitMQ 时必须考虑的问题。没有统一结论,按照场景具体分析

默认 Exchange(“”)

每个队列都自动绑定到默认 Exchange,routineKey 为队列名称


rabbitmq-mock 方便本地调试

配置 mock.rabbitmq=true

<dependency>
  <groupId>com.github.fridujo</groupId>
  <artifactId>rabbitmq-mock</artifactId>
  <version>1.0.13</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
import com.github.fridujo.rabbitmq.mock.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.amqp.support.converter.*;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.context.annotation.*;

@Configuration
public class AmqpConfig {
 
	@Bean
	public MessageConverter jsonMessageConverter() {

		return new Jackson2JsonMessageConverter();
	}

	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMessageConverter(jsonMessageConverter());
		template.setMandatory(true);
		return template;
	}

	@Bean
	public RabbitAdmin admin(ConnectionFactory connectionFactory) {
		RabbitAdmin admin = new RabbitAdmin(connectionFactory);
		admin.setIgnoreDeclarationExceptions(true);
		return admin;
	}

	@Bean
	@ConditionalOnProperty(prefix = "mock", name = "rabbitmq", havingValue = "true")
	ConnectionFactory connectionFactory() {

		return new CachingConnectionFactory(new MockConnectionFactory());
	}
}

多线程消费同一队列

消费一条消息往往比产生一条消息慢,为防止消息积压,一般开启多个工作线程同时消费消息。RabbitMQ 中可以创建多个 Consumer 消费同一队列

RabbitMQ 默认轮询(round-robin)分发消息,每个消费者会得到相同数量的消息

import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {

   private static final String QUEUE_NAME = "tasks";

   private String name;

   public Sender(String name) {

      this.name = name;
   }

   public void work() throws Exception {

      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel    channel    = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      for (int i = 0; i < 10; ) {
         String message = "NO. " + ++i;
         TimeUnit.MILLISECONDS.sleep(100);
         channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
         System.out.printf("(%1$s)[===>%2$s    ] %3$s\n", name, ":" + QUEUE_NAME, message);
      }

      channel.close();
      connection.close();
   }
}
import com.rabbitmq.client.*;

import java.io.*;
import java.util.concurrent.*;

public class Receiver {
   private static final String QUEUE_NAME = "tasks";

   private String name;

   private int sleepTime;

   public Receiver(String name, int sleepTime) {
      this.name = name;
      this.sleepTime = sleepTime;
   }

   public void work() throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
            }
         }
      };
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

消费速度相同

public static void main(String[] args) throws Exception {
   Receiver recv1 = new Receiver("A", 200);
   recv1.work();
   Receiver recv2 = new Receiver("B", 200);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}

出现慢速消费者

public static void main(String[] args) throws Exception {
   Receiver recv1 = new Receiver("A", 200);
   recv1.work();
   Receiver recv2 = new Receiver("B", 800);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}

B 依然分到了一半消息,需要运行很久才能处理完

公平分发(fair dispatch)

按每个消费者的能力分配消息,联合使用 Qos 和 Acknowledge

从队列视角看,总会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 就是用来限制这批未确认消息数量的

prefetchCount =1 时,只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息prefetchCount 默认= 0,即没有限制,队列会将所有消息尽快发给消费者

消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列

import java.io.*;
import java.util.concurrent.*;

public class QosAcknowledgeReceiver {

   private static final String QUEUE_NAME = "tasks";

   private String name;

   private int sleepTime;

   public QosAcknowledgeReceiver(String name, int sleepTime) {
      this.name = name;
      this.sleepTime = sleepTime;
   }

   public void work() throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      final Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
            }
            channel.basicAck(envelope.getDeliveryTag(), false);
         }
      };
      channel.basicConsume(QUEUE_NAME, false, consumer);
   }
}
public static void main(String[] args) throws Exception {
   QosAcknowledgeReceiver recv1 = new QosAcknowledgeReceiver("A", 200);
   recv1.work();
   QosAcknowledgeReceiver recv2 = new QosAcknowledgeReceiver("B", 800);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

basicQos 特性( https://www.rabbitmq.com/consumer-prefetch.html)

  • basicQos 中 prefetchSize 参数通过消息的总字节数来限制队列推送消息的速度
  • prefetchSize 与 prefetchCount 可以同时设置,达到任何一个限制,则队列暂停推送消息
  • global 参数表示前两个参数的作用域,true 表示限制是针对信道的,false 表示限制是针对消费者的
  • 可以对同一个信道同时设置 global 为 true 和 false 的 Qos,表示队列要考虑每个消费者的限制,同时还要考虑整个信道的限制

消息发送确认/消息接收确认(ACK)

默认 Message 被消费者正确接收,从 Queue 中移除

发送确认

消息无法路由到队列,确认消息路由失败

消息成功路由时,当需要发送的队列都发送成功后,进行确认消息

  • 持久化队列:写入磁盘
  • 镜像队列:所有镜像接收成功

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.*;
    import org.springframework.amqp.rabbit.core.*;
    import org.springframework.stereotype.*;
    
    import javax.annotation.*;
    
    @Component
    public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    
    	private final RabbitTemplate rabbitTemplate;
    
    	public RabbitTemplateConfig(RabbitTemplate rabbitTemplate) {
    
    		this.rabbitTemplate = rabbitTemplate;
    	}
    
    	@PostConstruct
    	public void init() {
    
    		//指定 ConfirmCallback,ReturnCallback
    		rabbitTemplate.setConfirmCallback(this);
    		rabbitTemplate.setReturnCallback(this);
    		
    	}
    
    	@Override
    	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
    		System.out.println("消息唯一标识:" + correlationData);
    		System.out.println("确认结果:" + ack);
    		System.out.println("失败原因:" + cause);
    	}
    
    	@Override
    	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    		System.out.println("消息主体 message : "+message);
    		System.out.println("消息主体 message : "+replyCode);
    		System.out.println("描述:"+replyText);
    		System.out.println("消息使用的交换器 exchange : "+exchange);
    		System.out.println("消息使用的路由键 routing : "+routingKey);
    	}
    }
    

还要配置

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true

接收确认

消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可手动或自动 ACK

  • 自动(消息发给消费者后立即确认,丢失消息的可能):

消费端消费逻辑抛出异常(没有处理成功这条消息),相当于丢失了消息

消息已被处理,但后续代码抛出异常,Spring 事务回滚,同样造成实际意义的消息丢失

  • 手动 ( 消费者调用 ack、nack、reject 进行确认)

可在业务失败后进行一些操作,如果消息未被 ACK 则 发送到下一个消费者

如果某个服务忘记 ACK,则 RabbitMQ 不会再发送数据给它,认为该服务的处理能力有限(Prefetch)

ACK 机制还可起到 限流 作用(如在接收到某条消息时休眠几秒钟)

消息确认模式:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

确认消息

默认消费者自动 ack (确认)消息,手动 ack(确认)要修改确认模式为 manual

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

	@Bean
	public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setMessageConverter(jsonMessageConverter());
		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		return factory;
	}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "q", durable = "true"),
                                         exchange = @Exchange(value = "demo", durable = "true", type = ExchangeTypes.TOPIC),
                                         key = "test"))
	public void processMessage(Map body, Message message, Channel channel) throws Exception {

		System.out.println(JSON.toJSONString(body));
	}

basicAck 需要两个参数

  • deliveryTag(唯一标识 ID):RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,单调递增正整数,delivery tag 的范围仅限于 Channel
  • multiple:为了减少网络流量,手动确认可以被批处理,true 时可一次性确认 delivery_tag 小于等于传入值的所有消息

否认、拒绝消息

发送一个 header 中包含 error 的消息

public void processMessage(Map message, Channel channel,@Headers Map<String,Object> map) {
		System.out.println(JSON.toJSONString(message));
		if (map.containsKey("error")){
			System.out.println("错误的消息");
			try {
				channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); 
				return;
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		try {
			channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费

也可以拒绝该消息,消息会被丢弃,不会重回队列

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

AcknowledgeMode.AUTO ,根据方法的执行情况来决定是否确认还是拒绝(是否重新入 queue)

  • 消息成功被消费(消费的过程中没有抛异常),自动确认
  • AmqpRejectAndDontRequeueException ,消息会被拒绝,且 requeue = false(不重新入队列)
  • ImmediateAcknowledgeAmqpException,消费者会被确认

其他的异常,则消息会被拒绝,且 requeue = true(如果只有一个消费者监听该队列,死循环,多消费端也会造成资源的极大浪费,开发过程中一定要避免)

可以通过 setDefaultRequeueRejected(默认是 true)去设置

发送参数理解

RabbitMQ 3.0开始去掉了对 immediate 参数的支持

  • mandatory

交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式 true: 调用 Basic.Return 命令将消息返回给生产者 false:把消息直接丢弃

  • immediate:true ,如果该消息关联的队列上有消费者,则立即投递,否则这条消息不存入队列;如果与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者

  • props:消息属性集,持久化、优先级、投递模式、过期时间等

最终还是利用 channel.basicPublish () 方法,将 mandatory 设置为 true 来实现

confirm 机制

cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherReturns(true);

区别:(confirm 保证达到交换机,return 保证交换机到达队列) 如果消息没有到 exchange, 则 confirm 回调,ack=false 如果消息到达 exchange, 则 confirm 回调,ack=true exchangequeue 成功,则不回调 return exchangequeue 失败,则回调 return(需设置 mandatory=true, 否则不回回调,消息就丢了)

  • 注意:设置 PublisherReturns 状态为 true,那么需要设置 rabbitTemplate.setMandatory(true); 具体如何保证消息不丢失,请参考 RabbitMQ 的消息不丢失机制

控制台发 json

properties 加 content_type=application/json

MQ 中间件 - rabbitmq-connection 以及 channel 的思考(连接池)

https://www.jianshu.com/p/24e541170ace

RabbitMQ 动态创建队列并发送消息

https://www.jianshu.com/p/4dfe6cf87549

https://www.cnblogs.com/gordonkong/default.html?page=4

https://www.jianshu.com/u/50d83346eaff