rabbbitmq 堆积分析

ddatsh

dev #mq

业务洗数据,binlog变更发至rabbitmq, 堵死现象分析

事件

业务 sql 洗大表数据,产生大量binlog,发往rabbitmq

rabbitmq node节点CPU跑满

整个rabbitmq集群不可用

node 2、3是ram 节点(事后运维说观察到上图CPU爆满,才将disk节点改成ram,正是由此,整个rabbitmq集群不可用)

由于node 2 3变成ram节点,大量堵着未消费完的数据,还未同步结束,node 2内存占满

本次解决方案:暴力重启整个rabbitmq集群

发现堆积的数据直接消失


相关解决方案

业务经常会面临此问题,消息生产太快,消费不过来,导致队列堆积很长,把服务器内存耗尽,这时RabbitMQ的处理能力很低下

社区总结起来解决方案大体包括:


rabbitmq 原理


Rabbitmq的中处理队列收发逻辑的是一个有穷状态机进程,它对消息的处理流程可以概括为

  1. 既有生产者也有消费者时,状态机处理流程 :接收消息->持久化->发送消息->接收消息 –> … ->

    流控机制的控制下,收发速率能够保持基本一致,队列中堆积的消息数会非常低

  2. 没有消费者时,处理流程如橙色线条,MQ会持续接收消息并持久化直到磁盘被写满,因为没有发送逻辑,这时可以达到更高的生产速率

  3. 有消息堆积时,处理流程如绿色线条,MQ会持续从队列中取出堆积的消息将其发送出去,直到

    • 没有了堆积消息
    • 或消费者的qos被用光
    • 或没有消费者
    • 或消费者的channel被阻塞

    如果一直没有满足上述4个条件之一,MQ就会持续的发送堆积消息,不去处理新来的消息,在流控机制的作用下,发送端就被阻塞了

总结

从上述描述可以看出,消息堆积后,发送速率降低是MQ的处理流程使然,不是bug

这样的流程设计基于以下两个原因:

  1. 让堆积的消息更快的被消费掉,降低消息的时延

  2. MQ中堆积的消息越少,每个消息处理的平均开销就越少,可以提高整体性能,所以需要尽快将堆积消息发送出去

应对措施

  1. 设置合适的qos值,当qos值被用光,而新的ack未被mq接收时,就可以跳出发送循环,去接收新的消息
  2. 消费者主动block接收进程,消费者感知到接收消息的速度过快时,主动block,利用block与unblock方法调节接收速率。当接收进程被block时,mq跳出发送循环

性能调优

避免雷区

避免流控机制触发

消息大小不要超过4MB

消息包大小由1K到10MB,当包大小达到4.5MB时,服务器的性能出现明显的异常,传输率尤其是每秒订阅消息的数量,出现波动,不稳定;同时有一部分订阅者的TCP连接出现断开的现象。可能是客户端底层或者RabbitMQ服务端在进行拆包,组包的时候,出现了明显的压力,而导致异常的发生

磁盘也可能形成瓶颈

磁盘也可能形成瓶颈,如果单台机器队列很多,确认只在必要时才使用duration(持久化),避免把磁盘跑满;

队列的消息大量累积后

队列的消息大量累积后,发送和消费速度都会受到影响,导致服务进一步恶化,可采用的方法是,额外的脚本监控每个队列的消息数,超过限额会执行purge操作,简单粗暴但是有效的保证了服务稳定

单机限制

由于用线程模拟大量发布者,且是服务器单节点,受客户端主机网卡的限制,发布线程没有速度控制,导致有大量数据发送,服务器带宽下行速率也满负荷,上行带宽却明显低于下行速率,导致服务器内存有大量消息堆积,进而触发RabbitMQ服务器paging操作,才出现了上述不稳定和订阅者断开现象

对发布端做适当流量控制,断开连接现象不再出现,但每秒消息数仍然不稳定

模式对性能的影响

分析三种模式 direct fanout topic

不同的模式对于新建交换机、新建队列、绑定等操作性能影响不大,

但是在direct模式下明显消息发布的性能比其他模式强很多,并且消息发送到相同队列比发送到不同队列性能稍好

持久化对消息性能的影响参考

在消息持久化模式下:

发布:13888msg/s 
订阅:15384msg/s

在消息非持久化模式下:

发布:18867msg/s 
订阅:26315msg/s

问题分析/解决方案

问题分析:

RabbitMQ内存占用已经 7.8G 允许的值为 6G左右

vm_memory_high_watermark 是0.4 也就是物理内存的40% ;服务器为16G * 40% = 6.4G

一般在产生的原因是长期的生产者发送速率大于消费者消费速率导致. 触发了RabbitMQ 的流控

解决方案:

  1. 增加消费者端的消费能力,或者增加消费者(根本解决)
  2. 控制消息产生者端的发送速率(不太现实)
  3. 增加mq的内存(治标不治本)

参数调优

Java

如何优化消费端处理缓慢造成的消息堆积问题?

默认情况下,rabbitmq消费者为单线程串行消费,这也是队列的特性

源码在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中

private volatile int prefetchCount = 1;
private volatile int concurrentConsumers = 1;

可以通过设置并发消费提高消费的速率,从而减少消息堆积的问题

protected int initializeConsumers() {
		int count = 0;
		synchronized (this.consumersMonitor) {
			if (this.consumers == null) {
				this.cancellationLock.reset();
				this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
				for (int i = 0; i < this.concurrentConsumers; i++) {
					BlockingQueueConsumer consumer = createBlockingQueueConsumer();
					this.consumers.add(consumer);
					count++;
				}
			}
		}
		return count;
	}

prefetchCount是BlockingQueueConsumer内部维护的一个阻塞队列LinkedBlockingQueue的大小,其作用就是如果某个消费者队列阻塞,就无法接收新的消息,该消息会发送到其它未阻塞的消费者