0%

quicklist实现

List属于单值多value,链表用的是双向链表结构,list支持pop、push来操作头部和尾部,既可以用做栈也可以用做队列

在3.2版本之前使用的是ziplist和linkedlist,在3.2版本之后使用的是quicklist

quicklist结构

由于linkedlist的附加空间相对太高,prev和next指针就要占去16个字节,而且每个节点的内存都是单独分配,加剧了内存的碎片化,使用quicklist来代替之前的ziplist+linkedlist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
// 元素总数量
unsigned long count; /* total count of all entries in all ziplists */
// 元素总字节
unsigned long len; /* number of quicklistNodes */
int fill : QL_FILL_BITS; /* fill factor for individual nodes */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;

typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl; // 对应的是ziplist
// ziplist的字节总数
unsigned int sz; /* ziplist size in bytes */
// ziplist中的元素数量
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

看代码好像底层还是使用的ziplist,而且还有前驱和后继指针,就是一个ziplist+linkedlist混合体,把多个ziplist使用双向指针串联起来了。

默认单个ziplist长度为8字节,使用 list-max-ziplist-size来进行配置

1
2
3
4
5
6
7
8
9
10
# For a fixed maximum size, use -5 through -1, meaning:
# -5: max size: 64 Kb <-- not recommended for normal workloads
# -4: max size: 32 Kb <-- not recommended
# -3: max size: 16 Kb <-- probably not recommended
# -2: max size: 8 Kb <-- good
# -1: max size: 4 Kb <-- good
# Positive numbers mean store up to _exactly_ that number of elements
# per list node.
# The highest performing option is usually -2 (8 Kb size) or -1 (4 Kb size),
# but if your use case is unique, adjust the settings as necessary.

IO模型

linux中有几种IO模型,如select、poll、epoll,这几个分别是什么呢?

select模型

在select模型下是利用轮询socket句柄的方式来实现监测socket中是否有IO数据到达,每次调用select,都需要把fd集合从用户态拷贝到内核态,然后在内核态还要遍历一遍传进来的所有fd,在fd很多时开销很大

select默认支持的文件描述符是1024

poll模型

poll模型与select类似,不过其是基于链表存储的,没有最大连接数限制,但是跟select一样,需要把fd集合在用户态和内核态之间来回复制

epoll模型

epoll改进了轮询socket句柄的方式,采用notify机制来进行监测,为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者,就会调用这个回调函数

排查系统缓慢

排查系统很慢的时候,先观察IO等待时间,如果IO等待时间很低,可以查看CPU空闲时间百分比;如果IO等待时间很高,需要确定是什么原因导致的IO等待时间占比这么高;如果IO等待时间很低,而且CPU空闲时间很高,那就不是CPU资源的问题,需要从其他地方找原因。

先使用uptime命令

1
2
uptime
14:22:15 up 226 days, 15:08, 2 users, load average: 0.38, 0.19, 0.15

根据1分钟、5分钟、15分钟的平均负载来确定问题所处时间。

如果处于高负载状态,使用top命令来观察是哪些进程在消耗CPU

使用iostat命令可以查看是哪个分区正在执行大量的IO操作

有时候我们去查找的时候已经不是第一现场了,看实时的数据肯定是不行了。可以使用sar命令来查看历史数据

1
2
3
4
5
6
7
8
9
10
# 查看当天的CPU统计信息
sar
#查看当天的内存信息
sar -r
#查看当天的磁盘信息
sar -b

# 查看某时间段内的信息
# -s 开始时间 -e 结束时间
sar -s 18:00:00 -e 18:30:00

redis实现限流

我们可以使用redis来实现一个简单的滑动窗口限流,滑动窗口的话我们可以使用zset的score来进行实现。value需要保证唯一性,暂且使用时间戳。

通过统计该窗口内的行为数量和限制的最大数量maxCount进行比较就可以得出当前的请求是否允许

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class RedisRateLimiter {
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Test
public void test() {
for (int i = 0; i < 10; i++) {
System.out.println(isActionAllow("user/list", "127.0.0.1", 1, 5));
try {
Thread.sleep(150);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

/**
* 使用zset实现简单的滑动窗口限流
*
* @param uri
* @param ip
* @param period 时间窗口 单位秒
* @param maxCount 最大允许数量
* @return
*/
public boolean isActionAllow(String uri, String ip, int period, int maxCount) {
// key为 uri和ip
String key = String.format("hist:%s:%s", uri, ip);
long cur = System.currentTimeMillis();
List<Object> pipelined = stringRedisTemplate.executePipelined(
new RedisCallback<Long>() {

@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
// 记录行为
connection.zAdd(key.getBytes(), cur, String.valueOf(cur).getBytes());
// 移除时间窗口之前的行为记录
connection.zRemRangeByScore(key.getBytes(), 0, cur - period * 1000);
// 获取窗口内的行为数量
Long count = connection.zCard(key.getBytes());
// 清理冷数据,防止冷数据持续占用内存
connection.expire(key.getBytes(), period + 1);
return count;
}
}
);

// System.out.println(pipelined);
Object o = pipelined.get(2);
// System.out.println(o);
return Long.parseLong(String.valueOf(o)) <= maxCount;
}
}

如果时间窗口内允许的数量较大,会消耗大量的内存。则不适合该方式

布隆过滤器

redis原生不自带布隆过滤器,需要自己去编译该插件进行安装

安装

从github进行下载https://github.com/RedisBloom/RedisBloom/tags,进入下载目录进行编译make,生成redisbloom.so文件

修改redis.conf加载插件 loadmodule /usr/local/myself/redis/module/RedisBloom-2.2.18/redisbloom.so,重启redis

基本命令

  • bf.add 添加元素

    1
    2
    3
    4
    127.0.0.1:6379> bf.add art user1
    (integer) 1
    127.0.0.1:6379> bf.add art user2
    (integer) 1
  • bf.exists 查询元素是否存在

    1
    2
    3
    4
    127.0.0.1:6379> BF.EXISTS art user3
    (integer) 0
    127.0.0.1:6379> BF.EXISTS art user1
    (integer) 1
阅读全文 »