rabbitmq
发布于
mq

基本概念

Publisher 发的消息通过 Connection 中的 Channel 到达 Broker 某个 Virtual Host

消息经过指定的 Exchange,根据 Binding 依据,分发到 0~n 个 Queue 中

Queue 中消息等待 Consumer 消费


Broker:MQ server

Connection: 客户端和Broker间的TCP连接

Channel:要为每个Connection创建Channel,通过 Channel才能执行AMQP命令,一个Connection可包含多个Channels

Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给队列

Queue:存未被消费者消费的消息

Binding:联系Exchange与Queue,Binding后生成路由表

Exchange收到Message解析Header得到Routing Key,根据Routing Key与Exchange Type将Message路由到Queue

  • Binding Key

    由Consumer在绑定Exchange与Queue时指定

  • Routing Key

    由Producer发送Message时指定,两者的匹配方式由Exchange Type决定

Virtual Host:类似于权限控制组,一个virtual host里面可以 若干个Exchange和Queue,权限控制的最小力度

Exchange type

不同类型路由行为不同

Direct和Fanout类似于单播和广播模式,Topic支持自定义匹配规则,按照规则把所有满足条件的消息路由到指定队列

direct(默认),点对点队列

根据routing key全文匹配发到queue。这种模式可不声明exchange和binder,使用默认exchange ""

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.boot.test.context.*;
import org.springframework.test.context.junit.jupiter.*;

@SpringBootTest
@ExtendWith(SpringExtension.class)
public class ProducerTest {

	@Autowired
	RabbitTemplate rabbitTemplate;

	@Test
	public void testSend() {
		
		rabbitTemplate.convertAndSend( "queueName", "message");
	}
}

Topic ,发布订阅模型

fanout

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上,routingKey被忽略,为广播模式

安装

RabbitMQ 默认启动时不开启任何插件

rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management

默认用户名密码 guest/guest,只能通过本地访问

rabbitmqctl add_user admin Pass1234
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin "." "." ".*"

hello world

public static void main(String[] argv) throws Exception {

	String QUEUE_NAME = "hello";

	ConnectionFactory factory = new ConnectionFactory();
	factory.setHost("localhost");
	Connection connection = factory.newConnection();
	Channel    channel    = connection.createChannel();

	channel.queueDeclare(QUEUE_NAME, false, false, true, null);

	for (int i = 0; i < 10; ) {
		String message = "NO " + ++i;
		TimeUnit.MILLISECONDS.sleep(1000);
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
		System.out.printf("send to %s, %s\n", QUEUE_NAME, message);
	}

	channel.close();
	connection.close();
}

不同于 JMS,AMQP 允许编程方式创建绝大多数模型,Exchange、Queue 等

创建时自动忽略已存在的 Queue,也可能返回异常(如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)

queueDeclare(指定 Queue 的属性)

  • durable:队列本身是否需要持久化。 false,RabbitMQ 重启后该 Queue 消失
  • exclusive:确保该队列只对申明它的连接可见,注意是连接而不是 Channel。当相应连接关闭时,该队列自动删除
  • autoDelete:true 时队列会在所有 Consumer 都断开连接时自动删除(不管是否是 durable)。队列被第一个 Consumer 连接前不会被删除

basicPublish

第一个参数指定 Exchange ,空串为默认 Exchange,类型 direct

第二个参数指定 routineKey

direct 类型的 Exchange,消息发到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中

消费(default)

import com.rabbitmq.client.*;

import java.io.*;
import java.util.concurrent.*;

public class Receiver {

   private final static String QUEUE_NAME = "hello";

   public static void main(String[] argv) throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
            }
         }
      };
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

basicConsume

第一个参数指定 Consumer 消费的队列,即 hello 队列

第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认

第三个参数指定 Consumer

消费(pull)

import com.rabbitmq.client.*;

import java.util.concurrent.*;

public class ReceiverByPull {
	
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare(QUEUE_NAME, false, false, false, null);

		while (true) {
			GetResponse resp = channel.basicGet(QUEUE_NAME, true);
			if (resp == null) {
				System.out.println("Get Nothing!");
				TimeUnit.MILLISECONDS.sleep(1000);
			} else {
				String message = new String(resp.getBody(), "UTF-8");
				System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
				TimeUnit.MILLISECONDS.sleep(500);
			}
		}
	}
}

Exchange & Binding

上例中的 Sender 只申明 hello 队列,然后就开始向默认 Exchange(“”) 发送routineKey也为 hello 的消息

AMQP 基本概念,消息到了 Exchange 后需要按照 Binding 提供的分发依据将消息分发到队列中

队列并不属于 Exchange,队列有自己的生命周期管理,与 Exchange 之间完全通过 Binding 关联

只要保证队列绑定到 Exchange 时使用的绑定键与消息发送时指定的 routineKey 一致就可以了

Exchange、队列和 Binding在rabbitmq声明过后,代码中不再声明3者关系,依然可以执行,只要 RabbitMQ 没有重启,这些模型将会一直生效

AMQP 模型到底是交给 Publisher 申明,还是交给 Consumer 申明,还是直接在 RabbitMQ 中预先创建,这是使用 RabbitMQ 时必须考虑的问题。没有统一结论,按照场景具体分析

默认 Exchange("")

每个队列都自动绑定到默认 Exchange,routineKey 为队列名称


rabbitmq-mock 方便本地调试

配置 mock.rabbitmq=true

<dependency>
  <groupId>com.github.fridujo</groupId>
  <artifactId>rabbitmq-mock</artifactId>
  <version>1.0.13</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
import com.github.fridujo.rabbitmq.mock.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.amqp.support.converter.*;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.context.annotation.*;

@Configuration
public class AmqpConfig {
 
	@Bean
	public MessageConverter jsonMessageConverter() {

		return new Jackson2JsonMessageConverter();
	}

	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMessageConverter(jsonMessageConverter());
		template.setMandatory(true);
		return template;
	}

	@Bean
	public RabbitAdmin admin(ConnectionFactory connectionFactory) {
		RabbitAdmin admin = new RabbitAdmin(connectionFactory);
		admin.setIgnoreDeclarationExceptions(true);
		return admin;
	}

	@Bean
	@ConditionalOnProperty(prefix = "mock", name = "rabbitmq", havingValue = "true")
	ConnectionFactory connectionFactory() {

		return new CachingConnectionFactory(new MockConnectionFactory());
	}
}

多线程消费同一队列

消费一条消息往往比产生一条消息慢,为防止消息积压,一般开启多个工作线程同时消费消息。RabbitMQ 中可以创建多个 Consumer 消费同一队列

RabbitMQ 默认轮询(round-robin)分发消息,每个消费者会得到相同数量的消息

import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {

   private static final String QUEUE_NAME = "tasks";

   private String name;

   public Sender(String name) {

      this.name = name;
   }

   public void work() throws Exception {

      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel    channel    = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      for (int i = 0; i < 10; ) {
         String message = "NO. " + ++i;
         TimeUnit.MILLISECONDS.sleep(100);
         channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
         System.out.printf("(%1$s)[===>%2$s    ] %3$s\n", name, ":" + QUEUE_NAME, message);
      }

      channel.close();
      connection.close();
   }
}
import com.rabbitmq.client.*;

import java.io.*;
import java.util.concurrent.*;

public class Receiver {
   private static final String QUEUE_NAME = "tasks";

   private String name;

   private int sleepTime;

   public Receiver(String name, int sleepTime) {
      this.name = name;
      this.sleepTime = sleepTime;
   }

   public void work() throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
            }
         }
      };
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

消费速度相同

public static void main(String[] args) throws Exception {
   Receiver recv1 = new Receiver("A", 200);
   recv1.work();
   Receiver recv2 = new Receiver("B", 200);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}

出现慢速消费者

public static void main(String[] args) throws Exception {
   Receiver recv1 = new Receiver("A", 200);
   recv1.work();
   Receiver recv2 = new Receiver("B", 800);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}

B 依然分到了一半消息,需要运行很久才能处理完

Fair dispath 公平分发

默认RabbitMQ将第n(取余)个Message分发给第n个Consumer,不管Consumer是否还有unacked Message,即不公平

channel.basic_qos(prefetch_count=1)

按每个消费者的能力分配消息,联合使用 Qos 和 Acknowledge

从队列视角看,总会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 就是用来限制这批未确认消息数量

prefetchCount =1 时,只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息prefetchCount 默认= 0,即没有限制,队列会将所有消息尽快发给消费者

消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列

import java.io.*;
import java.util.concurrent.*;

public class QosAcknowledgeReceiver {

   private static final String QUEUE_NAME = "tasks";

   private String name;

   private int sleepTime;

   public QosAcknowledgeReceiver(String name, int sleepTime) {
      this.name = name;
      this.sleepTime = sleepTime;
   }

   public void work() throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      final Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      Consumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String message = new String(body, "UTF-8");
            System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
            try {
               TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
            }
            channel.basicAck(envelope.getDeliveryTag(), false);
         }
      };
      channel.basicConsume(QUEUE_NAME, false, consumer);
   }
}
public static void main(String[] args) throws Exception {
   QosAcknowledgeReceiver recv1 = new QosAcknowledgeReceiver("A", 200);
   recv1.work();
   QosAcknowledgeReceiver recv2 = new QosAcknowledgeReceiver("B", 800);
   recv2.work();
   Sender sender = new Sender("S");
   sender.work();
}
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

basicQos 特性

  • basicQos 中 prefetchSize 参数通过消息的总字节数来限制队列推送消息的速度
  • prefetchSize 与 prefetchCount 可以同时设置,达到任何一个限制,则队列暂停推送消息
  • global 参数表示前两个参数的作用域,true 表示限制是针对信道的,false 表示限制是针对消费者的
  • 可以对同一个信道同时设置 global 为 true 和 false 的 Qos,表示队列要考虑每个消费者的限制,同时还要考虑整个信道的限制

消息发送确认/消息接收确认(ACK)

默认 Message 被消费者正确接收,从 Queue 中移除

发送确认

消息无法路由到队列,确认消息路由失败

消息成功路由时,当需要发送的队列都发送成功后,进行确认消息

  • 持久化队列:写入磁盘
  • 镜像队列:所有镜像接收成功
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.stereotype.*;

import javax.annotation.*;

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

	private final RabbitTemplate rabbitTemplate;

	public RabbitTemplateConfig(RabbitTemplate rabbitTemplate) {

		this.rabbitTemplate = rabbitTemplate;
	}

	@PostConstruct
	public void init() {

		//指定 ConfirmCallback,ReturnCallback
		rabbitTemplate.setConfirmCallback(this);
		rabbitTemplate.setReturnCallback(this);
		
	}

	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {

		System.out.println("消息唯一标识:" + correlationData);
		System.out.println("确认结果:" + ack);
		System.out.println("失败原因:" + cause);
	}

	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		System.out.println("消息主体 message : "+message);
		System.out.println("消息主体 message : "+replyCode);
		System.out.println("描述:"+replyText);
		System.out.println("消息使用的交换器 exchange : "+exchange);
		System.out.println("消息使用的路由键 routing : "+routingKey);
	}
}

还要配置

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true

接收确认

消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可手动或自动 ACK

  • 自动(消息发给消费者后立即确认,丢失消息的可能):

    消费端消费逻辑抛出异常(没有处理成功这条消息),相当于丢失了消息

    消息已被处理,但后续代码抛出异常,Spring 事务回滚,同样造成实际意义的消息丢失

  • 手动 ( 消费者调用 ack、nack、reject 进行确认)

可在业务失败后进行一些操作,如果消息未被 ACK 则 发送到下一个消费者

如果某个服务忘记 ACK,则 RabbitMQ 不会再发送数据给它,认为该服务的处理能力有限(Prefetch)

ACK 机制还可起到 限流 作用(如在接收到某条消息时休眠几秒钟)

消息确认模式:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

确认消息

默认消费者自动 ack (确认)消息,手动 ack(确认)要修改确认模式为 manual

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

	@Bean
	public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setMessageConverter(jsonMessageConverter());
		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		return factory;
	}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "q", durable = "true"),
                                         exchange = @Exchange(value = "demo", durable = "true", type = ExchangeTypes.TOPIC),
                                         key = "test"))
	public void processMessage(Map body, Message message, Channel channel) throws Exception {

		System.out.println(JSON.toJSONString(body));
	}

basicAck 需要两个参数

  • deliveryTag(唯一标识 ID):RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,单调递增正整数,delivery tag 的范围仅限于 Channel
  • multiple:为了减少网络流量,手动确认可以被批处理,true 时可一次性确认 delivery_tag 小于等于传入值的所有消息

否认、拒绝消息

发送一个 header 中包含 error 的消息

public void processMessage(Map message, Channel channel,@Headers Map<String,Object> map) {
		System.out.println(JSON.toJSONString(message));
		if (map.containsKey("error")){
			System.out.println("错误的消息");
			try {
				channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); 
				return;
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		try {
			channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费

也可以拒绝该消息,消息会被丢弃,不会重回队列

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

AcknowledgeMode.AUTO ,根据方法的执行情况来决定是否确认还是拒绝(是否重新入 queue)

  • 消息成功被消费(消费的过程中没有抛异常),自动确认
  • AmqpRejectAndDontRequeueException ,消息会被拒绝,且 requeue = false(不重新入队列)
  • ImmediateAcknowledgeAmqpException,消费者会被确认

其他的异常,则消息会被拒绝,且 requeue = true(如果只有一个消费者监听该队列,死循环,多消费端也会造成资源的极大浪费,开发过程中一定要避免)

可以通过 setDefaultRequeueRejected(默认是 true)去设置

发送参数理解

RabbitMQ 3.0开始去掉了对 immediate 参数的支持

  • mandatory

    交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式 true: 调用 Basic.Return 命令将消息返回给生产者 false:把消息直接丢弃

  • immediate:true ,如果该消息关联的队列上有消费者,则立即投递,否则这条消息不存入队列;如果与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者

  • props:消息属性集,持久化、优先级、投递模式、过期时间等

最终还是利用 channel.basicPublish () 方法,将 mandatory 设置为 true 来实现

confirm 机制

cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherReturns(true);

区别:(confirm 保证达到交换机,return 保证交换机到达队列) 如果消息没有到 exchange, 则 confirm 回调,ack=false 如果消息到达 exchange, 则 confirm 回调,ack=true exchangequeue 成功,则不回调 return exchangequeue 失败,则回调 return(需设置 mandatory=true, 否则不回回调,消息就丢了)

  • 注意:设置 PublisherReturns 状态为 true,那么需要设置 rabbitTemplate.setMandatory(true); 具体如何保证消息不丢失,请参考 RabbitMQ 的消息不丢失机制

控制台发 json

properties 加 content_type=application/json

MQ 中间件 - rabbitmq-connection 以及 channel 的思考(连接池)

https://www.jianshu.com/p/24e541170ace

RabbitMQ 动态创建队列并发送消息

https://www.jianshu.com/p/4dfe6cf87549

https://www.cnblogs.com/gordonkong/default.html?page=4

https://www.jianshu.com/u/50d83346eaff

RabbitMQ 发布订阅-实现延时重试队列

RabbitMQ 高可用之镜像队列

RabbitMQ 延时消息设计

死信队列

实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃

消息会变成死信消息的场景

  1. 消息被(basic.reject() or basic.nack()) and requeue = false,即被消费者拒绝签收,且重新入队为false

    1.1 注意:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是最终还是调用nack/reject

  2. 消息过期,> TTL

  3. 队列设置了x-max-length最大消息数量且已达此值,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息

os 内存布局 & malloc
发布于
os

堆/栈

  • 栈: 函数调用基础(维护函数调用的上下文),大多数编程语言存储局部变量函数参数的地方,线程私有

    自动分配,如函数局部变量 int b;自动在栈中为b开辟空间

  • 堆:应用程序动态分配的内存区域,malloc/new,通常是程序的运行库管理栈空间分配

    自行申请,并指明大小,如 p1 = (char *)malloc(10);或c++ new 运算符。注意p1本身在栈中

栈或堆现有大小不够用时,按增长方向扩大自身尺寸,直到预留的空间被用完为止

栈扩展,触发由 expand_stack() 在Linux中处理的页面错误,调用acct_stack_growth()检查是否适合扩展堆栈

栈大小低于RLIMIT_STACK(通常8MB),栈增长,如果已经达到最大栈大小,溢出,段错误(Segmentation Fault)

运行库向操作系统 批发 较大的堆空间,然后 零售 给程序用

全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统 进货

os 内存布局&策略

各 malloc 内存分配管理方式离不开 os 内存布局策略

布局

32位经典内存布局

32位默认内存布局

加几种 Random offset,不易受内存溢出攻击

堆仍然向上,但mmap向下增长,os有栈大小限制

64位内存布局

寻址空间大,沿用32位的经典布局,加上随机的mmap起始地址,防止溢出攻击

40TB+内存才把堆内存地址用光

策略

malloc

大内存需求

直接mmap

小内存需求

  1. brk 扩大堆顶
  2. os把需要的内存分页映射过来
  3. 再由这些malloc管理这些堆内存块,减少系统调用

free

free内存时不同由malloc不同策略,不一定会把内存真正地还给系统

所以很多时候,访问free掉的内存,不会立即Run Time Error,只有访问的地址没有对应的内存分页,才会崩掉

系统调用 brk(sbrk)和mmap

brk() 和 sbrk() 扩展heap的上界

#include <unistd.h>
int brk( const void *addr );
void* sbrk ( intptr_t incr );

Brk参数为新的brk上界地址,成功返回1,失败返回0

Sbrk参数为申请内存的大小,返回heap新的上界brk的地址

#include <sys/mman.h>
void *mmap(void *addr, size\_t length, int prot, int flags, int fd, off\_t offset);
int munmap(void *addr, size_t length);

Mmap 可映射磁盘文件到内存中;或匿名映射,不映射磁盘文件,向映射区申请一块内存 Malloc使用的是mmap的第二种用法(匿名映射)

brk、sbrk、mmap属系统调用,每次申请内存都调用,影响性能

其次,这样申请内存容易产生碎片,因为堆从低地址到高地址,高地址内存没有被释放,低地址的内存就不能被回收

ptmalloc

https://blog.csdn.net/z_ryan/article/details/79950737

main_area/no_main_area

两个area 形成环形链表进行管理

每进程只一个主分配区,允许多个非主分配区

main可用brk和mmap分配,而 no_main只能用mmap映射内存块

Chunk结构

主从分配区模式,线程要分配资源时,从链表找一个没加锁的分配区进行分配

使用中/空闲 chunk数据结构基本项同,但有设计技巧,巧妙的节省了内存

使用中的chunk

chunk管理内存块,bin链表管理大小相似的chunk

空闲chunk

原本用户数据区存储四个指针 fd指向后一个空闲的chunk;bk指向前一个空闲的chunk

通过这两个指针将大小相近的chunk连成一个双向链表

large bin中的空闲chunk还有两个指针,fd_nextsize/bk_nextsize,用于加快在large bin中查找最近匹配的空闲chunk

不同的chunk链表通过bins或者fastbins来组织

空闲链表bins

避免频繁系统调用,降低内存分配开销

free释放掉的内存,ptmalloc不马上还给OS,被ptmalloc本身的空闲链表bins管理起来,下次malloc时,从空闲bins寻找 将相似大小的chunk用双向链表链接,称bin

ptmalloc维护了128bin。每个bins都维护了大小相近的双向链表的chunk。基于chunk的大小,有下列几种可用bins:

小内存分配

  • small bin 前64 相邻bin里 chunk大小差8字节

  • large bin 先大小,再最近使用的顺序排列,每次分配找一个最小够用chunk

  • A

    在主分配区

  • M

    mmap出来的

  • P

    上一个内存紧邻的chunk是否在使用

free时,检查附近chunk,尝试把连续空闲chunk合并,放到unstored bin

但当很小chunk释放时,并入fast bin

同样 fast bin连续内存块会被合并到一个unsorted bin里,然后再才进入普通bin里

malloc小内存,先查找fast bin,再查找unsorted bin,最后查找普通的bin,如果unsorted bin里的chunk不合适,则会把它扔到bin里

大内存分配

如果前面的bin里的空闲chunk都不足以满足需要,尝试从top chunk里分配内存

top chunk也不够,从OS拿

特别大的内存,直接从系统mmap出来,不受chunk管理,回收时也会munmap还给os

简而言之 : 小内存: [获取分配区(arena)并加锁] -> fast bin -> unsorted bin -> small bin -> large bin -> top chunk -> 扩展堆 大内存: 直接mmap

总结

释放时几乎是和分配反过来,再加上可一些chunk合并和从一个bin转移到另一个bin的操作 并且如果顶部有足够大的空闲chunk,则收缩堆顶并还给os

介于此,对于ptmalloc的内存分配使用有几个注意事项:

默认后分配内存先释放,因为内存回收是从top chunk开始 避免多线程频繁分配和释放内存,会造成频繁加解锁 不要分配长生命周期的内存块,容易造成内碎片,影响内存回收

tcmalloc(thread cache malloc)

ptmalloc性能问题,一般倾向使用三方malloc

  • 不同分配区(arena)的内存不能交替使用
  • 每个内存块分配都要浪费8字节内存等等

内存管理分为线程内存和中央堆两部分

小内存分配

内部维护几十个不同大小的分配器,和ptmalloc差异是,每个分配器的大小差是不同的,按8字节、16字节、32字节等间隔开

内存分配时找到最小符合条件的,比如833字节到1024字节的内存分配请求都会分配一个1024大小的内存块

如果这些分配器的剩余内存不够了,会向中央堆申请一些内存,打碎以后填入对应分配器中

同样,如果中央堆也没内存了,就向中央内存分配器申请内存

线程缓存内的几十个分配器分别维护了一个大小固定的自由空间链表,直接由这些链表分配内存的时候是不加锁的

但是中央堆是所有线程共享的,在由其分配内存的时候会加自旋锁(spin lock)

线程内存池每次从中央堆申请内存的时候,分配多少内存也直接影响分配性能

申请地太少会导致频繁访问中央堆,也就会频繁加锁,而申请地太多会导致内存浪费

tcmalloc里,这个每次申请的内存量是动态调整的,类似把tcp窗口反过来用的慢启动(slow start)算法调整max_length, 每次申请内存是申请max_length和每个分配器对应的num_objects_to_move中取小的值的个数的内存块

num_objects_to_move取值比较简单,是以64K为基准,并且最小不低于2,最大不高于32的值

也就是说,对于大于等于32K的分配器这个值为2(好像没有这样的分配器 class),对于小于2K的分配器,统一为32

对于max_length就比较复杂了,而且其更多是用于释放内存

max_length由1开始,在其小于num_objects_to_move的时候每次累加1,大于等于的时候累加num_objects_to_move

释放内存的时候,首先max_length会对齐到num_objects_to_move,然后在大于num_objects_to_move的释放次数超过一定阀值,则会按num_objects_to_move缩减大小

大内存分配

大于8个分页, 即32K,tcmalloc直接在中央堆里分配(以分页为单位)

同样按大小维护了256个空闲空间链表,前255个分别是1个分页、2个分页到255个分页的空闲空间,最后一个是更多分页的小的空间。这里的空间如果不够用,就会直接从系统申请了

分页管理 – span

tcmalloc 用span管理内存分页,一个span可以包含几个连续分页

span状态

  • 未分配(在空闲链表中)
  • 作为大对象分配
  • 作为小对象分配(span内记录了小对象的class size)

32位系统中,span分为两级由中央分配器管理。第一级有2^7个节点,第二级是2^13个 32位总共只能有2^20个分页(每个分页4KB = 2^12)

64为系统中,有三级

释放

算分页编号,再找对应span

小对象直接归入小对象分配器的空闲链表。等到空闲空间足够大以后划入中央堆

大对象把物理地址连续的前后的span也找出来,如果空闲则合并,并归入中央堆中

而线程缓存内的分配器,会根据max_length、num_objects_to_move和上一次垃圾收集到现在为止的最小链表长度,按一定的策略回收资源到中央堆中

同样可以在需要时减少某一个线程的max_length来转移内存,但是要等到那个线程下一次执行free,触发垃圾回收之后才会真正把内存返还中央堆

简而言之,就是:

小内存: 线程缓存队列 -> 中央堆 -> 中央页分配器(从系统分配)

大内存: 中央堆 -> 向系统请求

理论上性能提高的主要原因在线程缓存不加锁和少量操作的自旋锁上

不过按照它的实现方式,不适合多线程频繁分配大于8个分页(32KB)的内存。否则自旋锁争用会相当厉害,不过这种情况也比较少。而减少和中央堆交互又依赖于他的线程缓存长度自适应算法

还有就是它使用了外部的数据结构来管理span list,这样不会每次分配内存都要浪费header的长度。但是他的对齐操作又比ptmalloc多浪费了一些内存。(有点空间换时间的意思)

所以无论是ptmalloc还是tcmalloc都应该尽量减少大内存的分配和释放。尽量先分配、后释放

jemalloc

FreeBSD、NetBSD和Firefox的默认malloc

据作者说,高CPU核心数的情况下比tcmalloc性能还好

设计目标:

  • 快速分配和回收
  • 低内存碎片
  • 支持堆性能分析

内存分配分为三部分

  1. 类似tcmalloc,以8、16、64字节等分隔开的small class
  2. 以分页为单位,等差间隔开的large class
  3. huge class

内存块管理也通过一种chunk进行,一个chunk的大小是2^k (默认4 MB)

通过这种分配实现常数时间地分配small和large对象,对数时间地查询huge对象的meta(使用红黑树)

默认64位系统的划分方式如下:

Small: [8], [16, 32, 48, …, 128], [192, 256, 320, …, 512], [768, 1024, 1280, …, 3840] Large: [4 KiB, 8 KiB, 12 KiB, …, 4072 KiB] Huge: [4 MiB, 8 MiB, 12 MiB, …]

jemalloc也用了分配区(arena)来维护内存

线程按第一次分配small或者large内存请求的顺序Round-Robin地选择一个分配区 每个分配区都维护了一系列分页,来提供small和large的内存分配请求。并且从一个分配区分配出去的内存块,在释放的时候一定会回到该分配区

每个分配区内都会包含meta信息,记录了其对应的chunk列表,每个chunk的头部都记录了chunk的分配信息

使用某一个chunk的时候,会把它分割成很多个run,并记录到bin中

不同size的class对应着不同的bin,这点和前面两种分配器一样

bin里,都会有一个红黑树来维护空闲的run,并且在run里,使用了bitmap来记录了分配状态

和前面两种分配器不同的是,分配区会同时维护两组run的红黑树,一组是未被分配过内存(clean区域),另一组是回收的内存(dirty区域)。而且不是前面那两种分配器所使用的链表

同样,每次分配,都是选取最小且符合条件的内存块

但是优先从dirty区域查找

分配区设计和ptmalloc差不多。访问分配区的时候需要对其加锁,或者对某一个size的bin加粒度更小的锁。为减少锁征用,这里又参照tcmalloc引入了线程缓存。并且其线程缓存的垃圾回收机制和tcmalloc一样,也是基于分配请求的频率自动调整的 线程缓存的结构就像一个简化版的arena,加了一些垃圾回收的控制信息

简而言之,就是: 小内存(small class): 线程缓存bin -> 分配区bin(bin加锁) -> 问系统要 中型内存(large class):分配区bin(bin加锁) -> 问系统要 大内存(huge class): 直接mmap组织成N个chunk+全局huge红黑树维护(带缓存)

总结

jemalloc设计上比前两个复杂地多,内部用红黑树管理分页和内存块。并且对内存分配粒度分类地更细

一方面比ptmalloc的锁争用要少,另一方面很多索引和查找都能回归到指数级别,方便了很多复杂功能的实现

大内存分配上,内存碎片也会比tcmalloc少。但是也正是因为他的结构比较复杂,记录了很多meta,所以在分配很多小内存的时候记录meta数据的空间会略微多于tcmalloc

但是又不像ptmalloc那样每一个内存块都有一个header,而采用全局的bitmap记录状态,所以大量小内存的时候,会比ptmalloc消耗的额外内存小

大总结

这些内存管理机制都是针对小内存分配和管理。对大块内存还是直接用了系统调用

所以在程序中应该尽量避免大内存的malloc/new、free/delete操作

分配器的最小粒度都是以8字节为单位的,所以频繁分配小内存,int,bool什么的,仍然会浪费空间

无论是对bool、int、short进行new的时候,实际消耗的内存在ptmalloc和tcmalloc下64位系统地址间距都是32个字节

大量new测试的时候,ptmalloc平均每次new消耗32字节,tcmalloc消耗8字节

所以大量使用这些数据的时候不妨用数组自己维护一个内存池,可以减少很多的内存浪费。(STL的map和set一个节点要消耗近80个字节有这么多浪费在这里了)

多线程下对于比较大的数据结构,为了减少分配时的锁争用,最好是自己维护内存池

单线程的话无所谓了,不过自己维护内存池是增加代码复杂度,减少内存管理复杂度 255个分页以下(1MB)的内存话,tcmalloc的分配和管理机制已经相当nice,没太大必要自己另写一个

Windows下不同类型(int、short和bool)连续new的地址似乎是隔开的,可能是内部实现的粒度更小,不同size的class更多

10M次new的时候,debug模式下明显卡顿了一下,平均每次new的内存消耗是52字节(32位)和72字节(64位)[header更复杂?]

Release模式下很快,并且平均每次new的内存消耗是20字节(32位)和24字节(64位)

可以猜测VC的malloc的debug模式还含有挺大的debug信息。是不是可以得出他的header里,Release版本只有1个指针,Debug里有5个指针呢

springcloud
发布于
java spring springcloud

你对Spring Cloud多了解源自于你对Spring Boot有多了解,你对Spring Boot多了解源自于你对Spring Framework有多了解

版本对应关系

Release Train 发布时间 Spring Boot版本 SC Commons版本 维护信息 主要版本
2020.0.x 2020-12 2.4.x 3.0.0 按计划支持到2023-12
Hoxton 2019-07 2.2.x, 2.3.x (从SR5起) 2.2.x Finchley系列次要版本,常规维护到2021-6
2020-07特殊维护期(不加新功能,只改紧急bug)
2021-12只发布重大错误/安全补丁
Greenwich 2018-11 2.1.x 2.1.x 2020-01停止维护
2020-12-31终结特殊维护期
Finchley 2017-10 2.0.x 2.0.x 2018发布
Edgware 2017-08 1.5.x 1.3.x
Dalston 2017-05 1.5.x 1.2.x
Brixton 2016-09 1.3.x 1.1.x
Angel 2016-05 1.2.x 1.0.x

版本号规则变更适用于所有Spring技术栈,Spring Framework、Spring Boot、Spring Cloud、Spring Data…

spring-cloud-starter-loadbalancer随Spring Cloud Commons 2.2.0开始(Hoxton),它替代了Ribbon

Pivotal OSS support policy

主要版本 3年支持。主或次要版本发布后,严重bug和安全问题,再维护一段时间(6-12个月)

阻断式升级(不向下兼容)

Spring Cloud Netflix进入维护模式,2020完全移除

Netflix 2018宣布核心组件Hystrix、Ribbon、Zuul、Archaius等均进入维护状态

Zuul 2.x,Archaius 2.x均不向下兼容,无法平滑升级,几乎等于无法使用

Spring Cloud 2020.0.0 彻底删除掉Netflix除Eureka外的所有组件(Ribbon,Hystrix,Zuul…)

Netflix 推荐替代 说明
Hystrix Resilience4j Hystrix自己也推荐你使用它代替自己
Hystrix Dashboard / Turbine Micrometer + Monitoring System 监控这件事交给更专业的组件去做
Ribbon Spring Cloud Loadbalancer Spring亲自出手
Zuul 1 Spring Cloud Gateway Spring亲自出手
Archaius 1 Spring Boot外部化配置 + Spring Cloud配置 比Netflix实现的更好、更强大

Spring Cloud LoadBalancer首次引入在Spring Cloud Commons 2.2.0(Hoxton),默认依旧是Ribbon挑大梁

Hoxton:LoadBalancerClient 有两个实现,Spring Cloud 2020.0后,BlockingLoadBalancerClient就是唯一实现了

mvn spring-boot:run -Dspring-boot.run.arguments=--spring.cloud.bootstrap.enabled=true
clion nginx
发布于
nginx

远古时代研究学习、调试 nginx 相当痛苦

visual studio 体积大安装慢

vs build tools 默认装完也2G左右

jetbrains家clion玩起来更轻量些

相关准备

auto/configure

右键 git bash here

auto/configure \
--with-cc=gcc \
--with-cc-opt=-O0 \
--with-debug \
--prefix= \
--conf-path=conf/nginx.conf \
--pid-path=logs/nginx.pid \
--http-log-path=logs/access.log \
--error-log-path=logs/error.log \
--sbin-path=nginx.exe \
--http-client-body-temp-path=temp/client_body_temp \
--http-proxy-temp-path=temp/proxy_temp \
--http-fastcgi-temp-path=temp/fastcgi_temp  \
--http-scgi-temp-path=temp/scgi_temp \
--http-uwsgi-temp-path=temp/uwsgi_temp \
--without-http_rewrite_module \
--without-http_gzip_module \
--without-http_geo_module \
--with-http_realip_module \
--with-http_stub_status_module \
--with-select_module\
--with-http_dav_module

objs 里生成的文件备用

ngx_auto_config.h

ngx_auto_headers.h

ngx_modules.c

clion

new CMakeProject from sources 导入项目

直接选 src,把所有 .h 忽略掉

忽略几个有外部依赖的,暂时不用,如

  • core ngx_regex.c (pcre) ngx_thread_pool.c

  • event ngx_event_connectex.c ngx_event_openssl.c ngx_event_openssl_stapling.c

  • http v2\所有 modules\perl

  • event\modules ngx_devpoll_module.c ngx_epoll_module.c ngx_select_module.c(linux下的) ngx_eventport_module.c ngx_kqueue_module.c ngx_win32_poll_module.c

CMakeLists.txt

cmake_minimum_required(VERSION 3.14)
project(nginx C)

set(CMAKE_C_STANDARD 11)

include_directories(.)
include_directories(core)
include_directories(event)
include_directories(event/modules)
include_directories(http)
include_directories(http/modules)
include_directories(os/win32)

add_executable(nginx
        core/nginx.c
        core/ngx_array.c
        core/ngx_buf.c
        core/ngx_conf_file.c
        core/ngx_connection.c
        core/ngx_cpuinfo.c
        core/ngx_crc32.c
        core/ngx_crypt.c
        core/ngx_cycle.c
        core/ngx_file.c
        core/ngx_hash.c
        core/ngx_inet.c
        core/ngx_list.c
        core/ngx_log.c
        core/ngx_md5.c
        core/ngx_module.c
        core/ngx_murmurhash.c
        core/ngx_open_file_cache.c
        core/ngx_output_chain.c
        core/ngx_palloc.c
        core/ngx_parse.c
        core/ngx_parse_time.c
        core/ngx_proxy_protocol.c
        core/ngx_queue.c
        core/ngx_radix_tree.c
        core/ngx_rbtree.c
        core/ngx_resolver.c
        core/ngx_rwlock.c
        core/ngx_sha1.c
        core/ngx_shmtx.c
        core/ngx_slab.c
        core/ngx_spinlock.c
        core/ngx_string.c
        core/ngx_syslog.c
        core/ngx_times.c
        event/modules/ngx_iocp_module.c
        event/modules/ngx_win32_poll_module.c
        event/modules/ngx_win32_select_module.c
        event/ngx_event.c
        event/ngx_event_accept.c
        event/ngx_event_acceptex.c
        event/ngx_event_connect.c
        event/ngx_event_pipe.c
        event/ngx_event_posted.c
        event/ngx_event_timer.c
        event/ngx_event_udp.c
        http/modules/ngx_http_access_module.c
        http/modules/ngx_http_addition_filter_module.c
        http/modules/ngx_http_auth_basic_module.c
        http/modules/ngx_http_auth_request_module.c
        http/modules/ngx_http_autoindex_module.c
        http/modules/ngx_http_browser_module.c
        http/modules/ngx_http_charset_filter_module.c
        http/modules/ngx_http_chunked_filter_module.c
        http/modules/ngx_http_dav_module.c
        http/modules/ngx_http_empty_gif_module.c
        http/modules/ngx_http_fastcgi_module.c
        http/modules/ngx_http_flv_module.c
        http/modules/ngx_http_headers_filter_module.c
        http/modules/ngx_http_index_module.c
        http/modules/ngx_http_limit_conn_module.c
        http/modules/ngx_http_limit_req_module.c
        http/modules/ngx_http_log_module.c
        http/modules/ngx_http_map_module.c
        http/modules/ngx_http_memcached_module.c
        http/modules/ngx_http_mirror_module.c
        http/modules/ngx_http_mp4_module.c
        http/modules/ngx_http_not_modified_filter_module.c
        http/modules/ngx_http_proxy_module.c
        http/modules/ngx_http_random_index_module.c
        http/modules/ngx_http_range_filter_module.c
        http/modules/ngx_http_realip_module.c
        http/modules/ngx_http_referer_module.c
        http/modules/ngx_http_scgi_module.c
        http/modules/ngx_http_secure_link_module.c
        http/modules/ngx_http_slice_filter_module.c
        http/modules/ngx_http_split_clients_module.c
        http/modules/ngx_http_ssi_filter_module.c
        http/modules/ngx_http_ssi_filter_module.h
        http/modules/ngx_http_static_module.c
        http/modules/ngx_http_stub_status_module.c
        http/modules/ngx_http_sub_filter_module.c
        http/modules/ngx_http_try_files_module.c
        http/modules/ngx_http_upstream_hash_module.c
        http/modules/ngx_http_upstream_ip_hash_module.c
        http/modules/ngx_http_upstream_keepalive_module.c
        http/modules/ngx_http_upstream_least_conn_module.c
        http/modules/ngx_http_upstream_random_module.c
        http/modules/ngx_http_upstream_zone_module.c
        http/modules/ngx_http_userid_filter_module.c
        http/modules/ngx_http_uwsgi_module.c
        http/ngx_http.c
        http/ngx_http_copy_filter_module.c
        http/ngx_http_core_module.c
        http/ngx_http_file_cache.c
        http/ngx_http_header_filter_module.c
        http/ngx_http_parse.c
        http/ngx_http_postpone_filter_module.c
        http/ngx_http_request.c
        http/ngx_http_request_body.c
        http/ngx_http_script.c
        http/ngx_http_special_response.c
        http/ngx_http_upstream.c
        http/ngx_http_upstream_round_robin.c
        http/ngx_http_variables.c
        http/ngx_http_write_filter_module.c
        os/win32/ngx_alloc.c
        os/win32/ngx_dlopen.c
        os/win32/ngx_errno.c
        os/win32/ngx_event_log.c
        os/win32/ngx_files.c
        os/win32/ngx_os.h
        os/win32/ngx_process.c
        os/win32/ngx_process_cycle.c
        os/win32/ngx_shmem.c
        os/win32/ngx_socket.c
        os/win32/ngx_stat.c
        os/win32/ngx_thread.c
        os/win32/ngx_time.c
        os/win32/ngx_udp_wsarecv.c
        os/win32/ngx_user.c
        os/win32/ngx_win32_init.c
        os/win32/ngx_wsarecv.c
        os/win32/ngx_wsarecv_chain.c
        os/win32/ngx_wsasend.c
        os/win32/ngx_wsasend_chain.c
        ngx_auto_config.h
        ngx_auto_headers.h
        ngx_modules.c
        )
        #event/ngx_event_connectex.c
        #http/modules/ngx_http_degradation_module.c
        #http/modules/ngx_http_geo_module.c
        #http/modules/ngx_http_geoip_module.c
        #http/modules/ngx_http_grpc_module.c
        #http/modules/ngx_http_gunzip_filter_module.c
        #http/modules/ngx_http_gzip_filter_module.c
        #http/modules/ngx_http_gzip_static_module.c
        #http/modules/ngx_http_image_filter_module.c
        #http/modules/ngx_http_rewrite_module.c
        #http/modules/ngx_http_ssl_module.c
        #http/modules/ngx_http_xslt_filter_module.c
        # os/win32/ngx_service.c

target_link_libraries(nginx  ws2_32)

tdm gcc 编译

mingw32-make

nginx.conf

daemon off;
master_process off;
worker_processes  1;
软中断
发布于
kernel

软中断

点完外卖后,平台虽会显示配送进度,但也不会傻盯着,得去干别的事情,等外卖到了配送员会通过「电话」通知,电话响了,就停下手中地事情,去拿外卖

中断是一种异步的事件处理机制,可以提高系统的并发处理能力

操作系统收到中断请求,会打断其他进程的运行,中断处理程序,要尽可能快的执行完,减少对正常进程运行调度的影响

中断处理程序在响应中断时,可能还会「临时关闭中断」,意味着,如果当前中断处理程序没有执行完之前,系统中其他的中断请求都无法被响应,也就说中断有可能会丢失,所以中断处理程序要短且快

回到外卖的例子,又点起了外卖,这次共点了两份外卖,由不同地配送员来配送,那么问题来了,当第一份外卖送到时,配送员给我打了长长的电话,说了一些杂七杂八的事情,比如给个好评等等,但如果这时另一位配送员也想给我打电话,因为我在通话中(相当于关闭了中断响应),自然就无法打通我的电话,他可能尝试了几次后就走掉了(相当于丢失了一次中断)

上半部/下半部

Linux 为解决中断处理程序执行过长和中断丢失的问题,将中断过程分成两个阶段,「上半部和下半部分」

  • 上半部用来快速处理中断,一般会暂时关闭中断请求,主要负责处理跟硬件紧密相关或者时间敏感的事情
  • 下半部用来延迟处理上半部未完成的工作,一般以「内核线程」的方式运行

前面的外卖例子,当接到第一位配送员的电话,可告诉配送员说我现在下楼,剩下等见面再说(上半部),然后就可挂断电话,到楼下后,再拿外卖,及跟配送员说其他的事情(下半部)

这样,第一位配送员就不会占用太多时间,当第二位配送员正好过来时,会有很大几率拨通电话

网卡接收网络包的例子

网卡收到网络包后,通过硬件中断通知内核有新数据到,内核调用对应中断处理程序来响应该事件,事件处理也分成上半部和下半部

上部分要快速处理,只把网卡数据读到内存,然后更新硬件寄存器状态

接着,内核触发软中断,较耗时且复杂的事情,交给「软中断处理程序」,也就是中断的下半部,从内存找到网络数据,按照网络协议栈,对网络数据进行逐层解析和处理,最后把数据送给应用程序

所以,中断处理程序的上部分和下半部可以理解为:

  • 上半部直接处理硬件请求,也就是硬中断,主要是负责耗时短的工作,特点是快速执行
  • 下半部是由内核触发,也就说软中断,主要是负责上半部未完成的工作,通常都是耗时比较长的事情,特点是延迟执行

硬中断(上半部)会打断 CPU 正在执行的任务,然后立即执行中断处理程序,软中断(下半部)以内核线程的方式执行,且每 CPU 对应一个软中断内核线程

软中断不只包括硬件设备中断处理程序的下半部,一些内核自定义事件也属于软中断,如内核调度等、RCU 锁等

查看中断

  • 软中断
cat /proc/softirqs
                    CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7
          HI:          0          0          0          0          0          0          0          0
       TIMER:       3235        243       1748         86       7518        264       1329        140
      NET_TX:          0          0          0          0          0          0          0          0
      NET_RX:         20         32          0          3          8         42          4          9
       BLOCK:       2207          0        807          0          0          0          0          0
    IRQ_POLL:          0          0          0          0          0          0          0          0
     TASKLET:       2834          0        807          0          0          1          0          0
       SCHED:       3779        252       1750         89       7522        273       1322        143
     HRTIMER:          0          0          0          0          0          0          0          0
         RCU:       2910        236        643         75       7372        259       1197        118
  • 硬中断
cat /proc/interrupts
           CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7
  8:          0          0          0          0          0          0          0          0   IO-APIC   8-edge      rtc0
  9:          0          0          0          0          0          0          0          0   IO-APIC   9-fasteoi   acpi
NMI:          0          0          0          0          0          0          0          0   Non-maskable interrupts
LOC:         16         16          2         16         16         16         16         16   Local timer interrupts
SPU:          0          0          0          0          0          0          0          0   Spurious interrupts
PMI:          0          0          0          0          0          0          0          0   Performance monitoring interrupts
IWI:          0          0          0          0          0          0          0          0   IRQ work interrupts
RTR:          0          0          0          0          0          0          0          0   APIC ICR read retries
RES:       2250        709       2216        376        464        282        358        290   Rescheduling interrupts
CAL:        384        397        378        380        267        164        381        375   Function call interrupts
TLB:          0          0          0          0          0          0          0          0   TLB shootdowns
HYP:       2871         32        807          3          8         42          4          9   Hypervisor callback interrupts
HRE:          0          0          0          0          0          0          0          0   Hyper-V reenlightenment interrupts
HVS:       3295        297       1780        158       8912        312       1354        174   Hyper-V stimer0 interrupts
ERR:          0
MIS:          0
PIN:          0          0          0          0          0          0          0          0   Posted-interrupt notification event
NPI:          0          0          0          0          0          0          0          0   Nested posted-interrupt event
PIW:          0          0          0          0          0          0          0          0   Posted-interrupt wakeup event

这些数值是累计中断次数,大小没什么参考意义,中断次数的变化速率才是我们要关注的,可用 watch -d cat /proc/softirqs 查看中断次数的变化速率

软中断以内核线程方式执行,可用 ps -ef|grep softirq 看到内核线程,一般中括号内线线程都可以认为是内核线程

一般网络 I/O 比较高的 Web 服务器,NET_RX 中断变化速率相比其他中断类型快很多

sar -n DEV 查看网卡包接收速率情况,然后分析是哪个网卡有大量的网络包进来

再通过 tcpdump 抓包等,分析来源,非法的地址可考虑加防火墙,正常流量,要考虑硬件升级等

java test-2 JUnit 4&5 部分基础
发布于
java
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-engine</artifactId>
</dependency>
<dependency>
    <groupId>org.junit.platform</groupId>
    <artifactId>junit-platform-runner</artifactId>
    <scope>test</scope>
</dependency>

JUnit4和JUnit5在测试编码风格上没有太大变化

import org.junit.jupiter.api.*;

public class AppTest {

   @BeforeAll
   static void setup() {

      System.out.println("@BeforeAll executed");
   }

   @BeforeEach
   void setupThis() {

      System.out.println("@BeforeEach executed");
   }

   @Tag("DEV")
   @Test
   void testOne() {

   }

   @Tag("PROD")
   @Disabled
   @Test
   void testTwo() {

   }

   @AfterEach
   void tearThis() {

      System.out.println("@AfterEach executed");
   }

   @AfterAll
   static void tear() {

      System.out.println("@AfterAll executed");
   }
}

Assertion

org.junit.jupiter.Assertions

测试期望结果

assertEquals(expected, actual) 最常用

  • assertTrue()
  • assertFalse()
  • assertNotNull()
  • assertArrayEquals()

三方断言类库

JUnit Jupiter足以满足许多测试场景需要,更强大和附加功能,如匹配器,AssertJ,Hamcrest,Truth 等第三方断言库自由选择

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import org.junit.jupiter.api.Test;

@Test
void assertWithHamcrestMatcher() {
    assertThat(2 + 1, is(equalTo(3)));
}

JUnit 4 编程模型遗留继续用 org.junit.Assert.assertThat

assertThrows

@Test
void exceptionTesting() {
   Throwable exception = assertThrows(IllegalArgumentException.class, () -> {
      throw new IllegalArgumentException("a message");
   });
   assertEquals("a message", exception.getMessage());
}

Fixture(生命周期相关)

用作 编写测试前准备、测试后清理的固定代码

@BeforeEach/@AfterEach和@BeforeAll/@AfterAll

  1. 实例变量,在 @BeforeEach 中初始化,在 @AfterEach中清理,它们在各个@Test方法中互不影响,因为是不同的实例
  2. 静态变量,在@BeforeAll中初始化,在@AfterAll中清理,它们在各个@Test方法中均是唯一实例,会影响各个@Test方法

大多用 @BeforeEach和 @AfterEach足够,只有某些测试资源初始化耗费时间太长,以至于不得不尽量“复用”时才用 @BeforeAll和 @AfterAll

condition

@Disabled

@EnabledOnOs(OS.WINDOWS)

@EnabledOnOs({ OS.LINUX, OS.MAC })

@DisabledOnOs(OS.WINDOWS)

@DisabledOnJre(JRE.JAVA_8)

@EnabledIfSystemProperty(named = "os.arch", matches = ".*64.*")

@EnabledIfEnvironmentVariable(named = "DEBUG", matches = "true")

测试方法、测试类、测试集、测试运行器(Runners)

  • 测试方法就是用@Test注解的一些函数
  • 测试类是包含一个或多个测试方法的一个 **Test.java文件
  • 测试集是一个suite,可能包含多个测试类
  • 测试运行器则决定了用什么方式偏好去运行这些测试集/类/方法

常见 Runners

  • @RunWith(Parameterized.class) 配合@Parameters使用JUnit的参数化功能

  • @RunWith(Suite.class)

    @SuiteClasses({ATest.class,BTest.class,CTest.class})

  • @RunWith(JUnit4.class) junit4的默认运行器

  • @RunWith(JUnit38ClassRunner.class),兼容junit3.8

  • @RunWith(SpringJUnit4ClassRunner.class)集成了spring的一些功能

JUnit 3 默认 JUnit4ClassRunner

JUnit4默认 BlockJUnit4ClassRunner

JUnit4 extends BlockJUnit4ClassRunner

@RunWith(JUnit4.class) 即调用默认JUnit 运行器

参数化测试

@ParameterizedTest

@ValueSource

@MethodSource

@CsvSource

@ParameterizedTest
@ValueSource(ints = { 0, 1, 5, 100 })
void testAbs(int x) {
    assertEquals(x, Math.abs(x));
}

public class StringUtils {
    public static String capitalize(String s) {
        if (s.length() == 0) {
            return s;
        }
        return Character.toUpperCase(s.charAt(0)) + s.substring(1).toLowerCase();
    }
}

参数化测试,不但要给出输入,还要给出预期输出。测试方法至少需要接收两个参数:

@ParameterizedTest
void testCapitalize(String input, String result) {
    assertEquals(result, StringUtils.capitalize(input));
}

@MethodSource,写个同名静态方法提供测试参数:

@ParameterizedTest
@MethodSource
void testCapitalize(String input, String result) {
    assertEquals(result, StringUtils.capitalize(input));
}

static List<Arguments> testCapitalize() {
    return asList(
            Arguments.arguments("abc", "Abc"), 
            Arguments.arguments("APPLE", "Apple"), 
            Arguments.arguments("gooD", "Good"));
}
@CsvSource({ "abc, Abc", "APPLE, Apple", "gooD, Good" })
@CsvFileSource(resources = { "/test-capitalize.csv" })

Categories

public interface FastTests { /* category marker */ }
public interface SlowTests { /* category marker */ }

public class A {
  @Test
  public void a() {
    fail();
  }

  @Category(SlowTests.class)
  @Test
  public void b() {
  }
}

@Category({SlowTests.class, FastTests.class})
public class B {
  @Test
  public void c() {

  }
}

@RunWith(Categories.class)
@IncludeCategory(SlowTests.class)
@SuiteClasses( { A.class, B.class })
public class SlowTestSuite {
  // Will run A.b and B.c, but not A.a
}

@RunWith(Categories.class)
@IncludeCategory(SlowTests.class)
@ExcludeCategory(FastTests.class)
@SuiteClasses( { A.class, B.class }) 
public class SlowTestSuite {
  // Will run A.b, but not A.a or B.c
}

测试套件

@RunWith(JUnitPlatform.class)
@SelectPackages("junit5.examples")
public class JUnit5TestSuiteExample {
}

过滤测试包、类甚至测试方法

  1. @IncludePackages@ExcludePackages来过滤包
  2. @IncludeClassNamePatterns@ExcludeClassNamePatterns过滤测试类
  3. @IncludeTags@ExcludeTags过滤测试方法
@RunWith(JUnitPlatform.class)
@SelectPackages("junit5.examples")
@IncludePackages("junit5.examples.packageA")
@ExcludeTags("PROD")
public class JUnit5TestSuiteExample {
}

maven FailSafe & Surefire

  • Surefire插件用来执行单元测试

  • FailSafe插件用来执行集成测试

maven的生命周期与集成测试相关的四个阶段

1.pre-integration-test:准备集成测试环境,类似于junit单元测试中的setUp

2.integration-test:执行集成测试

3.post-integration-test:销毁集成测试的环境,类似于junit单元测试中的tearDown

4.校验:分析集成测试的结果

FailSafe工作在integration-test以及verify阶段,与surefire插件不同的是不会因为集成测试中失败而终止整个过程,即post-integration-test可以确定执行

FailSafe插件有两个goal:integration-test 和verify

1、3 如通过容器插件,jetty/tomcat plugin 或cargo plugin实现

2、4由failsafe实现

cargo可加载命令行参数,通过jacoco统计测试覆盖率,tomcat plugin就无法做到

 <plugin>
     <groupId>org.apache.maven.plugins</groupId>
     <artifactId>maven-failsafe-plugin</artifactId>
     <executions>
         <execution>
             <id>integration-test</id>
             <goals>
                 <goal>integration-test</goal>
             </goals>
         </execution>
         <execution>
             <id>verify</id>
             <goals>
                 <goal>verify</goal>
             </goals>
         </execution>
     </executions>
</plugin>

IT用例如果和UT用例放在同项目,必须在在UT的surefire中exclude所有的IT用例

spring boot service注入

junit5

@ExtendWith(SpringExtension.class) 

junit4

@RunWith(SpringRunner.class)

不加,service无法注入,值为null

ref

https://www.cnblogs.com/felixzh/p/12554701.html

https://zhuanlan.zhihu.com/p/162032557

https://www.baeldung.com/junit-5-migration

https://blog.csdn.net/HeatDeath/article/details/79841526

https://www.cnblogs.com/lspz/p/6727123.html

http://www.jfh.com/jfperiodical/article/1455?

https://doczhcn.gitbook.io/junit5/index/index

java test-1 JUnit 简介
发布于
java
public class Max {

   public int max(int a, int b) {

      if (a > b) {
         return a;
      } else {
         return b;
      }
   }
}
public class Test {

   public static void main(String[] args) {

      Max max = new Max();
      if (2 == max.max(1, 2)) {
         System.out.println("pass");
      } else {
         System.out.println("fail");
      }
   }
}

main()方法测试缺陷

不能把测试代码分离

添加新测试方法,要在main() 添加方法调用

没有打印出测试结果和期望结果,例如,expected: 2, but actual: 1

需要对打印或者输出结果进行人为的判断

……

JUnit

  • 给出成功的测试和失败的测试

  • 测试报告:成功率/代码覆盖率

JUnit 3

public class XXTest extends TestCase{

   public void setUp() throws Exception {
   }

   public void tearDown() throws Exception {
   }

   public void testAdd() {
   
   }
}

必须继承自TestCase 测试方法必须test开头

对错误的测试,只能通过fail产生一个错误,try里assertTrue(true)来测试

JUnit 4

用 Java5 Annotation 简化测试用例的编写,更灵活与方便

JUnit5

子项目及不同模块组成

JUnit 5 = JUnit Platform + JUnit Jupiter + JUnit Vintage

1

JUnit3或4的兼容性

JUnit Vintage提供TestEngine在JUnit 5平台上运行基于JUnit 3和JUnit 4的测试的实现

生命周期

2

@Annotations

JUnit5 JUnit4
@Test @Test
@TestFactory N/A
@DisplayName N/A
@BeforeEach @Before
@AfterEach @After
@BeforeAll @BeforeClass
@AfterAll @AfterClass
@Nested N/A
@Tag N/A
@Diable @Ignore
@ExtendWith N/A

@Assertions

JUnit5 JUnit4
fail fail
assertTrue assertTrue
assertThat N/A
assertSame assertSame
assertNull assertNull
assertNotSame assertNotSame
assertNotEquals assertNotEquals
assertNotNull assertNotNull
assertFalse assertFalse
assertEquals assertEquals
assertArrayEquals assertArrayEquals
assertAll
assertThrows
go grpc
发布于
go

message.proto

syntax = "proto3";

package pb;

// The greeter service definition.
service Greeter {
// Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
	string name = 1;
}

// The response message containing the greetings
message HelloReply {
	string message = 1;
}

protoc编译工具

https://github.com/google/protobuf 会跳 https://github.com/protocolbuffers/protobuf

  --cpp_out=OUT_DIR           Generate C++ header and source.
  --csharp_out=OUT_DIR        Generate C# source file.
  --java_out=OUT_DIR          Generate Java source file.
  --javanano_out=OUT_DIR      Generate Java Nano source file.
  --js_out=OUT_DIR            Generate JavaScript source.
  --objc_out=OUT_DIR          Generate Objective C header and source.
  --php_out=OUT_DIR           Generate PHP source file.
  --python_out=OUT_DIR        Generate Python source file.
  --ruby_out=OUT_DIR          Generate Ruby source file.

protoc能生成那么多语言的代码,但居然本身没有产生go的代码,需要调用如protoc-gen-go这样的插件!!!


set GOPROXY=https://goproxy.io
mkdir demo
cd demo
go mod init demo

rem protoc的 --go_out 插件
go get -u github.com/golang/protobuf/protoc-gen-go 
go get -u google.golang.org/grpc

github.com/golang/protobuf/protoc-gen-go 会编译出 protoc-gen-go

要是先 google.golang.org/grpc,在其go mod也会把前者(github.com/golang/protobuf)一并给下载了,protoc-gen-go还是要自行编译下才出来

go mod 也不用 go get google.golang.org/grpc,在go build里也会自行下载

可选

//gogo
go get -u github.com/gogo/protobuf/protoc-gen-gogo
//gofast
go get -u github.com/gogo/protobuf/protoc-gen-gofast

即protoc-gen系列 3 个

  • protoc-gen-go
  • protoc-gen-gogo
  • protoc-gen-gofast

go.mod

module demo

go 1.14

require (
	github.com/golang/protobuf v1.3.5
	google.golang.org/grpc v1.28.1
)

gRPC-go

grpc原是google内部项目,归属golang,放在google.golang.org下,后来对外开放迁移到github

https://github.com/grpc/grpc-go

梳理


protoc --go_out=. message.proto
protoc --go_out=plugins=grpc:. message.proto

第二个产生的则除了第一个的PB序列化和反序列化代码外,还增加服务器和客户端通讯、实现的公共库代码

github.com/gogo/protobuf

  • protoc-gen-gogo
  • protoc-gen-gofast

完全兼容google protobuf,主要extend了一些option

可以修饰field/enum/message/package(即对整个文件都有效)

protoc --gofast_out=plugins=grpc:. message.proto

code

server

package main

import (
	"net"
	"log"
	"google.golang.org/grpc"
	"demo/pb"
	"context"
)

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	s.Serve(lis)
}

client

package main

import (
	"google.golang.org/grpc"
	"log"
	"demo/pb"
	"context"
)

func main() {
	conn, err := grpc.Dial("localhost:50051",grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)
	r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "dd"})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.Message)
}

gRPC HTTP协议转换

coreos的博客,转载了grpc官方博客gRPC with REST and Open APIs

etcd3改用grpc后为了兼容原来的api,同时要提供http/json方式的API

为满足这需求,要么开发两套API,要么实现一种转换机制,他们选择了后者

协议转换的网关grpc-gateway,接收客户端请求,然后决定直接转发给grpc服务还是转给http服务,当然,http服务也需要请求grpc服务获取响应,然后转为json响应给客户端

grpc
发布于
go

RPC :数据编码内存对象可传输的字节流 转化),请求映射

数据编码

XML 日薄西山,JSON 风头正盛,Protobuf 方兴未艾

why Protobuf ?谷歌出品,某些场景下效率比 JSON 高

所有的优化都是有代价的。思考选择什么和放弃什么

JSON 缺点 & Protobuf 选择

{ "int":12345, "str": "hello", "bool": true }
{ "int":67890, "str": "hello", "bool": false }
  • 非字符串的编码低效

    int value 12345,内存表示只占两个字节,转成 JSON 却要五个字节。 bool 字段则占了四或五个字节。

  • 信息冗余

    同一个接口同一个对像,只是 int 字段的值不同,每次都还要传输"int"这个字段名

JSON 在可读性和编码效率之间选择了可读性,所以效率方面做了一定的牺牲

Protobuf 一方面用 VarInts 对数字进行编码,解决效率问题;另一方面给每个字段指定一个整数编号,只传字段编号,解决冗余问题

接收方如何知道各个编号对应哪个字段?只能事先约定。Protobuf 用 .proto 文件记录字段和编号的对应关系

message Demo {
  int32 i = 1;
  string s = 2;
  bool b = 3;
}

Protobuf 提供了一系列工具,为 proto 描述的 message 生成各种语言的代码

传输效率上去了,工具链也更加复杂了。gRPC 抓过时一定会怀念 JSON 的

请求映射

.proto 作为 IDL,Protobuf 可做很多 JSON 不方便做的事情。其中最重的就是 RPC 描述!

package demo.hello;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

gRPC 就是用了 Protobuf 的 service 来描述 RPC 接口

proto 文件定义了 Greeter 服务, SayHello 方法,接受 HelloRequest 消息并返回 HelloReply 消息。如何实现这个 Greeter 则是语言无关的,所以叫 IDL

/{包名}.{服务名}/{接口名}

Greeter 服务对应的路径 /demo.hello.Greeter/SayHello

gRPC 协议规定Content-Typeheader 取值 application/grpc,也可application/grpc+proto

想用 JSON 编码,也可application/grpc+json

gRPC 通信(非流式调用,unary)内容

请求内容

POST /demo.hello.Greeter/SayHello HTTP/1.1
Host: grpc.demo.com
Content-Type: application/grpc
Content-Length: 1234

<Length-Prefixed Message>

响应内容

HTTP/1.1 200 OK
Content-Length: 5678
Content-Type: application/grpc

<Length-Prefixed Message>

stream rpc

gRPC 持三种流式接口,参数前加上 stream 关键字

  • 请求流(推送或者短信)
  • 响应流(订阅消息通知)
  • 双向流(实时语音转字幕)
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  rpc SayHello (stream HelloRequest) returns (HelloReply) {}
  rpc SayHello (HelloRequest) returns (stream HelloReply) {}
  rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
}

引入Length-Prefixed Message,同一个 gRPC 请求的不同消息共用 HTTP 头信息,给每个消息单独加五字节前缀来表示压缩和长度信息

HTTP/1.1 也支持复用 TCP 连接,但所有请求必须排队(请求、等待、响应)顺序进行。先到先服务。而在实际的业务场景中肯定会有一些请求响应时间很长,客户端在收到响应之前会一直霸占着TCP连接。在这段时间里别的请求要么等待,要么发起新的 TCP 连接。一言以蔽之,HTTP/1.1 不能充分地复用 TCP 连接

HTTP/2 引入 stream概念 ,解决 TCP 连接复用的问题(同样有取舍问题)。可简单理解为逻辑上的 TCP 连接,在一条 TCP 连接上并行收发 HTTP 消息,而无需像 HTTP/1.1 那样等待