耿腾的博客

常期望安定,还期望即兴。

0%

目标

从今天开始,学习零知识证明相关知识,暂定 3 个月(到2024年7月26日完成)的学习目标为:

  1. 完成零知识证明常见算法、实现的学习,至少能够知道它们解决的是什么问题,分别适用于什么业务场景。
  2. 学会并熟练掌握使用至少一个流行的 Rust 零知识证明库。
  3. 参与至少一个零知识证明的(开源?)项目,需要提交有质量的代码。

初步计划

V20240426

  1. 第一个月: World of Z2O-K7E 的学习;产出为“能够在朋友、同事甚至是小学生(如果我有幸能认识一个愿意听我讲的)面前把学到的哲学、原理、应用讲出来,讲清楚”。
  2. 第二个月: 完成 World of Z2O-K7E 的学习并开始学习 Rust 相关库,写 demo;学习产出参考上一个月,写 demo 的产出为能够全面反映该 Rust 库的各种功能的一个 repo。
  3. 第三个月: 继续写 demo,看相关开源项目源码,参与项目;产出物为 项目地址 / commit / PR 记录等等。

介于本人在该领域是完全的“零知识”,以上仅为初步计划,如果有任何修改(即使是放弃)都需要更新本文。

定义

  • $S$、$R$ 为关系模式,$s$、$r$ 为关系实例。
  • 要计算关系除法,需要满足 $S ⊆ R$。
  • 给定一个元组 $t$,令 $t[S]$ 代表元组 $t$ 在 $S$ 中属性上的投影;那么,$r ÷ s$ 是 $R-S$ 模式的一个关系。
  • 元组 $t$ 在 $r ÷ s$ 中的充要条件是满足以下两个
    1. $t$ 在 $Π_{R-S}(r)$ 中
    2. 对于 $s$ 中的每个元组 $t_s$,在 $r$ 中存在一个元组 $t_r$ 且满足:
      • $t_r[S] = t_s[S]$
      • $t_r[R-S] = t$
  • 例子:

$r$:

A1 A2
a e
a f
b e
b g
c e
c f
d f

$s$:

A2
e
f

$r ÷ s$:

A1
a
c

公式

关系代数除法可以使用其它关系代数运算替代:

用上一节的例子就是:

A1
a
b
c
d

A1 A2
a e
a f
b e
b f
c e
c f
d e
d f

公式中是为了计算差($-$)的时候两个操作数模式相同,对 $r$ 做了一下模式的颠倒,其实还是 $r$。

A1 A2
a e
a f
b e
b g
c e
c f
d f

A1 A2
b f
d e

A1
b
d

A1
a
c

想写这样一篇总结也是因为在互联网上看了太多乱七八糟的说法,往往让读者越看越糊涂。这些说法往往给我这样的感觉:

  1. 没有先讲清楚在形容什么:同步、异步、阻塞、非阻塞通常作为形容词使用,无论是用在日常语境,还是用在编程领域内部,如果你不知道这些形容词是放在什么名词之前就开始讲什么意思,甚至一段话中掺杂多个名词去形容而不说明,那就会给人不知所云的感觉。
  2. 概念与实现混为一谈:同步阻塞、异步非阻塞是非常常见的组合方式,所以在讲同步、异步的概念时,它们忍不住拿这些具体实现来讲概念,也就顶多能让读者理解这些固定搭配,难以活学活用(其实能不能做到前者都存疑,甚至作者自己真的懂吗?)。
  3. 不恰当的比喻:比喻的使用是为了帮助表达,本体和喻体之间当然是不同的事物(否则就不是比喻了),但是讲了个生动的比喻,却没有把本体与喻体之间的对应关系讲清楚,或者没有讲清楚这个比喻的边界,给话题引入了不必要的复杂性,起到了相反的作用。

所以,我打算好好聊一下我对这个话题的理解,尽可能不要犯我认为别人犯过的这些错误,欢迎读者监督。

同步与异步

同步这个词在日常语境下与计算机是有着一些不同的。日常语境下,同步经常用在同时进行的事情上,比如“多个平台同步直播卖货”,“推动新时代四化同步发展”,当然它们之间是经过协调一致的,不是无组织无纪律各行其是的。

而在编程领域,“同步”和“异步”通常用来是形容“任务”、“函数”等可执行对象的(以下统一称作“任务”);当然也有用来形容“API”的,此时它们表示的是一种具体实现的风格,即“同步风格的API”、“异步风格的API”,它跟前者有关系也有区别。

那它们究竟是什么意思呢?

无论是同步还是异步,都一定是用来形容多个任务的,我们可以说这两个任务或者这三个函数是同步或者异步执行的,但是一个任务不存在同步或者异步。同步指的是多个任务一个一个执行,不存在时间重叠的情况,异步指的是多个任务不一个一个执行,可能存在时间重叠的情况。

有的说法硬要在讲清楚这个概念之前,先把这个任务假设成一个异步函数,再把它分成调用和获取结果两个步骤,开始谈什么消息,打比方讲什么先打了个电话订餐,又接了个电话通知,然后告诉你这叫异步,这是很容易造成误解的。即便要拆成俩,单个任务的调用和获取结果也一定是一前一后,不可能先获取结果再调用,也不可能同时进行,这两个步骤是同步的,不以具体实现为转移,它们只是可能跟另一个任务是异步的,拆成俩徒增复杂度,没有任何必要。所以干脆不要拆,也不要聊具体实现,我们说一个任务,它就是一个任务,就是执行一段时间拿到结果,不要拆开。比如“吃外卖”,就等于“下单-收到电话-取餐-吃”这整个过程,反正经过了一段时间,不要在意内部细节

那么,同步和异步应该怎么使用呢,比如;

  1. 我先吃了个外卖,然后打了一会游戏,吃外卖和打游戏这两个任务就是同步的。
  2. 我先吃了个外卖,然后打了一会游戏,又写了会代码,这三个任务也是同步的。
  3. 在 2. 的基础上,我在吃外卖的过程中还听了一段时间的音乐,那么听音乐和吃外卖是异步的(不要管我什么时候开的音乐,什么时候关的音乐)。
  4. 在 2. 的基础上,打游戏期间都是保证把音乐关掉了的,但是吃外卖和写代码期间都听了一会音乐,那么听音乐和玩游戏是同步的,和吃外卖、写代码是异步的。

是不是很简单,只要按照那个定义去套就行了。但是有一点要注意,4. 里面的 “打游戏期间都是保证把音乐关掉了” 的 “保证” 二字是很关键的,它确保了听音乐和玩游戏之间的 “一个一个执行” 的条件,这需要我在玩游戏之前确保音乐是关掉的,又保证玩游戏期间没有突然去把音乐打开,如果你只是凭运气,只是偶然造成了 “玩游戏期间音乐是关掉的” 的结果,那么它仍然是异步的,因为异步的定义里说的是“可能存在时间重叠”,你没有确保完全排除这个可能,它们就是异步的。

我们再回到日常语境下的“同步”,会发现它跟编程的“同步”是相通的。比如“多个平台同步直播卖货”,虽然整体上各平台是同时在直播一个流媒体源,看上去更符合异步的定义,但其实把“直播”拆分成“录制”、“拉流”、“转场”、“互动”、“上链接”等任务时,会发现它们之间还是存在一些比较严格的先后顺序的,比如你不可能在媒体源录制之前拉流,也不可能在某个重要互动之前就享受某些优惠。也就是说,日常语境下的同步,更多的是描述两件相关的大任务,它们的子任务只要存在一定程度的“保证”就可以说“同步”,并没有编程领域这么严格。

阻塞与非阻塞

这确实是一对很难脱离具体实现讲的概念。

不过相比同步与异步,阻塞与非阻塞更简单,是用来形容单个可执行对象(以下仍统一称作“任务”)的执行状态的。

可能被形容的有线程、协程、某一段业务逻辑等等:

  1. 我在某线程用 BIO 接口读一个文件,造成该线程无法继续运行,这就是线程阻塞了
  2. 我在某协程内使用该语言/框架提供的 sleep 方法,造成协程无法继续运行,这就是协程阻塞了(所在线程并没有阻塞)
  3. 我写了一段代码,反正调用 IO 或者 sleep 了,我也不知道运行在哪个线程上,哪个协程上,或者只是一个异步的 setTimeout, 反正就是在等,这就是业务逻辑阻塞了(可能线程和协程都没有被阻塞)

但其实这样说有点扩大概念外延的意思,99.9% 的情况下,我们说阻塞指的都是线程。而非阻塞,通常指的也是某个任务不会造成线程阻塞。

为什么线程阻塞是最常讨论的话题呢?因为:

  1. 线程的调度和空间占用,都是比较消耗系统资源的。一个 I/O 密集型业务(比如最常见的 Web 服务),如果会因为 I/O 阻塞线程,那么为了支持多个并发同时进行,就需要为每个独立的并发单独开一个线程。在并发较高时,就需要开大量的线程,占用大量 CPU 和内存资源,所以我们有了 C10K 问题。
  2. 这是一个已经被用各种方式花式解决了的问题了。Linux 上的 epoll、Windows 上的 IOCP、MacOS 上的 kqueue(先不要管它们是异步/同步、阻塞/非阻塞,总之 —>),都提供了可以让调用者不必开一堆线程而只需要复用固定数量的线程即可支撑较高并发的接口。不管是 Nodejs,还是 Vert.X 的异步回调,Go 的有栈协程,Rust 的无栈协程,甭管它们封装成异步风格还是同步风格的API,是甜还是咸,都是建立在前者基础上的。简单说,目的只有一个:不阻塞调用者的线程,或者说线程复用

业务要你等3秒,网络数据包要等几百毫秒才能到,硬盘读写要有时间,这些都会造成业务逻辑的阻塞,如果使用 BIO 接口,则业务逻辑的阻塞也会导致线程的阻塞。而使用了上述的框架/语言,让你不用在这些时候阻塞自己的线程,被阻塞的就只有当前业务逻辑,线程可以去跑别的业务逻辑,实现了线程复用。

组合起来

现在,单讲同步、异步、阻塞、非阻塞,应该没什么问题了,我们再聊聊那些常见的搭配。

我们经常听说同步阻塞、异步非阻塞这种搭配方式,而如果按照前文的定义,这种搭配方式是很奇怪的——同步和异步是用来形容多个任务的,阻塞和非阻塞时是说单个任务的,这样组合在一起是在干嘛?

一般来讲,它们是用来形容某种 API 的风格和实现的。

同步和异步是表示这种 API 的接口风格。比如常见的 Linux 的 BIO 接口、Go 的 IO 接口、Rust 里的 async 函数,这些都是同步风格的接口,先后调用这样的两个接口,它们默认是同步执行的,你要异步执行,需要另外开线程或者协程;Node.js、Vert.X 的绝大部分接口,都是异步风格的接口,先后调用这样的两个接口,它们默认是异步执行的,你要同步执行,需要把第二个接口写到前一个接口的回调函数里。

阻塞和非阻塞是表示这种 API 需要调用者配合使用的线程模型。比如 Linux 的 BIO 接口会阻塞调用者的线程,它就是阻塞的,而 Go 的 IO 接口、Rust 的 async 函数、Node.js 和 Vert.X 里的异步接口,都不会阻塞调用者的线程,它们就是非阻塞的。

在没有协程的年代,同步风格的 BIO 接口就是会导致线程阻塞,它意味着心智负担小,开发方便,但是吃资源;而不会阻塞线程的 NIO/AIO 接口往往是异步风格(或者封装为异步风格),代码写起来心智负担重,性能好,比如 Node.js 和 Vert.X。所以经常有人拿同步代指阻塞,拿异步代指非阻塞,它们成为了同义词两两绑定在一起,即“同步阻塞”和“异步非阻塞”。

而 Go 、Rust 等语言提供的协程,相当于是对“异步非阻塞”的一种高级封装,可以像写同步代码那样写不阻塞线程的代码,让你即拥有高性能,又没有很高的心智负担。但是也很少见他们讨论自己是同步还是异步,阻塞还是非阻塞,因为风格上确实是同步的,封装的实际上是异步常用的非阻塞接口,确实不会阻塞线程,但是协程是可能被阻塞。所以不要套概念,理解原理,理解同步具体指的是谁和谁,异步具体指的是谁和谁,阻塞和非阻塞指的具体是谁,搞清楚对象,套定义就可以了。

总结

在写这篇文章的时候,我并没有查询很多资料,但自认为这算是一个可以简化问题,帮助理解的模型。

希望能给读者一些启发,也欢迎批评指正。

使用数组结构表示堆结构,元素从索引 0 开始,则索引 i 位置:

  • 父节点索引为 $(i - 1) / 2$
  • 左孩子索引为 $2 \times i + 1$
  • 右孩子索引为 $2 \times i + 2$

实现

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class Heap {

public static void main(String[] args) {
Heap heap = new Heap(1000);
heap.push(-6);
heap.push(3);
heap.push(-1);
heap.push(0);
heap.push(9);
heap.push(-4);
heap.push(5);
heap.push(7);
heap.push(-2);
heap.push(9);
heap.push(13);

System.out.println(heap);

while (!heap.isEmpty()) {
System.out.println(heap.pop());
System.out.println(heap);
}
}

private final int[] data;
private final int limit;
private int heapSize;

public Heap(int limit) {
this.limit = limit;
this.data = new int[limit];
this.heapSize = 0;
}

public void push(int value) {
if (heapSize >= limit) {
throw new RuntimeException("heapSize must not exceed limit");
}

this.data[heapSize++] = value;
heapInsert(heapSize - 1);
}

public boolean isEmpty() {
return heapSize == 0;
}

public Integer pop() {
if (isEmpty()) {
return null;
}

swap(--heapSize, 0);
int result = this.data[heapSize];
this.data[heapSize] = 0;
heapify(0);
return result;
}

private void heapInsert(int index) {
while (index > 0 && this.data[index] > this.data[(index - 1) / 2]) {
swap(index, (index - 1) / 2);
index = (index - 1) / 2;
}
}

private void swap(int i, int j) {
if (i != j) {
this.data[i] = this.data[i] ^ this.data[j];
this.data[j] = this.data[i] ^ this.data[j];
this.data[i] = this.data[i] ^ this.data[j];
}
}

private void heapify(int index) {
int leftIndex = 2 * index + 1;

while (leftIndex < heapSize) {
int rightIndex = leftIndex + 1;
int greatestIndex = rightIndex < heapSize && this.data[rightIndex] > this.data[leftIndex] ? rightIndex : leftIndex;

if (this.data[greatestIndex] <= this.data[index]) {
break;
}

swap(index, greatestIndex);
index = greatestIndex;
leftIndex = 2 * index + 1;
}
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; i<heapSize; i++) {
b.append(data[i]);
if (i != heapSize - 1)
b.append(", ");
}
b.append(']');

return b.toString();
}
}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[13, 9, 5, 3, 9, -4, -1, -6, -2, 0, 7]
13
[9, 9, 5, 3, 7, -4, -1, -6, -2, 0]
9
[9, 7, 5, 3, 0, -4, -1, -6, -2]
9
[7, 3, 5, -2, 0, -4, -1, -6]
7
[5, 3, -1, -2, 0, -4, -6]
5
[3, 0, -1, -2, -6, -4]
3
[0, -2, -1, -4, -6]
0
[-1, -2, -6, -4]
-1
[-2, -4, -6]
-2
[-4, -6]
-4
[-6]
-6
[]
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
pub struct Heap<T>(Vec<T>);

impl<T: Ord> Heap<T> {
pub fn new() -> Self {
Self(Vec::new())
}

pub fn push(&mut self, value: T) {
self.0.push(value);
self.heap_insert();
}

pub fn pop(&mut self) -> Option<T> {
if self.0.is_empty() {
return None;
}

let last = self.0.len() - 1;
self.0.swap(0, last);
let result = self.0.pop();
self.heapify();
result
}

fn heapify(&mut self) {
let len = self.0.len();

let mut index = 0;
let mut left = index * 2 + 1;

while left < len {
let right = left + 1;
let greatest = if right < len && self.0[left] < self.0[right] {
right
} else {
left
};

if self.0[greatest] <= self.0[index] {
break;
}

self.0.swap(index, greatest);
index = greatest;
left = index * 2 + 1;
}
}

fn heap_insert(&mut self) {
if self.0.is_empty() {
return;
}

let mut index = self.0.len() - 1;
if index == 0 {
return;
}
let mut parent = (index - 1) / 2;

while self.0[parent] < self.0[index] {
self.0.swap(index, parent);
index = parent;
if index == 0 {
break;
}
parent = (index - 1) / 2;
}
}
}

impl<T: Ord> Iterator for Heap<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.pop()
}
}

#[test]
fn test_heap() {
for _ in 0..1000 {
let array: [u8; 32] = rand::random();
let sorted = {
let mut s = array.clone();
s.sort_by(|a, b| b.cmp(a));
s
};

let mut heap = Heap::new();

for a in array {
heap.push(a);
}

let heap_sorted: Vec<u8> = heap.collect();

assert_eq!(heap_sorted, sorted);
}
}

1. 基本架构

一个键值数据库需要有哪些功能/结构/特性。

  • 支持多种数据结构
  • PUT、GET、DELETE、SCAN
  • 设置过期
  • 索引结构
  • 对外接口,Socket、动态链接库等等
  • 存储方式,内存、硬盘(持久化)
  • 高可用
  • 横向扩展
  • 功能可扩展性

2. 数据结构

Redis 接收到一个键值对操作后,能以 微秒 级别的速度找到数据,并快速完成操作。

Redis 的值有如下数据类型:

  • String(字符串)
  • List(列表)
  • Hash(哈希)
  • Set(集合)
  • Sorted Set(有序集合)
数据类型 数据结构
String 简单动态字符串
List 双向链表/压缩列表
Hash 压缩列表/哈希表
Set 整数数组/哈希表
Sorted Set 压缩列表/跳表

Redis 使用一个全局的哈希表来保存所有键值对,可以在 $O(1)$ 的时间内查找到键值对。

一个哈希表就是一个数组,数组元素为哈希桶,每个哈希桶存放一个链表,连接着哈希值映射到该桶中的多个键值对。

1
2
3
4
5
|0|1|2|3|4|5|
| |
| ---> entry4
|
---> entry1 ---> entry2 ---> entry3

哈希冲突及rehash

当哈希冲突过多时,链表过长,导致查找效率下降,Redis会对哈希表进行rehash。

为了 使 rehash 更高效,Redis 使用了两个全局哈希表:哈希表 1 和 哈希表 2 。默认情况下使用哈希表 1 ,哈希表 2 没有分配空间,当键值对增多时,Redis开始进行rehash:

  1. 给哈希表 2 分配更大的空间,例如是当前哈希表 1 大小的两倍;
  2. 把哈希表 1 中的数据重新映射并拷贝到哈希表 2 中;
  3. 释放哈希表 1 的空间。

然后使用哈希表 2 保存数据,下次rehash再使用哈希表 1 。

避免一次性拷贝大量数据造成 Redis 线程阻塞,Redis采用了渐进式rehash。

渐进式rehash

Redis可以保持处理用户请求,每处理一个请求,就顺带将哈希表 1 中的一个桶的所有entries拷贝到哈希表 2 中。这样就能把一次性大量拷贝的开销,分摊到了多次请求中,避免了耗时可能造成的阻塞。

数据结构的操作效率

整数数组双向链表 都是顺序读写,操作复杂度都是 $O(N)$。

压缩列表

压缩列表表头有 列表长度(zlbytes)列表尾偏移量(zltail)列表中entry个数(zllen) 三个字段,表尾有一个 zlend 字段表示 列表结束

1
| zlbytes | zltail | zllen | entry1 | entry2 | ... | entryN | zlend |

查找表头、表尾元素的复杂度为 $O(1)$,查找其他元素的复杂度为 $O(N)$。

跳表

跳表 是在 有序链表 的基础上增加了多级索引,通过索引位置的几次跳转,实现数据快速定位。

1
2
3
4
5
1 ----------------------> 27 -----------------------> 100 // 二级索引
↓ ↓ ↓
1 --------> 11 ---------> 27 ---------> 50 ---------> 100 // 一级索引
↓ ↓ ↓ ↓ ↓
1 --> 5 --> 11 --> 20 --> 27 --> 33 --> 50 --> 62 --> 100 --> 156 // 链表

跳表 的查找复杂度为 $O(logN)$。

时间复杂度

数据结构的时间复杂度

数据结构 时间复杂度
哈希表 $O(1)$
跳表 $O(logN)$
双向链表 $O(N)$
压缩列表 $O(N)$
整数数组 $O(N)$

不同操作的时间复杂度

  1. 单元数操作是基础。每一种集合类型对单个数据实现的增删改查操作,复杂度由数据结构决定。
  2. 范围操作很耗时。返回一个范围内的数据,复杂度一般是 $O(N)$,应尽量避免,可以使用 2.8 版本之后的 HSCANSSCANZSCAN 等命令进行渐进式遍历。
  3. 统计操作通常高效。集合类型对集合中的元素个数由记录,例如 LLENSCARD
  4. 例外情况只有几个。压缩列表和双向列表的会记录表头和表尾的偏移量,对它们的操作复杂度只有 $O(1)$,例如 LPOPRPOPLPUSHRPUSH

问题

整数数组和压缩列表在查找时间复杂度方面并没有很大优势,为什么Redis还会把它作为底层数据结构?

主要有两方面原因:

  1. 内存利用率。Redis 是内存数据库,大量数据存储在内存中,整数数组和压缩列表结构比较紧凑,相比链表结构占用内存更少,可以提高内存利用率。
  2. 数组对CPU高速缓存支持更友好。集合元素较少时,使用内存紧凑的排列方式,可以利用CPU高速缓存,尤其在一个缓存行内(64字节)有更高效的访问效率。当元素数量超过阈值后,为避免复杂度太高,可以转为哈希表和跳表。

3. 高性能 IO 模型

Redis 的网络 IO 和键值对读写是由单个线程完成的;其他功能,例如持久化、异步删除、集群数据同步,是由额外的线程执行的。

Redis使用 IO多路复用,即一个线程处理多个 IO 流。

可能存在的瓶颈(来自Kaito):

  1. 单个请求耗时会导致后续所有请求阻塞等待,包括以下几种:
    • 操作bigkey,分配、释放内存(4.0推出了lazy-free,可以异步执行bigkey释放)
    • 使用复杂度较高的命令,排序、union、查询全量数据
    • 大量key集中过期
    • 淘汰策略,内存超过Redis内存上线,每次写入都要淘汰一些key,导致耗时变长
    • AOF 开启 always
    • 主从同步生成 RDB 时的 fork
  2. 并发量大时,单线程处理客户端请求,无法利用多核(6.0推出了多线程,可利用多核CPU处理用户请求)

4. AOF(Append Only File)

Redis 的 AOF 是写后日志,先执行命令,把数据写入内存,然后再记录日志。

不会阻塞当前写操作,执行完命令还没来得及记日志就宕机的话,会有数据丢失的风险。

1
2
3
4
5
6
7
*3 --> 这个命令有三个部分
$3 --> 第一部分的长度
set --> 第一部分的内容
$5
hello
$5
world

三种写回策略

  • Always,同步写回,每个命令执行完立马写回磁盘
  • EverySec,每秒写回,写到AOF内存缓存,每秒写回一次
  • No,写到AOF内存缓存,由操作系统决定何时将其写回

日志重写

当日志文件过大时,可以对日志进行重写。可以把某些多条修改同一个键值对的命令合并为一条。

重写流程:

  1. 主线程 fork 出 bgrewriteaof 子进程,主线程在将新指令写入 AOF 缓存时,还会写入 AOF 重写缓存;
  2. 子进程将 fork 拷贝出来的内存快照写成命令日志;
  3. 写完后将AOF重写缓存中的日志写到这个新的日志中,然后就可以用这个新日志替换旧日志了。

其他要点

  1. fork 使用的是操作系统的写时复制(Copy On Write)机制,并不会直接拷贝所有内存。但是会拷贝父进程的的内存页表,如果页表较大,可能导致阻塞。
  2. 主进程在提供服务时,如果操作的是bigkey,写时复制会导致内存分配;如果开启了内存大页(huge page)机制,内存分配会产生阻塞。(所以使用Redis通常要关闭内存大页,其对使用 fork 的程序不友好)
  3. AOF 重写可以使用 bgrewriteaof 命令手动执行,也由两个配置项控制自动触发:
    • auto-aof-rewrite-min-size:表示运行AOF重写的最小大小,默认为 64MB;
    • auto-aof-rewrite-percentage:当前AOF文件大小和上一次重写后AOF文件大小的差值,除以上一次重写AOF文件大小,即增量和上一次全量的比值,默认为 100。
      AOF 文件大小同时满足这两个配置项,会自动触发 AOF 重写。

5. 内存快照 RDB

把某一时刻的内存数据以文件的形式写到磁盘上,即快照。这个快照文件即 RDB 文件(Redis DataBase 的缩写)。

在数据恢复时,可直接把 RDB 文件读入内存,很快完成恢复。

Redis 执行的是全量快照。

可以使用 savebgsave 来生成 RDB 文件;save在主线程中执行,会阻塞;bgsave 会创建子进程专门进行 RDB 文件写入,不会阻塞。

快照进行时数据如何修改

使用 fork 创建子进程,该子进程会跟主线程共享所有内存数据;利用操作系统的写时复制技术,当主进程修改数据时,会重新分配空间生成该数据的副本,并使用该副本提供服务,bgsave 子进程仍然使用未修改的数据进行快照写入。

快照频率

频繁执行全量快照有两方面开销:

  1. 给磁盘带来压力
  2. fork 本身可能阻塞主线程,如果频繁 fork 会导致主线程阻塞所以,如果有一个 bgsave 在运行,就不会启动第二个了

4.0 中提出了混合使用AOF和RDB的方法,即快照以一定的频率执行,在快照间以AOF的形式记录这期间的命令。实现:

  1. 避免了频繁 fork 对主线程的影响
  2. 减少了 AOF 文件大小,提高了数据恢复效率

在配置文件中设置:

1
aof-use-rdb-preamble yes

三点建议

  1. 数据不能丢失时,使用 AOF 和 RDB 混合方案
  2. 允许分钟级别的数据丢失,使用 RDB
  3. 只用 AOF 的话,优先使用 everysec,在可靠性和性能之间比较平衡

课后题:资源风险(Kaito)

  1. 考虑 fork 导致多大程度的写时复制,是否会造成 OOM,或者 swap 到磁盘上导致性能降低;
  2. CPU 核数较少时,Redis的其他子进程会跟主线程竞争CPU,导致性能下降;
  3. 如果 Redis 进程绑定了 CPU,子进程会继承它的 CPU 亲和属性,与父进程争夺同一个 CPU 资源,导致效率下降。所以如果开启RDB和AOF,一定不要绑定CPU。

6. 主从同步

可靠性分两方面:

  1. 数据少丢失(由RDB、AOF保证);
  2. 服务少中断(通过增加副本冗余量保证)

读写分离:主库、从库都可以执行读操作,主库限制性写操作,然后同步给从库。

如果都允许写,会造成各实例副本不一致,需要涉及加锁、协商等各种操作,产生巨额开销。

主从第一次同步

在从库实例上执行:

1
replicaof [主库ip] [主库port]

三个阶段

  1. 主从建立链接、协商同步,从库给主库发送 psync 命令,表示要进行数据同步,主库根据这个命令的参数来启动复制。psync 命令包含了主库的 runID 和复制进度 offset 两个参数。主库收到 psync 命令后,用 FULLRESYNC 响应命令带上两个参数,主库的 runID 和主库目前的复制进度 offset 给从库,从库会记住这两个参数。

runID:每个 Redis 实例启动时自动生成的一个随机 ID,唯一标识一个实例。第一次复制时从库不知道主库的 runID,使用 ?
offset:第一次复制设置为 -1

  1. 主库执行 bgsave 生成 RDB 文件,并发送给从库。从库收到 RDB 文件后,清空数据库并加载 RDB 文件。此时主库不会停止服务,但在内存中会有专门的 replication buffer,记录 RDB 文件生成后收到的所有写操作。
  2. 主库把 replication buffer 中的修改操作发送给从库执行。

主从级联

使用“主-从-从”模式分担主库压力。二级从库可以选择一个内存较高的从库进行同步。

主从间使用基于长连接的命令传播。

网络断开

2.8 前网络闪断会造成重新全量复制,2.8 后可以增量复制,实现继续同步。

(Kaito)主库除了将所有写命令传给从库之外,也会在 repl_backlog_buffer 中记录一份。当从库断线重连后,会发送 psync $master_runid $offset,主库就能通过 $offsetrepl_backlog_buffer 中找到从库断开的位置,只发送 $offset 之后的增量给从库。

repl_backlog_buffer:为了解决从库断开后找到主从差异而设计的环形缓冲区。配置大一点,可以减少全量同步的频率。

replication buffer:客户端、从库与 Redis 通信时,Redis 都会分配一个内存 buffer 用于交互,把数据写进去并通过 socket 发送到对端。当对端为从库时,该 buffer 只负责传播写命令,通常叫做 replication buffer。这个 buffer 由 client-output-buffer-limit 参数控制,如果过小或者从库处理过慢,Redis 会强制断开这个链接。

为啥不用 AOF 同步

  1. 相比 RDB 文件更大,效率低;
  2. 必须打开 AOF,数据丢失不敏感业务没必要开启

7. 哨兵


8. 哨兵集群


9. 切片集群


装饰器(Decorator)

Component:定义一个对象接口,可以给这些对象动态地添加职责。
ConcreteComponent:定义对象,可以给这个对象添加一些职责。
Decorator:维持一个指向 Component 对象的指针,并定义一个与 Component 接口一致的接口。
ConcreteDecorator:实际的装饰对象,向组件添加职责。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class DecoratorPattern {

public static void main(String[] args) {
Component component0 = new ADecorator(new BDecorator(new Component0()));
component0.doSth();
System.out.println();
Component component1 = new BDecorator(new ADecorator(new BDecorator(new ADecorator(new Component1()))));
component1.doSth();
System.out.println();
}

public interface Component {
void doSth();
}

public static class Component0 implements Component {

@Override
public void doSth() {
System.out.print("0");
}
}

public static class Component1 implements Component {

@Override
public void doSth() {
System.out.print("1");
}
}

public static abstract class Decorator implements Component {
protected Component component;
public Decorator(Component component) {
this.component = component;
}
}

public static class ADecorator extends Decorator {

public ADecorator(Component component) {
super(component);
}

@Override
public void doSth() {
component.doSth();
System.out.print("A");
}
}

public static class BDecorator extends Decorator {

public BDecorator(Component component) {
super(component);
}

@Override
public void doSth() {
component.doSth();
System.out.print("B");
}
}
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
pub trait Component {
fn do_sth(&self);
}

pub struct Component1;

impl Component for Component1 {
fn do_sth(&self) {
print!("1");
}
}

pub struct Component2;

impl Component for Component2 {
fn do_sth(&self) {
print!("2");
}
}

pub trait Decorator<T: Component>: Component {
fn wrap(inner: T) -> Self;
}

struct DecoratorA<T> {
inner: T,
}

impl<T: Component> Component for DecoratorA<T> {
fn do_sth(&self) {
self.inner.do_sth();
print!("A");
}
}

impl<T: Component> Decorator<T> for DecoratorA<T> {
fn wrap(inner: T) -> Self {
Self { inner }
}
}

struct DecoratorB<T> {
inner: T,
}

impl<T: Component> Component for DecoratorB<T> {
fn do_sth(&self) {
self.inner.do_sth();
print!("B");
}
}

impl<T: Component> Decorator<T> for DecoratorB<T> {
fn wrap(inner: T) -> Self {
Self { inner }
}
}

#[test]
fn test_decorator() {
let c1 = Component1;
c1.do_sth();
println!();
let ab1 = DecoratorA::wrap(DecoratorB::wrap(Component1));
ab1.do_sth();
println!();
let abbaa2 = DecoratorA::wrap(DecoratorB::wrap(DecoratorB::wrap(DecoratorA::wrap(
DecoratorA::wrap(Component2),
))));
abbaa2.do_sth();
println!();

// 在Rust中,如果不需要运行时动态装饰,就没有必要产生很多小对象
// 装饰了好几轮,最后占用内存还是 Component2 的大小
assert_eq!(
std::mem::size_of::<
DecoratorA<DecoratorB<DecoratorB<DecoratorA<DecoratorA<Component2>>>>>,
>(),
0
);
}

输出内容:

1
2
3
1
1BA
2AABBA

优点:比继承更灵活,避免类爆炸;可以组合;装饰器和构件可以独立变化,符合开闭原则。
缺点:产生很多小对象,增加系统复杂度和理解难度,调试困难。


分区问题

二分

将一个数组分为两个区域,小于等于N在左,大于N的在右;返回右侧的起始位置。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int partition(int[] array, int value, int start, int end) {
if (array == null || start >= end) {
return -1;
}

int left = -1;

for (int i = start; i < end; i++) {
if (array[i] <= value) {
if (left == -1) {
left = start;
} else {
left += 1;
}
swap(array, i, left);
}
}

return left + 1;
}

static int partition(int[] array, int value) {
return partition(array, value, 0, array.length);
}

输出内容:

1
2
3
[2, 3, 5, 1, 2, 6, 4, 3, 8, 4, 3, 5, 1, 3, 5, 8]
10
[2, 3, 1, 2, 4, 3, 4, 3, 1, 3, 6, 5, 8, 5, 5, 8]
  • Rust 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#[test]
fn test_partition() {
let mut array = [4, 2, 5, 7, 5, 4, 2, 9, 3, 5, 1];
assert_eq!(array.partition(5), Index::Position(9));
assert_eq!(array.partition(3), Index::Position(4));
assert_eq!(array.partition(11), Index::Greater);
assert_eq!(array.partition(0), Index::Less);
}

#[derive(Debug, Eq, PartialEq)]
pub enum Index {
// 值比所有数组元素都小
Less,
// 能找到一个分区位置
Position(usize),
// 值比所有数组元素都大
Greater,
}

pub trait Partition<T> {
fn partition(&mut self, value: T) -> Index;
}

impl<T: Ord> Partition<T> for [T] {
// 返回大于部分的起始索引
fn partition(&mut self, value: T) -> Index {
let mut left = None;
for i in 0..self.len() {
if self[i] <= value {
if let Some(left_val) = left {
let new_left_val = left_val + 1;
self.swap(i, new_left_val);
left = Some(new_left_val);
} else {
self.swap(i, 0);
left = Some(0);
}
}
}

match left {
None => Index::Less,
Some(i) if i == self.len() - 1 => Index::Greater,
Some(i) => Index::Position(i + 1),
}
}
}

荷兰 🇳🇱 国旗问题(三分)

将一个数组分为三个区域,小于N在左,等于N在中间,大于N的在右;返回中间和右侧的起始位置。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static void swap(int[] array, int i, int j) {
if (i == j) {
return;
}

array[i] = array[i] ^ array[j];
array[j] = array[i] ^ array[j];
array[i] = array[i] ^ array[j];
}

static int[] netherlandsFlagPartition(int[] array, int value) {
return netherlandsFlagPartition(array, value, 0, array.length);
}

static int[] netherlandsFlagPartition(int[] array, int value, int start, int end) {
if (array == null || start >= end) {
return null;
}

int left = -1;
int right = end;
int i = start;

while (i < right) {
if (array[i] < value) {
left = left == -1 ? start : (left + 1);
swap(array, left, i);
i += 1;
} else if (array[i] > value) {
right -= 1;
swap(array, right, i);
} else { // array[i] == value
i += 1;
}
}

return new int[]{left + 1, right};
}

输出内容:

1
2
[10, 13]
[2, 3, 1, 2, 4, 3, 4, 3, 1, 3, 5, 5, 5, 8, 6, 8]
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
use std::cmp::Ordering;

#[test]
fn test_netherlands_flag_partition() {
let mut array = [7, 1, 2, 0, 8, 5, 3, 9, 2, 6, 5, 1, 0, 8, 7, 4];
// sorted: [0, 0, 1, 1, 2, 2, 3, 4, 5, 5, 6, 7, 7, 8, 8, 9]
// index(hex): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, A, B, C, D, E, F]
assert_eq!(array.netherlands_flag_partition(2), (Some(4), Some(6)));
assert_eq!(array.netherlands_flag_partition(7), (Some(0xB), Some(0xD)));
assert_eq!(array.netherlands_flag_partition(-1), (None, Some(0)));
assert_eq!(array.netherlands_flag_partition(0), (None, Some(2)));
assert_eq!(
array.netherlands_flag_partition(10),
(Some(array.len()), None)
);
assert_eq!(
array.netherlands_flag_partition(9),
(Some(array.len() - 1), None)
);
}

pub trait NetherlandsFlagPartition<T> {
fn netherlands_flag_partition(&mut self, value: T) -> (Option<usize>, Option<usize>);
}

impl<T: Ord> NetherlandsFlagPartition<T> for [T] {
// 返回相等部分和大于部分的起始索引
fn netherlands_flag_partition(&mut self, value: T) -> (Option<usize>, Option<usize>) {
let len = self.len();
let mut left = None;
let mut right = None;
let mut i = 0;
while i < right.unwrap_or(len) {
match self[i].cmp(&value) {
Ordering::Less => {
match left {
None => {
self.swap(0, i);
left = Some(0);
}
Some(left_value) => {
let new_left = left_value + 1;
self.swap(new_left, i);
left = Some(new_left);
}
}
i += 1;
}
Ordering::Equal => {
i += 1;
}
Ordering::Greater => {
match right {
None => {
self.swap(len - 1, i);
right = Some(len - 1);
}
Some(right_value) => {
let new_right = right_value - 1;
self.swap(new_right, i);
right = Some(new_right);
}
}

// i 不要自增,让下一次循环检查新换到前面的值
}
}
}

(left.map(|v| v + 1), right)
}
}
  • Rust 实现(用枚举表示结果)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::cmp::Ordering;

#[test]
fn test_netherlands_flag_partition() {
let mut array = [7, 1, 2, 0, 8, 5, 3, 9, 2, 6, 5, 1, 0, 8, 7, 4];
assert_eq!(
array.netherlands_flag_partition(2),
NetherlandsFlagResult::Three(4, 6)
);
assert_eq!(
array.netherlands_flag_partition(7),
NetherlandsFlagResult::Three(11, 13)
);
assert_eq!(
array.netherlands_flag_partition(-1),
NetherlandsFlagResult::Greater
);
assert_eq!(
array.netherlands_flag_partition(0),
NetherlandsFlagResult::ValueStart(2)
);
assert_eq!(
array.netherlands_flag_partition(10),
NetherlandsFlagResult::Less
);
assert_eq!(
array.netherlands_flag_partition(9),
NetherlandsFlagResult::ValueEnd(15)
);

let mut array = [2, 2, 2, 2, 2];
assert_eq!(
array.netherlands_flag_partition(2),
NetherlandsFlagResult::Equal
);
}

#[derive(Debug, Eq, PartialEq)]
pub enum NetherlandsFlagResult {
/// 分为三部分,分别是 < value, == value, > value, 返回后两部分的起始索引
Three(usize, usize),
/// 分为两部分,== value, > value, 返回第二部分的起始索引
ValueStart(usize),
/// 分为两部分,< value, == value, 返回第二部分的起始索引
ValueEnd(usize),
/// 所有值都小于 value
Less,
/// 所有值都大于 value
Greater,
/// 所有值都等于 value
Equal,
}

pub trait NetherlandsFlagPartition<T> {
fn netherlands_flag_partition(&mut self, value: T) -> NetherlandsFlagResult;
}

impl<T: Ord> NetherlandsFlagPartition<T> for [T] {
// 返回相等部分和大于部分的起始索引
fn netherlands_flag_partition(&mut self, value: T) -> NetherlandsFlagResult {
let len = self.len();
let mut left = None;
let mut right = None;
let mut i = 0;
while i < right.unwrap_or(len) {
match self[i].cmp(&value) {
Ordering::Less => {
match left {
None => {
self.swap(0, i);
left = Some(0);
}
Some(left_value) => {
let new_left = left_value + 1;
self.swap(new_left, i);
left = Some(new_left);
}
}
i += 1;
}
Ordering::Equal => {
i += 1;
}
Ordering::Greater => {
match right {
None => {
self.swap(len - 1, i);
right = Some(len - 1);
}
Some(right_value) => {
let new_right = right_value - 1;
self.swap(new_right, i);
right = Some(new_right);
}
}

// i 不要自增,让下一次循环检查新换到前面的值
}
}
}

match (left.map(|v| v + 1), right) {
(None, Some(i)) => {
if i == 0 {
NetherlandsFlagResult::Greater
} else {
NetherlandsFlagResult::ValueStart(i)
}
}
(Some(i), None) => {
if i >= self.len() {
NetherlandsFlagResult::Less
} else {
NetherlandsFlagResult::ValueEnd(i)
}
}
(Some(i), Some(j)) => {
if i >= self.len() {
NetherlandsFlagResult::Greater
} else {
NetherlandsFlagResult::Three(i, j)
}
}
(None, None) => NetherlandsFlagResult::Equal,
}
}
}

快速排序

v1.0

步骤:

  1. 选择数组最后一个元素 X,在 0..array.length-1 的范围上进行二分,小于等于 X 的在左,大于 X 的在右;
  2. X 与右侧的第一个元素交换;
  3. X 左侧与右侧的数组分别进行上述操作,进行递归,过程中 X 不需要再移动。

时间复杂度:$O(N)$

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void quickSort1(int[] array) {
recurse1(array, 0, array.length);
}

static void recurse1(int[] array, int start, int end) {
if (array == null || start >= end - 1) {
return;
}

int pivot = array[end - 1];
int idx = partition(array, pivot, start, end - 1);
swap(array, idx, end - 1);
recurse1(array, start, idx);
recurse1(array, idx + 1, end);
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#[test]
fn test_quick_sort1() {
for _ in 0..1000 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();

array0.sort();
array1.quick_sort1();

assert_eq!(array0, array1);
}
}

pub trait QuickSort1 {
fn quick_sort1(&mut self);
}

impl<T: Ord + Clone> QuickSort1 for [T] {
fn quick_sort1(&mut self) {
let len = self.len();
if len < 2 {
return;
}

let value = self[len - 1].clone();

match self[..len - 1].partition(value) {
// value 比所有元素都小,把 value 挪到第一个位置,排序剩下的
Index::Less => {
self.swap(0, len - 1);
self[1..].quick_sort1();
}
// 把 value 与第一个大于区的数交换
Index::Position(i) => {
self.swap(i, len - 1);
self[..i].quick_sort1();
self[i + 1..].quick_sort1();
}
// value比所有元素都大,不动,排序前面所有的
Index::Greater => {
self[..len - 1].quick_sort1();
}
}
}
}

v2.0

  1. 选择数组一个元素 X,进行荷兰国旗三分,小于 X 的在左,等于 X 的在中间,大于 X 的在右;
  2. X 左侧与右侧的数组分别进行上述操作,进行递归,过程中位于中间的所有 X 不需要再移动。

时间复杂度:$O(N)$

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void quickSort2(int[] array) {
recurse2(array, 0, array.length);
}

static void recurse2(int[] array, int start, int end) {
if (array == null || start >= end - 1) {
return;
}

int pivot = array[end - 1];
int[] idxs = netherlandsFlagPartition(array, pivot, start, end);
if (idxs == null) {
return;
}
recurse2(array, start, idxs[0]);

if (idxs[1] < end - 1) {
recurse2(array, idxs[1], end);
}
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#[test]
fn test_quick_sort2() {
for _ in 0..1000 {
let mut array0: [i32; 32] = random();
let mut array1 = array0.clone();

array0.sort();
array1.quick_sort2();

assert_eq!(array0, array1);
}
}

pub trait QuickSort2 {
fn quick_sort2(&mut self);
}

impl<T: Ord + Clone> QuickSort2 for [T] {
fn quick_sort2(&mut self) {
if self.len() < 2 {
return;
}

let value = self[0].clone();

match self.netherlands_flag_partition(value) {
NetherlandsFlagResult::Three(start, end) => {
self[..start].quick_sort2();
self[end..].quick_sort2();
}
NetherlandsFlagResult::ValueStart(start) => {
self[start..].quick_sort2();
}
NetherlandsFlagResult::ValueEnd(end) => {
self[..end].quick_sort2();
}
NetherlandsFlagResult::Less | NetherlandsFlagResult::Greater => {
self.quick_sort2();
}
NetherlandsFlagResult::Equal => {
return;
}
}
}
}

v3.0

将 v2.0 中选择数组元素改为随机选择,其他不变。

  1. 划分值越靠近中间,性能越好;越靠近两边性能越差;
  2. 随机算一个数进行划分的目的就是让好情况和坏情况都变成概率事件;
  3. 把每一种情况都列出来,会有每种情况下的复杂度,但概率都是 $1/N$;
  4. 所有情况都考虑,则时间复杂度就是这种概率模型下的长期期望。

时间复杂度:$O(N \times logN)$
额外空间复杂度:$O(N \times logN)$

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void quickSort3(int[] array) {
recurse3(array, 0, array.length);
}

static void recurse3(int[] array, int start, int end) {
if (array == null || start >= end - 1) {
return;
}

int pi = (int) (Math.random() * (end - start));
System.out.println(start + pi);
int pivot = array[start + pi];
int[] idxs = netherlandsFlagPartition(array, pivot, start, end);
if (idxs == null) {
return;
}
recurse3(array, start, idxs[0]);

if (idxs[1] < end - 1) {
recurse3(array, idxs[1], end);
}
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#[test]
fn test_quick_sort3() {
for _ in 0..1000 {
let mut array0: [i32; 32] = random();
let mut array1 = array0.clone();

array0.sort();
array1.quick_sort3();

assert_eq!(array0, array1);
}
}

pub trait QuickSort3 {
fn quick_sort3(&mut self);
}

impl<T: Ord + Clone> QuickSort for [T] {
fn quick_sort3(&mut self) {
if self.len() < 2 {
return;
}

let index = rand::thread_rng().gen_range(0..self.len());
let value = self[index].clone();

match self.netherlands_flag_partition(value) {
NetherlandsFlagResult::Three(start, end) => {
self[..start].quick_sort3();
self[end..].quick_sort3();
}
NetherlandsFlagResult::ValueStart(start) => {
self[start..].quick_sort3();
}
NetherlandsFlagResult::ValueEnd(end) => {
self[..end].quick_sort3();
}
NetherlandsFlagResult::Less | NetherlandsFlagResult::Greater => {
self.quick_sort3();
}
NetherlandsFlagResult::Equal => {
return;
}
}
}
}

Swap 函数

  • Java 实现
1
2
3
4
5
6
7
8
9
static void swap(int[] array, int i, int j) {
if (i == j) {
return;
}

array[i] = array[i] ^ array[j];
array[j] = array[i] ^ array[j];
array[i] = array[i] ^ array[j];
}

归并排序

  • 算法时间复杂度: $O(NlogN)$
  • 相比冒泡、选择等时间复杂度为 $O(N^2)$ 的排序算法,没有浪费比较行为;
  • 插入排序时即便使用二分查找插入位置,也需要将插入位置后的元素依次向右移动,每次插入复杂度不是 $O(1)$。

递归方法

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class MergeSort {

public static void main(String[] args) {
Integer[] data = {12, 3, 2, 6, 4, 8, 6, 0, 9, 3};
System.out.println(Arrays.toString(data));
mergeSort(data);
System.out.println(Arrays.toString(data));
}

static <T extends Comparable> void mergeSort(T[] array) {
T[] help = (T[]) new Comparable[array.length];
doMergeSort(array, help, 0, array.length);
}

static <T extends Comparable> void doMergeSort(T[] array, T[] help, int start, int end) {
if (start == end - 1) {
return;
}

int mid = start + ((end - start) >> 1);
doMergeSort(array, help, start, mid);
doMergeSort(array, help, mid, end);
mergeSorted(array, help, start, mid, end);
}

static <T extends Comparable> void mergeSorted(T[] array, T[] help, int start, int mid, int end) {
int h = start;
int i = start;
int j = mid;

while (i < mid && j < end) {
if (array[i].compareTo(array[j]) <= 0) {
help[h++] = array[i++];
} else {
help[h++] = array[j++];
}
}

while (i < mid) {
help[h++] = array[i++];
}

while (j < end) {
help[h++] = array[j++];
}

assert h == end;
for (int n = start; n < end; n++) {
array[n] = help[n];
}
}
}

输出内容:

1
2
[12, 3, 2, 6, 4, 8, 6, 0, 9, 3]
[0, 2, 3, 3, 4, 6, 6, 8, 9, 12]
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#[test]
fn test_merge_sort() {
for _ in 0..100 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();
array0.merge_sort();
array1.sort();
assert_eq!(array0, array1);
}
}

trait MergeSort {
fn merge_sort(&mut self);
}

impl<T: Ord + Clone> MergeSort for [T] {
fn merge_sort(&mut self) {
let len = self.len();
if len == 0 || len == 1 {
return;
}

let mid = len >> 1;
self[..mid].merge_sort();
self[mid..].merge_sort();

let mut i = 0;
let mut j = mid;
let mut vec = Vec::with_capacity(len);

while i < mid && j < len {
if self[i] <= self[j] {
vec.push(self[i].clone());
i += 1;
} else {
vec.push(self[j].clone());
j += 1;
}
}

while i < mid {
vec.push(self[i].clone());
i += 1;
}

while j < len {
vec.push(self[j].clone());
j += 1;
}

self.clone_from_slice(&vec);
}
}

非递归方法

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static <T extends Comparable<T>> void mergeSortLoop(T[] array) {
if (array == null || array.length < 2) {
return;
}

Comparable[] help = (T[]) new Comparable[array.length];

int mergeSize = 1;

while (mergeSize < array.length) {
int left = 0;

while (left < array.length) {
int mid = left + mergeSize;

if (mid >= array.length) {
break;
}

int right = Math.min(mid + mergeSize, array.length);
mergeSorted(array, help, left, mid, right);
left = right;
}


if (mergeSize >= array.length >> 1) {
break;
}

mergeSize <<= 1;
}
}
  • Rust 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#[test]
fn test_loop_merge_sort() {
for _ in 0..100 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();
array0.loop_merge_sort();
array1.sort();
assert_eq!(array0, array1);
}
}

impl<T: Ord + Clone> MergeSort for [T] {
fn merge_sort(&mut self) {
//...
}

fn loop_merge_sort(&mut self) {
let len = self.len();
if len < 2 {
return;
}

let mut merge_size: usize = 1;
while merge_size < len {
let mut left = 0;

while left < len {
let mid = left + merge_size;
if mid >= len {
break;
}

let right = (mid + merge_size).min(len);

// merge sorted
let mut vec = Vec::with_capacity(right - left);

let mut i = left;
let mut j = mid;
while i < mid && j < right {
if self[i] <= self[j] {
vec.push(self[i].clone());
i += 1;
} else {
vec.push(self[j].clone());
j += 1;
}
}

while i < mid {
vec.push(self[i].clone());
i += 1;
}

while j < right {
vec.push(self[j].clone());
j += 1;
}

(&mut self[left..right]).clone_from_slice(&vec);

left = right;
}

if merge_size > len >> 1 {
break;
}

merge_size <<= 1;
}
}
}

求数组的小和

在一个数组中,一个数左边比它小的数的总和,叫数的小和,所有数的小和加起来叫数组小和。求数组小和。

例子:[1, 3, 4, 2, 5]

1 左边比自己小的数:没有
3 左边比自己小的数:1
4 左边比自己小的数:1 3
2 左边比自己小的数:1
5 左边比自己小的数:1 3 4 2

所以,数组的小和为 $1 + 1 + 3 + 1 + 1 + 3 + 4 + 2 = 16$

  • Java 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class SmallSum {
public static void main(String[] args) {
System.out.println(smallSum(new int[]{1, 3, 4, 2, 5}) + " == " + getSmallSumSimple(new int[]{1, 3, 4, 2, 5}));
}

static int getSmallSumSimple(int[] array) {
int smallSum = 0;
for (int i = 0; i < array.length; ++i) {
for (int j = 0; j < i; ++j) {
if (array[j] < array[i]) {
smallSum += array[j];
}
}
}
return smallSum;
}

static int smallSum(int[] array) {
int[] help = new int[array.length];
return doMergeSmallSum(array, help, 0, help.length);
}

static int doMergeSmallSum(int[] array, int[] help, int start, int end) {
if (end - start < 2) {
return 0;
}

int mid = start + ((end - start) >> 1);
int left = doMergeSmallSum(array, help, start, mid);
int right = doMergeSmallSum(array, help, mid, end);
return left + right + doMerge(array, help, start, mid, end);
}

static int doMerge(int[] array, int[] help, int start, int mid, int end) {
int i = start;
int j = mid;
int k = start;
int sum = 0;

while (i < mid && j < end) {
if (array[i] < array[j]) {
int t = array[i++];
help[k++] = t;
sum += t * (end - j);
} else {
help[k++] = array[j++];
}
}

while (i < mid) {
help[k++] = array[i++];
}

while (j < end) {
help[k++] = array[j++];
}

for (k = start; k < end; k++) {
array[k] = help[k];
}

return sum;
}
}

求数组中的降序对

例如:[3, 1, 7, 0, 2] 中的降序对有 (3, 1)(3, 0)(3, 2)(1, 0)(7, 0)(7, 2)

即求数组中每个数右边有多少个数比它小,就有多少个降序对。

  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#[test]
fn test_get_descending_pairs() {
for _ in 0..100 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();
let set0 = array0.get_descending_pairs();
let set1 = array1.get_descending_pairs_simple();
assert_eq!(set0, set1);
}
}

trait DescendingPair<T> {
fn get_descending_pairs(&mut self) -> HashSet<(T, T)>;
fn get_descending_pairs_simple(&mut self) -> HashSet<(T, T)>;
}

impl<T: Ord + Clone + Hash> DescendingPair<T> for [T] {
fn get_descending_pairs(&mut self) -> HashSet<(T, T)> {
let mut set = HashSet::new();
if self.len() < 2 {
return set;
}

let mid = self.len() >> 1;
let mut left = self[..mid].get_descending_pairs();
let mut right = self[mid..].get_descending_pairs();
set.extend(left.drain());
set.extend(right.drain());

let mut help = Vec::with_capacity(self.len());
let mut j = 0;
let mut k = mid;
let len = self.len();

while j < mid && k < len {
if self[j] > self[k] {
let mut temp = self[k..]
.iter()
.map(|v| (self[j].clone(), v.clone()))
.collect::<HashSet<_>>();
set.extend(temp.drain());

help.push(self[j].clone());
j += 1;
} else {
help.push(self[k].clone());
k += 1;
}
}

help.extend_from_slice(&self[j..mid]);
help.extend_from_slice(&self[k..]);

assert_eq!(help.len(), self.len());
self.clone_from_slice(&help);

set
}

fn get_descending_pairs_simple(&mut self) -> HashSet<(T, T)> {
let mut set = HashSet::new();
for i in 0..self.len() {
for j in 0..i {
if self[j] > self[i] {
set.insert((self[j].clone(), self[i].clone()));
}
}
}
set
}
}