介绍 redis 中多种基础的数据结构, 包括 sds, adlist, dict, skipList, intset, ziplist, redisObject.
sds
Redis 使用 sds 来处理字符串,用来解决 c 语言中传统字符串的一些限制问题,比如 二进制安全,避免内存频繁重分配,防止字符数组访问越界,实现O(1) 复杂度获取字符串长度,减少频繁内存分配产生的内存碎片。
Redis 使用 sds 来存储 key,用来作为 AOF 的缓冲区和与客户端通信中的缓冲区,用来实现日志系统的消息,构建协议(RESP)。
数据结构实现
1 2 3 4 5 6 7 8 9 10 11 12 13 // simple dynamic string header struct sdshdr { // buf 中已占用空间的长度 // 通过 len 来确定有效数据范围,保证 '\0' 也可以作为数据部分,也就是二进制安全 int len; // buf 中剩余可用空间的长度 int free; // buf.size = len + free + 1 char buf[]; };
避免内存频繁重分配的方法
当需要修改字符串长度的操作时,传统的c函数在n次操作下基本会有n次内存重分配操作,sds 通过使用 free 字段和空间预分配、未使用空间惰性删除来避免频繁的内存重分配,优化为至多n次内存重分配。
空间与分配
策略伪代码:
1 2 3 4 5 6 7 8 addlen <= nowfree return; oldSds = getSdsPrt(s); len = oldSds->len; newlen = addlen + len; if(newlen < PRE) newlen *= 2; else newlen += PRE newSds = zrealloc(oldSds, newlen + 1 + size(sds)) newSds->free = newlen - len;
策略:当需要扩展 buf 部分时,根据当前 len
与需求大小addlen
的和来决定除了要分配满足 addlen
大小的空间还将要多预分配多少空间大小以便在下次还需要扩展空间时可以适当地命中该次多预分配的空间,从而减少内存重分配次数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 /* * T = O(N) * 1. 如果 oldlen+ addlen < 1MB 则 newMaxLen = 2 (oldlen + addlen); newFree = newmaxlen - oldlen; 实际上的预扩充操作是 * 在caller 调用后使得: sds 在满足 addlen 后,使得 newfree 的大小与 caller 调用后使用掉 addlen这部分空间后 sds 的 len 的大小一样 * 2. oldlen+ addlen >= 1MB, 则 newMaxLen = oldlen + addlen + 1MB, caller 使用 addlen 这部分后的 free 固定= 1MB * 总结: * 1. 在当前 sds buf 大小较小时,每次扩充操作使得 下一次在需要扩充的 addlen 阈值(也就是 newfree)为 = oldlen + addlen. * 可以发现 newfree 是根据 sds 的上一次字符使用量 与 该次所需要的addlen 动态生成的, * 如果对 sds 扩充操作需求是较为 连续平均的,将会大大降低整个过程中 引发的内存扩充重分配次数 * 2. 在 sds buf 本身已经较大时, 直接使得 newfree = 1MB * 这种预留分配策略使得以前的 n次 concat 操作的 n 次扩容优化为 最多 n 次 */ sds sdsMakeRoomFor(sds s, size_t addlen) { struct sdshdr *sh, *newsh; // 获取 s 目前 free size_t free = sdsavail(s); size_t len, newlen; // s 目前的空余空间已经足够,无须再进行扩展,直接返回 if (free >= addlen) return s; // 获取 s 目前已占用空间的长度 len = sdslen(s); // s 其实是一个字符指针,指向 buf 的起始地址,减去 sdshdr 的大小其实就是指向该结构体起始地址的指针 // 用这种操作来获取给定 buf 所属的结构体指针,很巧妙 sh = (void*) (s-(sizeof(struct sdshdr))); // newlen 为分配后 len 的最小值 newlen = (len+addlen); // 根据新长度,为 s 分配新空间所需的大小 if (newlen < SDS_MAX_PREALLOC) // 如果新长度小于 SDS_MAX_PREALLOC // 那么为它分配两倍于所需长度的空间 newlen *= 2; else // 否则,分配长度为目前长度加上 SDS_MAX_PREALLOC newlen += SDS_MAX_PREALLOC; // T = O(N) newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1); // 内存不足,分配失败,返回 if (newsh == NULL) return NULL; // 更新 sds 的空余长度 newsh->free = newlen - len; // 返回 sds return newsh->buf; }
空间惰性释放
当需要裁剪 sds 内容时,redis 不会立马释放掉当前不需要的空间,而是通过 len
与 free
字段来实现”逻辑“删除。
trim
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 sds sdstrim(sds s, const char *cset) { struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); char *start, *end, *sp, *ep; size_t len; sp = start = s; ep = end = s+sdslen(s)-1; // 双指针寻找左右界限,都是一直移动直到遇到不属于 cset 中的第一个字符 while(sp <= end && strchr(cset, *sp)) sp++; while(ep > start && strchr(cset, *ep)) ep--; len = (sp > ep) ? 0 : ((ep-sp)+1); if (sh->buf != sp) memmove(sh->buf, sp, len); sh->buf[len] = '\0'; sh->free = sh->free+(sh->len-len); sh->len = len; return s; }
clear
1 2 3 4 5 6 7 8 9 void sdsclear(sds s) { struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); sh->free += sh->len; sh->len = 0; sh->buf[0] = '\0'; }
adlist
adlist( abstract doubly linked list) 是redis的抽象双端链表,特点 双端,无环,有表头和表尾指针,节点支持多种类型数据。
adlist 作为基础数据结构,用于 Pub/Sub 订阅模型列表,等待队列,和作为基础数据结构。
数据结构实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 typedef struct listNode { // 前置节点 struct listNode *prev; // 后置节点 struct listNode *next; // 节点的值,注意类型是 void *,可以存储任意类型数据的指针 void *value; } listNode; typedef struct list { // 表头节点 listNode *head; // 表尾节点 listNode *tail; // 节点值复制函数 void *(*dup)(void *ptr); // 节点值释放函数 void (*free)(void *ptr); // 节点值对比函数 int (*match)(void *ptr, void *key); // 链表所包含的节点数量 unsigned long len; } list;
dict
dict,字典是redis的核心数据结构,用来快速查找元素,redis 采用链表来解决哈希冲突,哈希算法使用 MurmurHash2。
数据结构实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 typedef struct dictEntry { // 键 void *key; // 值 union { void *val; uint64_t u64; int64_t s64; } v; // 指向下个哈希表节点,形成链表 struct dictEntry *next; } dictEntry; /* * 字典类型特定函数 */ typedef struct dictType { // 计算哈希值的函数 unsigned int (*hashFunction)(const void *key); // 复制键的函数 void *(*keyDup)(void *privdata, const void *key); // 复制值的函数 void *(*valDup)(void *privdata, const void *obj); // 对比键的函数 int (*keyCompare)(void *privdata, const void *key1, const void *key2); // 销毁键的函数 void (*keyDestructor)(void *privdata, void *key); // 销毁值的函数 void (*valDestructor)(void *privdata, void *obj); } dictType; /* This is our hash table structure. Every dictionary has two of this as we * implement incremental rehashing, for the old to the new table. */ /* * 哈希表 * * 每个字典都使用两个哈希表,从而实现渐进式 rehash 。 */ typedef struct dictht { // 哈希表数组 dictEntry **table; // 哈希表大小 unsigned long size; // 哈希表大小掩码,用于计算索引值 // 总是等于 size - 1 unsigned long sizemask; // 该哈希表已有节点的数量 unsigned long used; } dictht; /* * 字典 * 定义一组操作(dictType)与数据 */ typedef struct dict { // 类型特定函数 dictType *type; // 私有数据 void *privdata; // 哈希表,注意有两个哈希表,用以 rehash dictht ht[2]; // rehash 索引 // 当 rehash 不在进行时,值为 -1 int rehashidx; /* rehashing not in progress if rehashidx == -1 */ // 目前正在运行的安全迭代器的数量 int iterators; /* number of iterators currently running */ } dict;
rehash
目的:为了维持dict中负载因子在一个合理的范围,以便不影响数据库性能,对dict的 hash table 大小进行调整,然后对现有的 k-v 对进行 rehash 到新的 hash table 上。
rehash 的时机 :
当前 负载因子 >=1 并且没有进行 AOF RDB 的阻塞操作
如果当前阻塞,负载因子 >= 5 也会强制进行rehash
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static int _dictExpandIfNeeded(dict *d) { /* Incremental rehashing already in progress. Return. */ // 渐进式 rehash 已经在进行了,直接返回 if (dictIsRehashing(d)) return DICT_OK; /* If the hash table is empty expand it to the initial size. */ // 如果字典(的 0 号哈希表)为空,那么创建并返回初始化大小的 0 号哈希表 // T = O(1) if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); /* If we reached the 1:1 ratio, and we are allowed to resize the hash * table (global setting) or we should avoid it but the ratio between * elements/buckets is over the "safe" threshold, we resize doubling * the number of buckets. */ // 一下两个条件之一为真时,对字典进行扩展 // 1)字典已使用节点数和字典大小之间的比率接近 1:1 // 并且 dict_can_resize 为真 // 2)已使用节点数和字典大小之间的比率超过 dict_force_resize_ratio if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) { // 新哈希表的大小至少是目前已使用节点数的两倍 // T = O(N) return dictExpand(d, d->ht[0].used*2); } return DICT_OK; }
关于第二点 :
redis 在进行持久化操作时,使用 fork 子进程来实现,而大多数操作系统采用 COW 来优化子进程的内存使用,在子进程创建时,和父进程拥有相同的内存页,只有当其中一个进程尝试修改内存内容时,操作系统才去先 copy 这部分页面到需要修改的进程空间中,这样避免了在子进程创建时全量 copy 父进程的内存内容,避免了大量不必要的内存开销。所以对于 rehash ,当有 AOF RDB 持久化所创建的子进程时,为了避免一些不必要的 COW 引起的内存开销,rehash 的阈值要求会更严格一些;并且 redish 提供了 dictEnableResize() / dictDisableResize()
来主动控制 rehash 的进行,但需要注意的是当负载因子 >= 5 时,会强制 rehash.
rehash 过程:
分配新的 hash table -> ht[1], 且新 table 的大小满足:始终是 2 N 2^N 2 N ,如果是扩容,则是大于等于 ht[0] * 2 的的第一个 2 N 2^N 2 N , 若是收缩,则是大于等于 ht[0]/2 的第一个2 N 2^N 2 N .
1 2 3 4 5 6 7 8 9 10 11 static unsigned long _dictNextPower(unsigned long size) { unsigned long i = DICT_HT_INITIAL_SIZE; if (size >= LONG_MAX) return LONG_MAX; while(1) { if (i >= size) return i; i *= 2; } }
渐进式 rehash,逐步迁移数据,分而治之,通常 redis 中 dict 中 k-v 对会非常多,避免一次性迁移导致大量数据的读写会让服务暂停,所以把数据迁移操作分摊在对外部数据的操作之中。每次对 dict 进行增删改查时,除了完成特定的操作外,还会顺带将 rehashidx
所指示的 hash 槽中的所有节点迁移到 ht[1] 中,更新 rehashidx
. 需要注意的是,在整个 rehash 过程中,仍然响应受客户端请求,对于查询,则会依次查找 ht[0], ht[1]. 更新与删除类似。对于新增,则只会在 ht[1] 中操作。
到某一个时间点 rehashidx == ht[0].size
,迁移完成,将 rehashidx
设置为 -1,释放 ht[0]的空间,将ht[1] 设置为 ht[0].
rehash 并不是一个由另外的线程或者进程进行的子任务,而是被一个标志位指示:rehashidx != -1
,并且不是一个连续不被打断的操作,而是分散于对 dict 的数据操作之中,保证了服务的可用性。
redis 的主要数据工作模式是基于事件驱动的单线程模型,Linux 上通过 IO多路复用(epoll) 来接受客户端请求。单线程的数据工作模式避免了多线程下的数据竞争,上下文切换和因锁机制导致的性能损失。但后来的版本引入了多线程来使得 网络IO 方面的 处理请求与返回响应被 独立的线程处理,优化了网络 IO 的性能瓶颈。
参考: