什么是 listpack

  • 定义: Redis 的紧凑序列化容器,存储“字符串或整数”的有序序列,强调内存占用低与顺序遍历高效

  • 设计目标: 取代旧的 ziplist,避免溢出缺陷、改进编码布局;支持双向遍历、在末尾有哨兵字节

  • 典型用途:

    • quicklist 的节点内存格式(list 类型的底层节点容器)

    • streams 的分片条目容器

    • 小型聚合结构(如部分 hash/zset 的小规模编码)

内存布局(头+若干 entry+EOF)

  • Header(6字节): 4字节总长度、2字节元素个数(可能为未知)

  • Entries: 变长编码;每个 entry 存“前向编码+数据+回溯长度(backlen)”

  • EOF: 固定 0xFF 作为收尾

#define LP_HDR_SIZE 6       /* 32 bit total len + 16 bit number of elements. */
#define LP_EOF 0xFF
  • 头字段访问宏与设置宏:
#define lpGetTotalBytes(p)           (((uint32_t)(p)[0]<<0) | ...
#define lpGetNumElements(p)          (((uint32_t)(p)[4]<<0) | ...
#define lpSetTotalBytes(p,v) do { ... } while(0)
#define lpSetNumElements(p,v) do { ... } while(0)

listpack 内部布局

graph LR
  H["Header (6B): total_bytes + num_elements"] --> E1["Entry #1 [enc | payload | backlen]"]
  E1 --> E2["Entry #2 [enc | payload | backlen]"]
  E2 --> E3["Entry #3 [enc | payload | backlen]"]
  E3 --> EOF["EOF 0xFF"]
  E2 -. "backlen" .-> E1
  E3 -. "backlen" .-> E2
  • Header(6B): total_bytes(4B) + num_elements(2B)

  • 若干 Entry: 每个为 [enc | payload | backlen]

  • enc: 类型与长度编码(整数/字符串多档位)

  • payload: 实际数据或整数编码内容

  • backlen: 可变长(最多5B),用于反向遍历

  • EOF: 固定 0xFF

quicklist 节点如何承载 listpack

graph LR
  QH["quicklist head"] --> N1["quicklistNode #1
(listpack)"] N1 <--> N2["quicklistNode #2
(listpack)"] N2 <--> N3["quicklistNode #3
(listpack)"] N3 --> QT["quicklist tail"]
  • quicklist 是双向链表

  • 每个 quicklistNode 的数据区是一个 listpack

  • list 的元素分布在多个 listpack 节点中;可按 fill/bytes 策略合并/拆分节点

编码形式(紧凑多档位)

#define LP_ENCODING_INT 0
#define LP_ENCODING_STRING 1
#define LP_ENCODING_7BIT_UINT 0
#define LP_ENCODING_13BIT_INT 0xC0
#define LP_ENCODING_16BIT_INT 0xF1
#define LP_ENCODING_24BIT_INT 0xF2
#define LP_ENCODING_32BIT_INT 0xF3
#define LP_ENCODING_64BIT_INT 0xF4
#define LP_ENCODING_6BIT_STR 0x80
#define LP_ENCODING_12BIT_STR 0xE0
#define LP_ENCODING_32BIT_STR 0xF0
  • 整数编码:7bit 无符号、13/16/24/32/64 bit 多档,按值选择最短编码

    编码类型 编码字节 大小 范围
    7-bit unsigned 0xxxxxxx 1字节 0-127
    13-bit signed 110xxxxx xxxxxxxx 2字节 -4096 到 4095
    16-bit signed 11110001 xxxxxxxx xxxxxxxx 3字节 -32768 到 32767
    24-bit signed 11110010 xxxxxxxx xxxxxxxx xxxxxxxx 4字节 -8388608 到 8388607
    32-bit signed 11110011 [4 bytes] 5字节 -2^31 到 2^31-1
    64-bit signed 11110100 [8 bytes] 9字节 -2^63 到 2^63-1
  • 字符串长度编码:6/12/32 bit 三档

    编码类型 编码字节 长度范围
    6-bit string 10xxxxxx 0-63 字节
    12-bit string 1110xxxx xxxxxxxx 0-4095 字节
    32-bit string 11110000 [4 bytes length] 0-2^32-1 字节
  • 每个 entry 末尾都有一个 backlen 字段,用于从后向前遍历:

    • 使用变长编码,1-5字节

    • 存储该 entry 的总长度(包括编码、数据和 backlen 本身)

    • 支持从 listpack 尾部向前遍历

核心操作

创建和释放

unsigned char *lpNew(size_t capacity);      // 创建新的 listpack
void lpFree(unsigned char *lp);             // 释放 listpack

插入

unsigned char *lpAppend(unsigned char *lp, unsigned char *s, uint32_t slen);
unsigned char *lpPrepend(unsigned char *lp, unsigned char *s, uint32_t slen);
unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen,
                              unsigned char *p, int where, unsigned char **newp);

查找和遍历

unsigned char *lpFirst(unsigned char *lp);   // 第一个元素
unsigned char *lpLast(unsigned char *lp);    // 最后一个元素
unsigned char *lpNext(unsigned char *lp, unsigned char *p);  // 下一个元素
unsigned char *lpPrev(unsigned char *lp, unsigned char *p);  // 前一个元素
unsigned char *lpSeek(unsigned char *lp, long index);        // 按索引查找

删除

unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);
unsigned char *lpDeleteRange(unsigned char *lp, long index, unsigned long num);
typedef struct {
    unsigned char *sval;
    uint32_t slen;
    long long lval;
} listpackEntry;
unsigned char *lpNew(size_t capacity);
void lpFree(unsigned char *lp);
unsigned char* lpShrinkToFit(unsigned char *lp);
unsigned char *lpInsertString(...);
unsigned char *lpInsertInteger(...);
unsigned char *lpPrepend(...);
unsigned char *lpAppend(...);
unsigned char *lpReplace(...);
unsigned char *lpDelete(...);
unsigned char *lpBatchAppend(...);
unsigned char *lpBatchInsert(...);
unsigned char *lpBatchDelete(...);
unsigned long lpLength(unsigned char *lp);
unsigned char *lpGet(...);
unsigned char *lpGetValue(...);
int lpGetIntegerValue(...);
unsigned char *lpFirst(unsigned char *lp);
unsigned char *lpLast(unsigned char *lp);
unsigned char *lpNext(unsigned char *lp, unsigned char *p);
unsigned char *lpPrev(unsigned char *lp, unsigned char *p);
size_t lpBytes(unsigned char *lp);
unsigned char *lpSeek(unsigned char *lp, long index);

配置

// Hash 类型的 listpack 相关配置
server.hash_max_listpack_entries  // 最大元素数量
server.hash_max_listpack_value    // 最大值长度

源码

创建 Listpack (lpNew)

unsigned char *lpNew(size_t capacity) {
    // 分配内存:至少 LP_HDR_SIZE+1 字节,或者指定的 capacity
    unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    
    // 设置总字节数:头部(6字节) + EOF标记(1字节) = 7字节
    lpSetTotalBytes(lp, LP_HDR_SIZE+1);
    
    // 设置元素个数为0
    lpSetNumElements(lp, 0);
    
    // 在末尾设置EOF标记(0xFF)
    lp[LP_HDR_SIZE] = LP_EOF;
    
    return lp;
}

编码格式

13-bit signed 编码格式

利用位运算和偏移量,实现在有符号整数需求下使用无符号存储的高效方案

13位无符号整数范围: 0 到 2^13-1 = 0 到 8191,共可以表示:8192 个不同的值

13位有符号整数范围:-4096 到 4095, 共 4096 + 4095 + 1 = 8192 个不同的值

利用偏移量 2^13= 8192 可以将负数映射到 0-8191 范围内

编码映射

-4096 + 8192 = 4096

-1 + 8192 = 8191

0 + 8192 = 8192

4095 + 8192 = 12287

13-bit signed

  • 1个字节存储编码类型 + 高5位数据
  • 1个字节存储低8位数据
  • 总共2个字节存储13位数据

位操作

  • (v»8)|LP_ENCODING_13BIT_INT:

    • 提取13位数据的高5位

    • 与编码类型标识(110)合并

    • 形成第一个字节

  • v&0xff:

    • 提取13位数据的低8位
    • 形成第二个字节

设计原理

  • 将13位数据分割为高5位和低8位

  • 高5位与编码类型标识合并存储

  • 低8位单独存储

  • 总共用2个字节存储13位数据,节省空间

以 -100 为例

v = -100

v = 1«13+(-100)=8192 + (-100) = 8092

// 第一个字节:高5位数据 + 编码类型标识

intenc[0] = (v»8) | LP_ENCODING_13BIT_INT;

intenc[0] = (8092»8) | 0xC0;

intenc[0] = 31 | 0xC0;

intenc[0] = 0xDF;

// 第二个字节:低8位数据

intenc[1] = v & 0xFF;

intenc[1] = 8092 & 0xFF;

intenc[1] = 0x9C;

解码代码

else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
    uval = ((p[0]&0x1f)<<8) | p[1];
    // ...
}

解码过程

// 对于编码 0xDF 0x9C:

p[0] = 0xDF, p[1] = 0x9C

// 提取数据高5位

p[0] & 0x1F = 0xDF & 0x1F = 0x1F = 31

// 左移8位,恢复位置

(31 « 8) = 7936

// 与低8位合并

7936 | 0x9C = 7936 | 156 = 8092

// 最终得到原始的无符号值:8092

if (uval >= negstart) {
        uval = negmax-uval;
        val = uval;
        val = -val-1;
} else {
        val = uval;
}

negstart = (uint64_t)1«12; (4096) - 负数判断阈值

negmax = 8191; - 最大值,用于负数转换

uval = negmax-uval; - 使用 8191 进行反向转换