耿腾的博客

常期望安定,还期望即兴。

0%

1. 基本架构

一个键值数据库需要有哪些功能/结构/特性。

  • 支持多种数据结构
  • PUT、GET、DELETE、SCAN
  • 设置过期
  • 索引结构
  • 对外接口,Socket、动态链接库等等
  • 存储方式,内存、硬盘(持久化)
  • 高可用
  • 横向扩展
  • 功能可扩展性

2. 数据结构

Redis 接收到一个键值对操作后,能以 微秒 级别的速度找到数据,并快速完成操作。

Redis 的值有如下数据类型:

  • String(字符串)
  • List(列表)
  • Hash(哈希)
  • Set(集合)
  • Sorted Set(有序集合)
数据类型 数据结构
String 简单动态字符串
List 双向链表/压缩列表
Hash 压缩列表/哈希表
Set 整数数组/哈希表
Sorted Set 压缩列表/跳表

Redis 使用一个全局的哈希表来保存所有键值对,可以在 $O(1)$ 的时间内查找到键值对。

一个哈希表就是一个数组,数组元素为哈希桶,每个哈希桶存放一个链表,连接着哈希值映射到该桶中的多个键值对。

1
2
3
4
5
|0|1|2|3|4|5|
| |
| ---> entry4
|
---> entry1 ---> entry2 ---> entry3

哈希冲突及rehash

当哈希冲突过多时,链表过长,导致查找效率下降,Redis会对哈希表进行rehash。

为了 使 rehash 更高效,Redis 使用了两个全局哈希表:哈希表 1 和 哈希表 2 。默认情况下使用哈希表 1 ,哈希表 2 没有分配空间,当键值对增多时,Redis开始进行rehash:

  1. 给哈希表 2 分配更大的空间,例如是当前哈希表 1 大小的两倍;
  2. 把哈希表 1 中的数据重新映射并拷贝到哈希表 2 中;
  3. 释放哈希表 1 的空间。

然后使用哈希表 2 保存数据,下次rehash再使用哈希表 1 。

避免一次性拷贝大量数据造成 Redis 线程阻塞,Redis采用了渐进式rehash。

渐进式rehash

Redis可以保持处理用户请求,每处理一个请求,就顺带将哈希表 1 中的一个桶的所有entries拷贝到哈希表 2 中。这样就能把一次性大量拷贝的开销,分摊到了多次请求中,避免了耗时可能造成的阻塞。

数据结构的操作效率

整数数组双向链表 都是顺序读写,操作复杂度都是 $O(N)$。

压缩列表

压缩列表表头有 列表长度(zlbytes)列表尾偏移量(zltail)列表中entry个数(zllen) 三个字段,表尾有一个 zlend 字段表示 列表结束

1
| zlbytes | zltail | zllen | entry1 | entry2 | ... | entryN | zlend |

查找表头、表尾元素的复杂度为 $O(1)$,查找其他元素的复杂度为 $O(N)$。

跳表

跳表 是在 有序链表 的基础上增加了多级索引,通过索引位置的几次跳转,实现数据快速定位。

1
2
3
4
5
1 ----------------------> 27 -----------------------> 100 // 二级索引
↓ ↓ ↓
1 --------> 11 ---------> 27 ---------> 50 ---------> 100 // 一级索引
↓ ↓ ↓ ↓ ↓
1 --> 5 --> 11 --> 20 --> 27 --> 33 --> 50 --> 62 --> 100 --> 156 // 链表

跳表 的查找复杂度为 $O(logN)$。

时间复杂度

数据结构的时间复杂度

数据结构 时间复杂度
哈希表 $O(1)$
跳表 $O(logN)$
双向链表 $O(N)$
压缩列表 $O(N)$
整数数组 $O(N)$

不同操作的时间复杂度

  1. 单元数操作是基础。每一种集合类型对单个数据实现的增删改查操作,复杂度由数据结构决定。
  2. 范围操作很耗时。返回一个范围内的数据,复杂度一般是 $O(N)$,应尽量避免,可以使用 2.8 版本之后的 HSCANSSCANZSCAN 等命令进行渐进式遍历。
  3. 统计操作通常高效。集合类型对集合中的元素个数由记录,例如 LLENSCARD
  4. 例外情况只有几个。压缩列表和双向列表的会记录表头和表尾的偏移量,对它们的操作复杂度只有 $O(1)$,例如 LPOPRPOPLPUSHRPUSH

问题

整数数组和压缩列表在查找时间复杂度方面并没有很大优势,为什么Redis还会把它作为底层数据结构?

主要有两方面原因:

  1. 内存利用率。Redis 是内存数据库,大量数据存储在内存中,整数数组和压缩列表结构比较紧凑,相比链表结构占用内存更少,可以提高内存利用率。
  2. 数组对CPU高速缓存支持更友好。集合元素较少时,使用内存紧凑的排列方式,可以利用CPU高速缓存,尤其在一个缓存行内(64字节)有更高效的访问效率。当元素数量超过阈值后,为避免复杂度太高,可以转为哈希表和跳表。

3. 高性能 IO 模型

Redis 的网络 IO 和键值对读写是由单个线程完成的;其他功能,例如持久化、异步删除、集群数据同步,是由额外的线程执行的。

Redis使用 IO多路复用,即一个线程处理多个 IO 流。

可能存在的瓶颈(来自Kaito):

  1. 单个请求耗时会导致后续所有请求阻塞等待,包括以下几种:
    • 操作bigkey,分配、释放内存(4.0推出了lazy-free,可以异步执行bigkey释放)
    • 使用复杂度较高的命令,排序、union、查询全量数据
    • 大量key集中过期
    • 淘汰策略,内存超过Redis内存上线,每次写入都要淘汰一些key,导致耗时变长
    • AOF 开启 always
    • 主从同步生成 RDB 时的 fork
  2. 并发量大时,单线程处理客户端请求,无法利用多核(6.0推出了多线程,可利用多核CPU处理用户请求)

4. AOF(Append Only File)

Redis 的 AOF 是写后日志,先执行命令,把数据写入内存,然后再记录日志。

不会阻塞当前写操作,执行完命令还没来得及记日志就宕机的话,会有数据丢失的风险。

1
2
3
4
5
6
7
*3 --> 这个命令有三个部分
$3 --> 第一部分的长度
set --> 第一部分的内容
$5
hello
$5
world

三种写回策略

  • Always,同步写回,每个命令执行完立马写回磁盘
  • EverySec,每秒写回,写到AOF内存缓存,每秒写回一次
  • No,写到AOF内存缓存,由操作系统决定何时将其写回

日志重写

当日志文件过大时,可以对日志进行重写。可以把某些多条修改同一个键值对的命令合并为一条。

重写流程:

  1. 主线程 fork 出 bgrewriteaof 子进程,主线程在将新指令写入 AOF 缓存时,还会写入 AOF 重写缓存;
  2. 子进程将 fork 拷贝出来的内存快照写成命令日志;
  3. 写完后将AOF重写缓存中的日志写到这个新的日志中,然后就可以用这个新日志替换旧日志了。

其他要点

  1. fork 使用的是操作系统的写时复制(Copy On Write)机制,并不会直接拷贝所有内存。但是会拷贝父进程的的内存页表,如果页表较大,可能导致阻塞。
  2. 主进程在提供服务时,如果操作的是bigkey,写时复制会导致内存分配;如果开启了内存大页(huge page)机制,内存分配会产生阻塞。(所以使用Redis通常要关闭内存大页,其对使用 fork 的程序不友好)
  3. AOF 重写可以使用 bgrewriteaof 命令手动执行,也由两个配置项控制自动触发:
    • auto-aof-rewrite-min-size:表示运行AOF重写的最小大小,默认为 64MB;
    • auto-aof-rewrite-percentage:当前AOF文件大小和上一次重写后AOF文件大小的差值,除以上一次重写AOF文件大小,即增量和上一次全量的比值,默认为 100。
      AOF 文件大小同时满足这两个配置项,会自动触发 AOF 重写。

5. 内存快照 RDB

把某一时刻的内存数据以文件的形式写到磁盘上,即快照。这个快照文件即 RDB 文件(Redis DataBase 的缩写)。

在数据恢复时,可直接把 RDB 文件读入内存,很快完成恢复。

Redis 执行的是全量快照。

可以使用 savebgsave 来生成 RDB 文件;save在主线程中执行,会阻塞;bgsave 会创建子进程专门进行 RDB 文件写入,不会阻塞。

快照进行时数据如何修改

使用 fork 创建子进程,该子进程会跟主线程共享所有内存数据;利用操作系统的写时复制技术,当主进程修改数据时,会重新分配空间生成该数据的副本,并使用该副本提供服务,bgsave 子进程仍然使用未修改的数据进行快照写入。

快照频率

频繁执行全量快照有两方面开销:

  1. 给磁盘带来压力
  2. fork 本身可能阻塞主线程,如果频繁 fork 会导致主线程阻塞所以,如果有一个 bgsave 在运行,就不会启动第二个了

4.0 中提出了混合使用AOF和RDB的方法,即快照以一定的频率执行,在快照间以AOF的形式记录这期间的命令。实现:

  1. 避免了频繁 fork 对主线程的影响
  2. 减少了 AOF 文件大小,提高了数据恢复效率

在配置文件中设置:

1
aof-use-rdb-preamble yes

三点建议

  1. 数据不能丢失时,使用 AOF 和 RDB 混合方案
  2. 允许分钟级别的数据丢失,使用 RDB
  3. 只用 AOF 的话,优先使用 everysec,在可靠性和性能之间比较平衡

课后题:资源风险(Kaito)

  1. 考虑 fork 导致多大程度的写时复制,是否会造成 OOM,或者 swap 到磁盘上导致性能降低;
  2. CPU 核数较少时,Redis的其他子进程会跟主线程竞争CPU,导致性能下降;
  3. 如果 Redis 进程绑定了 CPU,子进程会继承它的 CPU 亲和属性,与父进程争夺同一个 CPU 资源,导致效率下降。所以如果开启RDB和AOF,一定不要绑定CPU。

6. 主从同步

可靠性分两方面:

  1. 数据少丢失(由RDB、AOF保证);
  2. 服务少中断(通过增加副本冗余量保证)

读写分离:主库、从库都可以执行读操作,主库限制性写操作,然后同步给从库。

如果都允许写,会造成各实例副本不一致,需要涉及加锁、协商等各种操作,产生巨额开销。

主从第一次同步

在从库实例上执行:

1
replicaof [主库ip] [主库port]

三个阶段

  1. 主从建立链接、协商同步,从库给主库发送 psync 命令,表示要进行数据同步,主库根据这个命令的参数来启动复制。psync 命令包含了主库的 runID 和复制进度 offset 两个参数。主库收到 psync 命令后,用 FULLRESYNC 响应命令带上两个参数,主库的 runID 和主库目前的复制进度 offset 给从库,从库会记住这两个参数。

runID:每个 Redis 实例启动时自动生成的一个随机 ID,唯一标识一个实例。第一次复制时从库不知道主库的 runID,使用 ?
offset:第一次复制设置为 -1

  1. 主库执行 bgsave 生成 RDB 文件,并发送给从库。从库收到 RDB 文件后,清空数据库并加载 RDB 文件。此时主库不会停止服务,但在内存中会有专门的 replication buffer,记录 RDB 文件生成后收到的所有写操作。
  2. 主库把 replication buffer 中的修改操作发送给从库执行。

主从级联

使用“主-从-从”模式分担主库压力。二级从库可以选择一个内存较高的从库进行同步。

主从间使用基于长连接的命令传播。

网络断开

2.8 前网络闪断会造成重新全量复制,2.8 后可以增量复制,实现继续同步。

(Kaito)主库除了将所有写命令传给从库之外,也会在 repl_backlog_buffer 中记录一份。当从库断线重连后,会发送 psync $master_runid $offset,主库就能通过 $offsetrepl_backlog_buffer 中找到从库断开的位置,只发送 $offset 之后的增量给从库。

repl_backlog_buffer:为了解决从库断开后找到主从差异而设计的环形缓冲区。配置大一点,可以减少全量同步的频率。

replication buffer:客户端、从库与 Redis 通信时,Redis 都会分配一个内存 buffer 用于交互,把数据写进去并通过 socket 发送到对端。当对端为从库时,该 buffer 只负责传播写命令,通常叫做 replication buffer。这个 buffer 由 client-output-buffer-limit 参数控制,如果过小或者从库处理过慢,Redis 会强制断开这个链接。

为啥不用 AOF 同步

  1. 相比 RDB 文件更大,效率低;
  2. 必须打开 AOF,数据丢失不敏感业务没必要开启

7. 哨兵


8. 哨兵集群


9. 切片集群


装饰器(Decorator)

Component:定义一个对象接口,可以给这些对象动态地添加职责。
ConcreteComponent:定义对象,可以给这个对象添加一些职责。
Decorator:维持一个指向 Component 对象的指针,并定义一个与 Component 接口一致的接口。
ConcreteDecorator:实际的装饰对象,向组件添加职责。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class DecoratorPattern {

public static void main(String[] args) {
Component component0 = new ADecorator(new BDecorator(new Component0()));
component0.doSth();
System.out.println();
Component component1 = new BDecorator(new ADecorator(new BDecorator(new ADecorator(new Component1()))));
component1.doSth();
System.out.println();
}

public interface Component {
void doSth();
}

public static class Component0 implements Component {

@Override
public void doSth() {
System.out.print("0");
}
}

public static class Component1 implements Component {

@Override
public void doSth() {
System.out.print("1");
}
}

public static abstract class Decorator implements Component {
protected Component component;
public Decorator(Component component) {
this.component = component;
}
}

public static class ADecorator extends Decorator {

public ADecorator(Component component) {
super(component);
}

@Override
public void doSth() {
component.doSth();
System.out.print("A");
}
}

public static class BDecorator extends Decorator {

public BDecorator(Component component) {
super(component);
}

@Override
public void doSth() {
component.doSth();
System.out.print("B");
}
}
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
pub trait Component {
fn do_sth(&self);
}

pub struct Component1;

impl Component for Component1 {
fn do_sth(&self) {
print!("1");
}
}

pub struct Component2;

impl Component for Component2 {
fn do_sth(&self) {
print!("2");
}
}

pub trait Decorator<T: Component>: Component {
fn wrap(inner: T) -> Self;
}

struct DecoratorA<T> {
inner: T,
}

impl<T: Component> Component for DecoratorA<T> {
fn do_sth(&self) {
self.inner.do_sth();
print!("A");
}
}

impl<T: Component> Decorator<T> for DecoratorA<T> {
fn wrap(inner: T) -> Self {
Self { inner }
}
}

struct DecoratorB<T> {
inner: T,
}

impl<T: Component> Component for DecoratorB<T> {
fn do_sth(&self) {
self.inner.do_sth();
print!("B");
}
}

impl<T: Component> Decorator<T> for DecoratorB<T> {
fn wrap(inner: T) -> Self {
Self { inner }
}
}

#[test]
fn test_decorator() {
let c1 = Component1;
c1.do_sth();
println!();
let ab1 = DecoratorA::wrap(DecoratorB::wrap(Component1));
ab1.do_sth();
println!();
let abbaa2 = DecoratorA::wrap(DecoratorB::wrap(DecoratorB::wrap(DecoratorA::wrap(
DecoratorA::wrap(Component2),
))));
abbaa2.do_sth();
println!();

// 在Rust中,如果不需要运行时动态装饰,就没有必要产生很多小对象
// 装饰了好几轮,最后占用内存还是 Component2 的大小
assert_eq!(
std::mem::size_of::<
DecoratorA<DecoratorB<DecoratorB<DecoratorA<DecoratorA<Component2>>>>>,
>(),
0
);
}

输出内容:

1
2
3
1
1BA
2AABBA

优点:比继承更灵活,避免类爆炸;可以组合;装饰器和构件可以独立变化,符合开闭原则。
缺点:产生很多小对象,增加系统复杂度和理解难度,调试困难。


分区问题

二分

将一个数组分为两个区域,小于等于N在左,大于N的在右;返回右侧的起始位置。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int partition(int[] array, int value, int start, int end) {
if (array == null || start >= end) {
return -1;
}

int left = -1;

for (int i = start; i < end; i++) {
if (array[i] <= value) {
if (left == -1) {
left = start;
} else {
left += 1;
}
swap(array, i, left);
}
}

return left + 1;
}

static int partition(int[] array, int value) {
return partition(array, value, 0, array.length);
}

输出内容:

1
2
3
[2, 3, 5, 1, 2, 6, 4, 3, 8, 4, 3, 5, 1, 3, 5, 8]
10
[2, 3, 1, 2, 4, 3, 4, 3, 1, 3, 6, 5, 8, 5, 5, 8]
  • Rust 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#[test]
fn test_partition() {
let mut array = [4, 2, 5, 7, 5, 4, 2, 9, 3, 5, 1];
assert_eq!(array.partition(5), Index::Position(9));
assert_eq!(array.partition(3), Index::Position(4));
assert_eq!(array.partition(11), Index::Greater);
assert_eq!(array.partition(0), Index::Less);
}

#[derive(Debug, Eq, PartialEq)]
pub enum Index {
// 值比所有数组元素都小
Less,
// 能找到一个分区位置
Position(usize),
// 值比所有数组元素都大
Greater,
}

pub trait Partition<T> {
fn partition(&mut self, value: T) -> Index;
}

impl<T: Ord> Partition<T> for [T] {
// 返回大于部分的起始索引
fn partition(&mut self, value: T) -> Index {
let mut left = None;
for i in 0..self.len() {
if self[i] <= value {
if let Some(left_val) = left {
let new_left_val = left_val + 1;
self.swap(i, new_left_val);
left = Some(new_left_val);
} else {
self.swap(i, 0);
left = Some(0);
}
}
}

match left {
None => Index::Less,
Some(i) if i == self.len() - 1 => Index::Greater,
Some(i) => Index::Position(i + 1),
}
}
}

荷兰 🇳🇱 国旗问题(三分)

将一个数组分为三个区域,小于N在左,等于N在中间,大于N的在右;返回中间和右侧的起始位置。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static void swap(int[] array, int i, int j) {
if (i == j) {
return;
}

array[i] = array[i] ^ array[j];
array[j] = array[i] ^ array[j];
array[i] = array[i] ^ array[j];
}

static int[] netherlandsFlagPartition(int[] array, int value) {
return netherlandsFlagPartition(array, value, 0, array.length);
}

static int[] netherlandsFlagPartition(int[] array, int value, int start, int end) {
if (array == null || start >= end) {
return null;
}

int left = -1;
int right = end;
int i = start;

while (i < right) {
if (array[i] < value) {
left = left == -1 ? start : (left + 1);
swap(array, left, i);
i += 1;
} else if (array[i] > value) {
right -= 1;
swap(array, right, i);
} else { // array[i] == value
i += 1;
}
}

return new int[]{left + 1, right};
}

输出内容:

1
2
[10, 13]
[2, 3, 1, 2, 4, 3, 4, 3, 1, 3, 5, 5, 5, 8, 6, 8]
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
use std::cmp::Ordering;

#[test]
fn test_netherlands_flag_partition() {
let mut array = [7, 1, 2, 0, 8, 5, 3, 9, 2, 6, 5, 1, 0, 8, 7, 4];
// sorted: [0, 0, 1, 1, 2, 2, 3, 4, 5, 5, 6, 7, 7, 8, 8, 9]
// index(hex): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, A, B, C, D, E, F]
assert_eq!(array.netherlands_flag_partition(2), (Some(4), Some(6)));
assert_eq!(array.netherlands_flag_partition(7), (Some(0xB), Some(0xD)));
assert_eq!(array.netherlands_flag_partition(-1), (None, Some(0)));
assert_eq!(array.netherlands_flag_partition(0), (None, Some(2)));
assert_eq!(
array.netherlands_flag_partition(10),
(Some(array.len()), None)
);
assert_eq!(
array.netherlands_flag_partition(9),
(Some(array.len() - 1), None)
);
}

pub trait NetherlandsFlagPartition<T> {
fn netherlands_flag_partition(&mut self, value: T) -> (Option<usize>, Option<usize>);
}

impl<T: Ord> NetherlandsFlagPartition<T> for [T] {
// 返回相等部分和大于部分的起始索引
fn netherlands_flag_partition(&mut self, value: T) -> (Option<usize>, Option<usize>) {
let len = self.len();
let mut left = None;
let mut right = None;
let mut i = 0;
while i < right.unwrap_or(len) {
match self[i].cmp(&value) {
Ordering::Less => {
match left {
None => {
self.swap(0, i);
left = Some(0);
}
Some(left_value) => {
let new_left = left_value + 1;
self.swap(new_left, i);
left = Some(new_left);
}
}
i += 1;
}
Ordering::Equal => {
i += 1;
}
Ordering::Greater => {
match right {
None => {
self.swap(len - 1, i);
right = Some(len - 1);
}
Some(right_value) => {
let new_right = right_value - 1;
self.swap(new_right, i);
right = Some(new_right);
}
}

// i 不要自增,让下一次循环检查新换到前面的值
}
}
}

(left.map(|v| v + 1), right)
}
}
  • Rust 实现(用枚举表示结果)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::cmp::Ordering;

#[test]
fn test_netherlands_flag_partition() {
let mut array = [7, 1, 2, 0, 8, 5, 3, 9, 2, 6, 5, 1, 0, 8, 7, 4];
assert_eq!(
array.netherlands_flag_partition(2),
NetherlandsFlagResult::Three(4, 6)
);
assert_eq!(
array.netherlands_flag_partition(7),
NetherlandsFlagResult::Three(11, 13)
);
assert_eq!(
array.netherlands_flag_partition(-1),
NetherlandsFlagResult::Greater
);
assert_eq!(
array.netherlands_flag_partition(0),
NetherlandsFlagResult::ValueStart(2)
);
assert_eq!(
array.netherlands_flag_partition(10),
NetherlandsFlagResult::Less
);
assert_eq!(
array.netherlands_flag_partition(9),
NetherlandsFlagResult::ValueEnd(15)
);

let mut array = [2, 2, 2, 2, 2];
assert_eq!(
array.netherlands_flag_partition(2),
NetherlandsFlagResult::Equal
);
}

#[derive(Debug, Eq, PartialEq)]
pub enum NetherlandsFlagResult {
/// 分为三部分,分别是 < value, == value, > value, 返回后两部分的起始索引
Three(usize, usize),
/// 分为两部分,== value, > value, 返回第二部分的起始索引
ValueStart(usize),
/// 分为两部分,< value, == value, 返回第二部分的起始索引
ValueEnd(usize),
/// 所有值都小于 value
Less,
/// 所有值都大于 value
Greater,
/// 所有值都等于 value
Equal,
}

pub trait NetherlandsFlagPartition<T> {
fn netherlands_flag_partition(&mut self, value: T) -> NetherlandsFlagResult;
}

impl<T: Ord> NetherlandsFlagPartition<T> for [T] {
// 返回相等部分和大于部分的起始索引
fn netherlands_flag_partition(&mut self, value: T) -> NetherlandsFlagResult {
let len = self.len();
let mut left = None;
let mut right = None;
let mut i = 0;
while i < right.unwrap_or(len) {
match self[i].cmp(&value) {
Ordering::Less => {
match left {
None => {
self.swap(0, i);
left = Some(0);
}
Some(left_value) => {
let new_left = left_value + 1;
self.swap(new_left, i);
left = Some(new_left);
}
}
i += 1;
}
Ordering::Equal => {
i += 1;
}
Ordering::Greater => {
match right {
None => {
self.swap(len - 1, i);
right = Some(len - 1);
}
Some(right_value) => {
let new_right = right_value - 1;
self.swap(new_right, i);
right = Some(new_right);
}
}

// i 不要自增,让下一次循环检查新换到前面的值
}
}
}

match (left.map(|v| v + 1), right) {
(None, Some(i)) => {
if i == 0 {
NetherlandsFlagResult::Greater
} else {
NetherlandsFlagResult::ValueStart(i)
}
}
(Some(i), None) => {
if i >= self.len() {
NetherlandsFlagResult::Less
} else {
NetherlandsFlagResult::ValueEnd(i)
}
}
(Some(i), Some(j)) => {
if i >= self.len() {
NetherlandsFlagResult::Greater
} else {
NetherlandsFlagResult::Three(i, j)
}
}
(None, None) => NetherlandsFlagResult::Equal,
}
}
}

快速排序

v1.0

步骤:

  1. 选择数组最后一个元素 X,在 0..array.length-1 的范围上进行二分,小于等于 X 的在左,大于 X 的在右;
  2. X 与右侧的第一个元素交换;
  3. X 左侧与右侧的数组分别进行上述操作,进行递归,过程中 X 不需要再移动。

时间复杂度:$O(N)$

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void quickSort1(int[] array) {
recurse1(array, 0, array.length);
}

static void recurse1(int[] array, int start, int end) {
if (array == null || start >= end - 1) {
return;
}

int pivot = array[end - 1];
int idx = partition(array, pivot, start, end - 1);
swap(array, idx, end - 1);
recurse1(array, start, idx);
recurse1(array, idx + 1, end);
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#[test]
fn test_quick_sort1() {
for _ in 0..1000 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();

array0.sort();
array1.quick_sort1();

assert_eq!(array0, array1);
}
}

pub trait QuickSort1 {
fn quick_sort1(&mut self);
}

impl<T: Ord + Clone> QuickSort1 for [T] {
fn quick_sort1(&mut self) {
let len = self.len();
if len < 2 {
return;
}

let value = self[len - 1].clone();

match self[..len - 1].partition(value) {
// value 比所有元素都小,把 value 挪到第一个位置,排序剩下的
Index::Less => {
self.swap(0, len - 1);
self[1..].quick_sort1();
}
// 把 value 与第一个大于区的数交换
Index::Position(i) => {
self.swap(i, len - 1);
self[..i].quick_sort1();
self[i + 1..].quick_sort1();
}
// value比所有元素都大,不动,排序前面所有的
Index::Greater => {
self[..len - 1].quick_sort1();
}
}
}
}

v2.0

  1. 选择数组一个元素 X,进行荷兰国旗三分,小于 X 的在左,等于 X 的在中间,大于 X 的在右;
  2. X 左侧与右侧的数组分别进行上述操作,进行递归,过程中位于中间的所有 X 不需要再移动。

时间复杂度:$O(N)$

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void quickSort2(int[] array) {
recurse2(array, 0, array.length);
}

static void recurse2(int[] array, int start, int end) {
if (array == null || start >= end - 1) {
return;
}

int pivot = array[end - 1];
int[] idxs = netherlandsFlagPartition(array, pivot, start, end);
if (idxs == null) {
return;
}
recurse2(array, start, idxs[0]);

if (idxs[1] < end - 1) {
recurse2(array, idxs[1], end);
}
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#[test]
fn test_quick_sort2() {
for _ in 0..1000 {
let mut array0: [i32; 32] = random();
let mut array1 = array0.clone();

array0.sort();
array1.quick_sort2();

assert_eq!(array0, array1);
}
}

pub trait QuickSort2 {
fn quick_sort2(&mut self);
}

impl<T: Ord + Clone> QuickSort2 for [T] {
fn quick_sort2(&mut self) {
if self.len() < 2 {
return;
}

let value = self[0].clone();

match self.netherlands_flag_partition(value) {
NetherlandsFlagResult::Three(start, end) => {
self[..start].quick_sort2();
self[end..].quick_sort2();
}
NetherlandsFlagResult::ValueStart(start) => {
self[start..].quick_sort2();
}
NetherlandsFlagResult::ValueEnd(end) => {
self[..end].quick_sort2();
}
NetherlandsFlagResult::Less | NetherlandsFlagResult::Greater => {
self.quick_sort2();
}
NetherlandsFlagResult::Equal => {
return;
}
}
}
}

v3.0

将 v2.0 中选择数组元素改为随机选择,其他不变。

  1. 划分值越靠近中间,性能越好;越靠近两边性能越差;
  2. 随机算一个数进行划分的目的就是让好情况和坏情况都变成概率事件;
  3. 把每一种情况都列出来,会有每种情况下的复杂度,但概率都是 $1/N$;
  4. 所有情况都考虑,则时间复杂度就是这种概率模型下的长期期望。

时间复杂度:$O(N \times logN)$
额外空间复杂度:$O(N \times logN)$

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void quickSort3(int[] array) {
recurse3(array, 0, array.length);
}

static void recurse3(int[] array, int start, int end) {
if (array == null || start >= end - 1) {
return;
}

int pi = (int) (Math.random() * (end - start));
System.out.println(start + pi);
int pivot = array[start + pi];
int[] idxs = netherlandsFlagPartition(array, pivot, start, end);
if (idxs == null) {
return;
}
recurse3(array, start, idxs[0]);

if (idxs[1] < end - 1) {
recurse3(array, idxs[1], end);
}
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#[test]
fn test_quick_sort3() {
for _ in 0..1000 {
let mut array0: [i32; 32] = random();
let mut array1 = array0.clone();

array0.sort();
array1.quick_sort3();

assert_eq!(array0, array1);
}
}

pub trait QuickSort3 {
fn quick_sort3(&mut self);
}

impl<T: Ord + Clone> QuickSort for [T] {
fn quick_sort3(&mut self) {
if self.len() < 2 {
return;
}

let index = rand::thread_rng().gen_range(0..self.len());
let value = self[index].clone();

match self.netherlands_flag_partition(value) {
NetherlandsFlagResult::Three(start, end) => {
self[..start].quick_sort3();
self[end..].quick_sort3();
}
NetherlandsFlagResult::ValueStart(start) => {
self[start..].quick_sort3();
}
NetherlandsFlagResult::ValueEnd(end) => {
self[..end].quick_sort3();
}
NetherlandsFlagResult::Less | NetherlandsFlagResult::Greater => {
self.quick_sort3();
}
NetherlandsFlagResult::Equal => {
return;
}
}
}
}

Swap 函数

  • Java 实现
1
2
3
4
5
6
7
8
9
static void swap(int[] array, int i, int j) {
if (i == j) {
return;
}

array[i] = array[i] ^ array[j];
array[j] = array[i] ^ array[j];
array[i] = array[i] ^ array[j];
}

归并排序

  • 算法时间复杂度: $O(NlogN)$
  • 相比冒泡、选择等时间复杂度为 $O(N^2)$ 的排序算法,没有浪费比较行为;
  • 插入排序时即便使用二分查找插入位置,也需要将插入位置后的元素依次向右移动,每次插入复杂度不是 $O(1)$。

递归方法

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class MergeSort {

public static void main(String[] args) {
Integer[] data = {12, 3, 2, 6, 4, 8, 6, 0, 9, 3};
System.out.println(Arrays.toString(data));
mergeSort(data);
System.out.println(Arrays.toString(data));
}

static <T extends Comparable> void mergeSort(T[] array) {
T[] help = (T[]) new Comparable[array.length];
doMergeSort(array, help, 0, array.length);
}

static <T extends Comparable> void doMergeSort(T[] array, T[] help, int start, int end) {
if (start == end - 1) {
return;
}

int mid = start + ((end - start) >> 1);
doMergeSort(array, help, start, mid);
doMergeSort(array, help, mid, end);
mergeSorted(array, help, start, mid, end);
}

static <T extends Comparable> void mergeSorted(T[] array, T[] help, int start, int mid, int end) {
int h = start;
int i = start;
int j = mid;

while (i < mid && j < end) {
if (array[i].compareTo(array[j]) <= 0) {
help[h++] = array[i++];
} else {
help[h++] = array[j++];
}
}

while (i < mid) {
help[h++] = array[i++];
}

while (j < end) {
help[h++] = array[j++];
}

assert h == end;
for (int n = start; n < end; n++) {
array[n] = help[n];
}
}
}

输出内容:

1
2
[12, 3, 2, 6, 4, 8, 6, 0, 9, 3]
[0, 2, 3, 3, 4, 6, 6, 8, 9, 12]
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#[test]
fn test_merge_sort() {
for _ in 0..100 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();
array0.merge_sort();
array1.sort();
assert_eq!(array0, array1);
}
}

trait MergeSort {
fn merge_sort(&mut self);
}

impl<T: Ord + Clone> MergeSort for [T] {
fn merge_sort(&mut self) {
let len = self.len();
if len == 0 || len == 1 {
return;
}

let mid = len >> 1;
self[..mid].merge_sort();
self[mid..].merge_sort();

let mut i = 0;
let mut j = mid;
let mut vec = Vec::with_capacity(len);

while i < mid && j < len {
if self[i] <= self[j] {
vec.push(self[i].clone());
i += 1;
} else {
vec.push(self[j].clone());
j += 1;
}
}

while i < mid {
vec.push(self[i].clone());
i += 1;
}

while j < len {
vec.push(self[j].clone());
j += 1;
}

self.clone_from_slice(&vec);
}
}

非递归方法

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static <T extends Comparable<T>> void mergeSortLoop(T[] array) {
if (array == null || array.length < 2) {
return;
}

Comparable[] help = (T[]) new Comparable[array.length];

int mergeSize = 1;

while (mergeSize < array.length) {
int left = 0;

while (left < array.length) {
int mid = left + mergeSize;

if (mid >= array.length) {
break;
}

int right = Math.min(mid + mergeSize, array.length);
mergeSorted(array, help, left, mid, right);
left = right;
}


if (mergeSize >= array.length >> 1) {
break;
}

mergeSize <<= 1;
}
}
  • Rust 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#[test]
fn test_loop_merge_sort() {
for _ in 0..100 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();
array0.loop_merge_sort();
array1.sort();
assert_eq!(array0, array1);
}
}

impl<T: Ord + Clone> MergeSort for [T] {
fn merge_sort(&mut self) {
//...
}

fn loop_merge_sort(&mut self) {
let len = self.len();
if len < 2 {
return;
}

let mut merge_size: usize = 1;
while merge_size < len {
let mut left = 0;

while left < len {
let mid = left + merge_size;
if mid >= len {
break;
}

let right = (mid + merge_size).min(len);

// merge sorted
let mut vec = Vec::with_capacity(right - left);

let mut i = left;
let mut j = mid;
while i < mid && j < right {
if self[i] <= self[j] {
vec.push(self[i].clone());
i += 1;
} else {
vec.push(self[j].clone());
j += 1;
}
}

while i < mid {
vec.push(self[i].clone());
i += 1;
}

while j < right {
vec.push(self[j].clone());
j += 1;
}

(&mut self[left..right]).clone_from_slice(&vec);

left = right;
}

if merge_size > len >> 1 {
break;
}

merge_size <<= 1;
}
}
}

求数组的小和

在一个数组中,一个数左边比它小的数的总和,叫数的小和,所有数的小和加起来叫数组小和。求数组小和。

例子:[1, 3, 4, 2, 5]

1 左边比自己小的数:没有
3 左边比自己小的数:1
4 左边比自己小的数:1 3
2 左边比自己小的数:1
5 左边比自己小的数:1 3 4 2

所以,数组的小和为 $1 + 1 + 3 + 1 + 1 + 3 + 4 + 2 = 16$

  • Java 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class SmallSum {
public static void main(String[] args) {
System.out.println(smallSum(new int[]{1, 3, 4, 2, 5}) + " == " + getSmallSumSimple(new int[]{1, 3, 4, 2, 5}));
}

static int getSmallSumSimple(int[] array) {
int smallSum = 0;
for (int i = 0; i < array.length; ++i) {
for (int j = 0; j < i; ++j) {
if (array[j] < array[i]) {
smallSum += array[j];
}
}
}
return smallSum;
}

static int smallSum(int[] array) {
int[] help = new int[array.length];
return doMergeSmallSum(array, help, 0, help.length);
}

static int doMergeSmallSum(int[] array, int[] help, int start, int end) {
if (end - start < 2) {
return 0;
}

int mid = start + ((end - start) >> 1);
int left = doMergeSmallSum(array, help, start, mid);
int right = doMergeSmallSum(array, help, mid, end);
return left + right + doMerge(array, help, start, mid, end);
}

static int doMerge(int[] array, int[] help, int start, int mid, int end) {
int i = start;
int j = mid;
int k = start;
int sum = 0;

while (i < mid && j < end) {
if (array[i] < array[j]) {
int t = array[i++];
help[k++] = t;
sum += t * (end - j);
} else {
help[k++] = array[j++];
}
}

while (i < mid) {
help[k++] = array[i++];
}

while (j < end) {
help[k++] = array[j++];
}

for (k = start; k < end; k++) {
array[k] = help[k];
}

return sum;
}
}

求数组中的降序对

例如:[3, 1, 7, 0, 2] 中的降序对有 (3, 1)(3, 0)(3, 2)(1, 0)(7, 0)(7, 2)

即求数组中每个数右边有多少个数比它小,就有多少个降序对。

  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#[test]
fn test_get_descending_pairs() {
for _ in 0..100 {
let mut array0: [u8; 32] = rand::random();
let mut array1 = array0.clone();
let set0 = array0.get_descending_pairs();
let set1 = array1.get_descending_pairs_simple();
assert_eq!(set0, set1);
}
}

trait DescendingPair<T> {
fn get_descending_pairs(&mut self) -> HashSet<(T, T)>;
fn get_descending_pairs_simple(&mut self) -> HashSet<(T, T)>;
}

impl<T: Ord + Clone + Hash> DescendingPair<T> for [T] {
fn get_descending_pairs(&mut self) -> HashSet<(T, T)> {
let mut set = HashSet::new();
if self.len() < 2 {
return set;
}

let mid = self.len() >> 1;
let mut left = self[..mid].get_descending_pairs();
let mut right = self[mid..].get_descending_pairs();
set.extend(left.drain());
set.extend(right.drain());

let mut help = Vec::with_capacity(self.len());
let mut j = 0;
let mut k = mid;
let len = self.len();

while j < mid && k < len {
if self[j] > self[k] {
let mut temp = self[k..]
.iter()
.map(|v| (self[j].clone(), v.clone()))
.collect::<HashSet<_>>();
set.extend(temp.drain());

help.push(self[j].clone());
j += 1;
} else {
help.push(self[k].clone());
k += 1;
}
}

help.extend_from_slice(&self[j..mid]);
help.extend_from_slice(&self[k..]);

assert_eq!(help.len(), self.len());
self.clone_from_slice(&help);

set
}

fn get_descending_pairs_simple(&mut self) -> HashSet<(T, T)> {
let mut set = HashSet::new();
for i in 0..self.len() {
for j in 0..i {
if self[j] > self[i] {
set.insert((self[j].clone(), self[i].clone()));
}
}
}
set
}
}

当递归函数的时间执行函数满足如下的关系式时,可以使用公式法计算时间复杂度:

其中:

  1. $a$ 为递归次数;
  2. $\frac {N} {b}$ 为子问题规模;
  3. $O(N^d)$ 为每次递归完毕之后额外执行的操作的时间复杂度;
  • 如果 $\log_b a < d$,时间复杂度为 $O(N^d)$
  • 如果 $\log_b a > d$,时间复杂度为 $O(N^{\log_b a})$
  • 如果 $\log_b a = d$,时间复杂度为 $O(N^d \times \log N)$

例子

求数组 arr[l..r]中的最大值,用递归方法实现。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RecursiveMax {
public static void main(String[] args) {
Integer[] array = {6, 3, 5, 2, 1, 4, 0, 1, 7};
System.out.println(getMax(array, 0, array.length - 1));
}

public static <T extends Comparable<T>> T getMax(T[] arr, int l, int r) {
if (l == r) {
return arr[l];
}

int mid = l + ((r - l) >> 1);
T left = getMax(arr, l, mid);
T right = getMax(arr, mid + 1, r);
return left.compareTo(right) >= 0 ? left : right;
}
}

其中:

  1. 递归次数 $a$ 为 $2$;
  2. 子问题规模 $\frac {N} {b}$ 为 $\frac {N} {2}$,即 $b$ 为 $2$;
  3. 每次递归完毕之后额外执行的操作的时间复杂度 $O(N^d)$ 为 $O(1)$,即 $d$ 为 $0$。

满足:

所以,该算法复杂度为 $O(N^{\log_2 2}) = O(N)$

1. 环形队列

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class RingBuffer<T> {
// 队列缓冲区
private final Object[] array;

// 队列尺寸
private final int limit;

// 表示即将入队的索引位置
private int pushIndex;

// 表示即将出队的索引位置
private int popIndex;

// 表示当前队列中元素的个数
private int size;

private RingBuffer(int limit) {
if (limit <= 0) {
throw new IllegalArgumentException("limit should be greater than 0");
}
this.limit = limit;
this.array = new Object[limit];
this.popIndex = this.pushIndex = this.size = 0;
}

public static <T> RingBuffer<T> create(int limit) {
return new RingBuffer<>(limit);
}

public Optional<T> pop() {
if (size == 0) {
return Optional.empty();
}

T value = (T) this.array[this.popIndex];
this.array[this.popIndex] = null; // 去掉引用,避免泄漏
this.size -= 1;
this.popIndex = getNextIndex(this.popIndex);
return Optional.of(value);
}

public boolean empty() {
return this.size == 0;
}

public void push(T value) {
if (size == this.limit) {
throw new IllegalArgumentException("The size has reached the limit");
}

this.array[this.pushIndex] = value;
this.size += 1;
this.pushIndex = getNextIndex(this.pushIndex);
}

@Override
public String toString() {
return "RingBuffer{" +
"array=" + Arrays.toString(array) +
", limit=" + limit +
", pushIndex=" + pushIndex +
", popIndex=" + popIndex +
", size=" + size +
'}';
}

private int getNextIndex(int index) {
return index == this.limit - 1 ? 0 : index + 1;
}

public static void main(String[] args) {
RingBuffer<String> rb = RingBuffer.create(4);
System.out.println("new rb: " + rb);
String[] data = {"hello", "world", "are", "you", "ok"};
for (String s: data) {
try {
rb.push(s);
System.out.println("push " + s);
System.out.println("rb: " + rb);
} catch (Exception e) {
System.out.println("Push '" + s + "' error: " + e);
}
}

Optional<String> op = rb.pop();

while (op.isPresent()) {
System.out.println("pop " + op.get());
System.out.println("rb: " + rb);
op = rb.pop();
}
}
}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
new rb: RingBuffer{array=[null, null, null, null], limit=4, pushIndex=0, popIndex=0, size=0}
push hello
rb: RingBuffer{array=[hello, null, null, null], limit=4, pushIndex=1, popIndex=0, size=1}
push world
rb: RingBuffer{array=[hello, world, null, null], limit=4, pushIndex=2, popIndex=0, size=2}
push are
rb: RingBuffer{array=[hello, world, are, null], limit=4, pushIndex=3, popIndex=0, size=3}
push you
rb: RingBuffer{array=[hello, world, are, you], limit=4, pushIndex=0, popIndex=0, size=4}
Push 'ok' error: java.lang.IllegalArgumentException: The size has reached the limit
pop hello
rb: RingBuffer{array=[null, world, are, you], limit=4, pushIndex=0, popIndex=1, size=3}
pop world
rb: RingBuffer{array=[null, null, are, you], limit=4, pushIndex=0, popIndex=2, size=2}
pop are
rb: RingBuffer{array=[null, null, null, you], limit=4, pushIndex=0, popIndex=3, size=1}
pop you
rb: RingBuffer{array=[null, null, null, null], limit=4, pushIndex=0, popIndex=0, size=0}

2. 时间复杂度为 $O(1)$ 的栈中最小值获取方法

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Stack;

public class GetMinStack<E extends Comparable<E>> extends Stack<E> {
private final Stack<E> minStack;

public static void main(String[] args) {
GetMinStack<Integer> stack = new GetMinStack<>();
int[] data = {8, 4, 2, 6, 1, 9, -1, 3};
for (int i: data) {
stack.push(i);
Optional<Integer> min = stack.getMin();
System.out.println("push " + i + ", min: " + min.get() + ", stack" + stack + ", minStack: " + stack.getMinStack());
}

while (!stack.empty()) {
int i = stack.pop();
Optional<Integer> min = stack.getMin();
if (min.isPresent()) {
System.out.println("pop " + i + ", min: " + min.get() + ", stack" + stack + ", minStack: " + stack.getMinStack());
} else {
System.out.println("pop " + i + ", stack is empty");
}
}
}

public GetMinStack() {
minStack = new Stack<>();
}

@Override
public synchronized E pop() {
E item = super.pop();
E min = minStack.peek();
// 如果出栈的元素跟最小栈顶元素相等,则最小栈顶也出栈
if (min == item) {
minStack.pop();
}
return item;
}

@Override
public E push(E item) {
if (!minStack.empty()) {
E min = minStack.peek();
// 如果栈不空,看最小栈顶与入栈元素哪个小;
// 一样大或者入栈元素小,则该元素入最小栈;
// 否则不做任何操作
if (min.compareTo(item) >= 0) {
minStack.push(item);
}
} else {
// 栈空就直接入栈
minStack.push(item);
}
return super.push(item);
}

public Optional<E> getMin() {
if (empty()) {
return Optional.empty();
} else {
return Optional.of(minStack.peek());
}
}

public Collection<E> getMinStack() {
return Collections.unmodifiableCollection(this.minStack);
}
}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
push 8, min: 8, stack[8], minStack: [8]
push 4, min: 4, stack[8, 4], minStack: [8, 4]
push 2, min: 2, stack[8, 4, 2], minStack: [8, 4, 2]
push 6, min: 2, stack[8, 4, 2, 6], minStack: [8, 4, 2]
push 1, min: 1, stack[8, 4, 2, 6, 1], minStack: [8, 4, 2, 1]
push 9, min: 1, stack[8, 4, 2, 6, 1, 9], minStack: [8, 4, 2, 1]
push -1, min: -1, stack[8, 4, 2, 6, 1, 9, -1], minStack: [8, 4, 2, 1, -1]
push 3, min: -1, stack[8, 4, 2, 6, 1, 9, -1, 3], minStack: [8, 4, 2, 1, -1]
pop 3, min: -1, stack[8, 4, 2, 6, 1, 9, -1], minStack: [8, 4, 2, 1, -1]
pop -1, min: 1, stack[8, 4, 2, 6, 1, 9], minStack: [8, 4, 2, 1]
pop 9, min: 1, stack[8, 4, 2, 6, 1], minStack: [8, 4, 2, 1]
pop 1, min: 2, stack[8, 4, 2, 6], minStack: [8, 4, 2]
pop 6, min: 2, stack[8, 4, 2], minStack: [8, 4, 2]
pop 2, min: 4, stack[8, 4], minStack: [8, 4]
pop 4, min: 8, stack[8], minStack: [8]
pop 8, stack is empty

3. 用栈实现队列

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.util.Optional;
import java.util.Stack;

public class StackQueue<E> {
// 用来入队
private final Stack<E> pushStack;
// 用来出队
private final Stack<E> popStack;

public static void main(String[] args) {
StackQueue<String> queue = new StackQueue<>();
String[] data = {"hello", "world", "how", "are", "you"};
for (String s: data) {
queue.push(s);
}
System.out.println(queue);
Optional<String> op = queue.poll();
while (op.isPresent()) {
System.out.println(op.get());
op = queue.poll();
}
System.out.println(queue);
for (String s: data) {
queue.push(s);
}
System.out.println(queue);
op = queue.poll();
for (String s: data) {
queue.push(s);
}
System.out.println(queue);
while (op.isPresent()) {
System.out.println(op.get());
op = queue.poll();
}
System.out.println(queue);
}

public StackQueue() {
pushStack = new Stack<>();
popStack = new Stack<>();
}

public int size() {
return pushStack.size() + popStack.size();
}

public boolean empty() {
return pushStack.empty() && popStack.empty();
}

public boolean contains(Object o) {
return pushStack.contains(o) || popStack.contains(o);
}

public void push(E element) {
pushStack.push(element);
}

public Optional<E> poll() {
if (popStack.empty()) {
while (!pushStack.empty()) {
popStack.push(pushStack.pop());
}
if (popStack.empty()) {
return Optional.empty();
}
}

return Optional.of(popStack.pop());
}

@Override
public String toString() {
// => 表示栈顶
return "StackQueue{" +
"pushStack=" + pushStack +
"=>, popStack=" + popStack +
"=>}";
}
}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
StackQueue{pushStack=[hello, world, how, are, you]=>, popStack=[]=>}
hello
world
how
are
you
StackQueue{pushStack=[]=>, popStack=[]=>}
StackQueue{pushStack=[hello, world, how, are, you]=>, popStack=[]=>}
StackQueue{pushStack=[hello, world, how, are, you]=>, popStack=[you, are, how, world]=>}
hello
world
how
are
you
hello
world
how
are
you
StackQueue{pushStack=[]=>, popStack=[]=>}

4. 用队列实现栈

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;

public class QueueStack<E> {
private Queue<E> data;
private Queue<E> help;

public static void main(String[] args) {
QueueStack<String> stack = new QueueStack<>();
String[] data = {"hello", "world", "how", "are", "you"};
for (String s: data) {
stack.push(s);
}
Optional<String> op = stack.pop();
while (op.isPresent()) {
System.out.println(op.get());
op = stack.pop();
}
}

public QueueStack() {
data = new LinkedList<>();
help = new LinkedList<>();
}

public boolean empty() {
return data.isEmpty() && help.isEmpty();
}

public boolean contains(E object) {
return data.contains(object) || help.contains(object);
}

public int size() {
return data.size() + help.size();
}

public void push(E e) {
data.add(e);
}

public Optional<E> pop() {
int size = data.size();
if (size == 0) {
return Optional.empty();
}

for (int i=0;i<size-1;++i) {
help.add(data.poll());
}

E e = data.poll();

// swap
Queue<E> temp = help;
help = data;
data = temp;

return Optional.of(e);
}
}

输出内容:

1
2
3
4
5
you
are
how
world
hello

单向链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Node<T> {
private final T value;
private Node<T> next;

public static void main(String[] args) {
Node<String> list = Node.build("hello", "world", "are", "you", "ok");
System.out.println("build: " + list);
Node<String> reversed = list.reverse();
System.out.println("reversed: " + reversed);
Node<String> origin = reversed.reverse();
System.out.println("origin: " + origin);
}

public static <T> Node<T> build(T ...values) {
Node<T> list = null;
Node<T> cur = null;

for (T value: values) {
if (cur == null) {
cur = new Node<>(value);
list = cur;
} else {
cur.setNext(new Node<>(value));
cur = cur.getNext();
}
}

return list;
}

public Node(T value) {
this.value = value;
this.next = null;
}

public Node setNext(Node<T> next) {
this.next = next;
return this;
}

public Node getNext() {
return this.next;
}

public T getValue() {
return this.value;
}

public Node<T> reverse() {
Node<T> head = this;
Node<T> pre = null;

while (head != null) {
Node<T> next = head.getNext();

head.setNext(pre);
pre = head;

head = next;
}

return pre;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Node<T> cur = this;
while (cur != null) {
sb.append(cur.getValue());
sb.append(" -> ");
cur = cur.getNext();
}
sb.append("null");
return sb.toString();
}
}

输出内容:

1
2
3
build: hello -> world -> are -> you -> ok -> null
reversed: ok -> you -> are -> world -> hello -> null
origin: hello -> world -> are -> you -> ok -> null

双向链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class DoubleNode<T> {
private final T value;
private DoubleNode<T> previous;
private DoubleNode<T> next;

public static void main(String[] args) {
DoubleNode<String> list = DoubleNode.build("hello", "world", "are", "you", "ok");
System.out.println("build: " + list);
DoubleNode<String> reversed = list.reverse();
System.out.println("reversed: " + reversed);
DoubleNode<String> origin = reversed.reverse();
System.out.println("origin: " + origin);
DoubleNode<String> tail = origin.getTailNode();
System.out.println("back: " + tail.toStringBack());
}

public static <T> DoubleNode<T> build(T ...values) {
DoubleNode<T> list = null;
DoubleNode<T> cur = null;

for (T value: values) {
if (cur == null) {
cur = new DoubleNode<>(value);
list = cur;
} else {
DoubleNode<T> node = new DoubleNode<>(value);
node.setPrevious(cur);
cur.setNext(node);
cur = cur.getNext();
}
}

return list;
}

public DoubleNode(T value) {
this.value = value;
this.previous = this.next = null;
}

public DoubleNode<T> setNext(DoubleNode<T> next) {
this.next = next;
return this;
}

public DoubleNode<T> getNext() {
return this.next;
}

public T getValue() {
return this.value;
}

public DoubleNode<T> getTailNode() {
DoubleNode<T> cur = this;
while (cur.getNext() != null) {
cur = cur.getNext();
}
return cur;
}

public DoubleNode<T> getPrevious() {
return this.previous;
}

public DoubleNode<T> setPrevious(DoubleNode<T> previous) {
this.previous = previous;
return this;
}

public DoubleNode<T> reverse() {
DoubleNode<T> head = this;
DoubleNode<T> pre = null;

while (head != null) {
DoubleNode<T> next = head.getNext();

// 其他都跟单链表一样,指针多设置一个
head.setNext(pre);
head.setPrevious(next);
pre = head;

head = next;
}

return pre;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
DoubleNode<T> cur = this;
while (cur != null) {
sb.append(cur.getValue());
sb.append(" -> ");
cur = cur.getNext();
}
sb.append("null");
return sb.toString();
}

public String toStringBack() {
StringBuilder sb = new StringBuilder("null");
DoubleNode<T> cur = this;
while (cur != null) {
sb.append(" <- ");
sb.append(cur.getValue());
cur = cur.getPrevious();
}
return sb.toString();
}
}

输出内容:

1
2
3
4
build: hello -> world -> are -> you -> ok -> null
reversed: ok -> you -> are -> world -> hello -> null
origin: hello -> world -> are -> you -> ok -> null
back: null <- ok <- you <- are <- world <- hello

运算特性

  1. 两个数进行异或运算,二进制位相同为 0,不同为 1
  2. 满足交换律和结合律;
  3. 异或运算可以认为是无进位相加(二进制);
  4. 任何数与 0 异或仍得这个数,即0 ^ N = N
  5. 任何数与自身异或得 0,即 N ^ N = N

典型应用场景

1. 交换

1
2
3
4
5
6
int a = 1;
int b = 2;

a = a ^ b; // a = 1 ^ 2, b = 2
b = a ^ b; // a = 1 ^ 2, b = 1 ^ 2 ^ 2 = 1
a = a ^ b; // a = 1 ^ 2 ^ 1 = 2, b = 1

数组元素交换时,要确保交换的不是一个空间,可以相等,但不能是同一块内存跟自己进行异或运算:

  • Java 实现
1
2
3
4
5
6
7
8
9
10
void swap(int[] array, int i, int j) {
if (i == j) {
// 无法确保 i != j 一定要加这个检查,否则该位置值变为 0
return;
}

array[i] = array[i] ^ array[j];
array[j] = array[i] ^ array[j];
array[i] = array[i] ^ array[j];
}

2. 找到出现奇数次的数

题目: 一个数组中有一种数出现了奇数次,其他数都出现了偶数次,怎么找到这种数。

  • Java 实现
1
2
3
4
5
6
7
int getOddTimesNumber(int[] array) {
int xor = 0;
for (int i: array) {
xor ^= i;
}
return xor;
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
fn get_odd_times_number_0(array: &[i32]) -> i32 {
let mut xor = 0;
for i in array {
xor ^= *i;
}
xor
}

fn get_odd_times_number_1(array: &[i32]) -> i32 {
array.iter().fold(0, |a, b| a ^ *b)
}

#[test]
fn test_get_odd_times_number() {
use rand::prelude::*;

let mut rng = thread_rng();

for _ in 0..100 {
let mut vec = Vec::new();
let odd_number = rng.gen_range(0..5);
for i in 0..5 {
let times: usize = rng.gen_range(1..3) * 2 + if i == odd_number { 1 } else { 0 };
for _ in 0..times {
vec.push(i);
}
}
vec.shuffle(&mut rng);

let get0 = get_odd_times_number_0(&vec);
let get1 = get_odd_times_number_1(&vec);
assert_eq!(odd_number, get0);
assert_eq!(odd_number, get1);
}
}

3. 提取整型数最右侧的 1

题目: 提取整型数最右侧的 1

例如:

1
2
0b10101000 ----> 0b00001000
^---只留这个 `1`

  • Java 实现
1
2
3
int getRightmostOne(int value) {
return value & (~value + 1);
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn get_rightmost_one(value: i32) -> i32 {
value & (!value + 1)
}

#[test]
fn test_get_rightmost_one() {
for _ in 0..1000 {
let a: i32 = rand::random();
let b = get_rightmost_one(a);
assert_eq!(b >> b.trailing_zeros(), 1);
let bits = b.leading_zeros() + 1 + b.trailing_zeros();
assert_eq!(bits, size_of::<i32>() as u32 * 8);
}
}

4. 找到两个出现奇数次的数

题目: 一个数组中有两种数出现了奇数次,其他数都出现了偶数次,怎么找到这两种数。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int[] getOddTimesNumbers(int[] array) {
int xor = 0;
for (int i: array) {
xor ^= i;
}

// xor == a ^ b
// 因为 a != b (两种数),所以 a ^ b != 0,则必然存在为 1 的二进制位
// 不妨就使用最后一个 1,即
int one = xor & (~xor + 1);
// 假设这个 1 在第 N 位
int a = 0;
for (int i: array) {
// 将数组分为两类:1. N 位上为 1 的;2. N 位上为 0 的。
// a 和 b 一定分别属于这两类,不会同属一类,因为 a ^ b 的 N 位是 1
// 这里只把第 1 类异或起来,得到一种数
if ((i & one) != 0) {
a ^= i;
}
}

// 用之前得到的这种数异或 xor,得到另一种
return new int[]{a, a ^ xor};
}
  • Java 实现(函数式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
int[] getOddTimesNumbers(int[] array) {
int xor = Arrays.stream(array).reduce(0, (a, b) -> a ^ b);

int one = xor & (~xor + 1);
int a = Arrays.stream(array).reduce(0, (v1, v2) -> {
if ((v2 & one) != 0) {
return v1 ^ v2;
} else {
return v1;
}
});

return new int[] {a, xor ^ a};
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fn get_odd_times_numbers(array: &[i32]) -> (i32, i32) {
let mut xor = 0;
for i in array {
xor ^= *i;
}

let one = xor & (!xor + 1);
let mut a = 0;
for i in array {
if one & *i != 0 {
a ^= *i;
}
}

(a, xor ^ a)
}
  • Rust 实现(函数式)
1
2
3
4
5
6
7
8
9
10
11
fn get_odd_times_numbers(array: &[i32]) -> (i32, i32) {
let xor = array.iter().fold(0, |a, b| a ^ *b);

let one = xor & (!xor + 1);
let a = array
.iter()
.fold(0, |a, b| if one & b != 0 { a ^ *b } else { a });

(a, xor ^ a)
}

  • Rust 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#[test]
fn test_get_odd_times_numbers() {
use rand::prelude::*;

let mut rng = thread_rng();

for _ in 0..1000 {
let mut vec = Vec::new();
let odd_number_a = rng.gen_range(0..5);
let odd_number_b = loop {
let b = rng.gen_range(0..5);
if b != odd_number_a {
break b;
} else {
continue;
}
};
for i in 0..5 {
let times: usize = rng.gen_range(1..3) * 2
+ if i == odd_number_a || i == odd_number_b {
1
} else {
0
};
for _ in 0..times {
vec.push(i);
}
}
vec.shuffle(&mut rng);

let get = get_odd_times_numbers(&vec);
assert!(get == (odd_number_a, odd_number_b) || get == (odd_number_b, odd_number_a));
}
}

5. 计算某个数为 1 的二进制位数

例如:

1
0b00101100 --> 3
  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
int countOnes(int a) {
int count = 0;

while (a != 0) {
count += 1;
int one = a & (~a + 1);
a ^= one; // 把这个 1 给去掉
}

return count;
}

  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn count_ones(mut a: i32) -> u32 {
let mut count = 0;

while a != 0 {
count += 1;
// a ^= a & (!a + 1) 在 debug 编译条件下可能溢出,无法通过测试,release 无所谓;wrapping_add 在 debug 条件下也没问题
a ^= a & (!a).wrapping_add(1);
}

count
}


#[test]
fn test_count_ones() {
for _ in 0..1000 {
let a = rand::random();
assert_eq!(count_ones(a), a.count_ones());
}
}

给你一个数组,长度为N,相邻元素互不相等,请返回任意局部最小值的索引。

局部最小定义:

  1. 0 位置如果比 1 位置数小,则 0 位置为局部最小;
  2. N-1 位置如果比 N-2 位置数小,则 N-1 位置为局部最小;
  3. i 位置的数,如果比 i-1i+1 位置的数都小,则 i 位置为局部最小。

  • Java 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static int getLocalMinimumIndex(int[] array) {
if (array == null || array.length == 0) {
return -1;
}

if (array.length == 1) {
return 0;
}

if (array[0] < array[1]) {
return 0;
}

if (array[array.length - 1] < array[array.length - 2]) {
return array.length - 1;
}

int left = 1;
int right = array.length - 2;
while (left < right) {
int mid = left + ((right - left) >> 1);
if (array[mid] > array[mid - 1]) {
right = mid - 1;
} else if (array[mid] > array[mid + 1]) {
left = mid + 1;
} else {
return mid;
}
}

return left;
}

  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
fn get_local_minimum_index(array: &[i32]) -> Option<usize> {
let len = array.len();
if len == 0 {
return None;
}
if len == 1 {
return Some(0);
}

if array[0] < array[1] {
return Some(0);
}

if array[len - 1] < array[len - 2] {
return Some(len - 1);
}

let mut left = 1;
let mut right = len - 2;
while left < right {
let mid = mid(left, right);
if array[mid] > array[mid - 1] {
right = mid - 1;
} else if array[mid] > array[mid + 1] {
left = mid + 1;
} else {
return Some(mid);
}
}

Some(left)
}

/// 计算两个无符号值的中间值
fn mid<T: Ord + Div<Output = T> + Add<Output = T> + Copy + Sub<Output = T> + From<u8>>(
v1: T,
v2: T,
) -> T {
match v1.cmp(&v2) {
Ordering::Less => {
let v = v2 - v1;
v1 + v / T::from(2u8)
}
Ordering::Equal => v1,
Ordering::Greater => {
let v = v1 - v2;
v2 + v / T::from(2u8)
}
}
}

/// 判断一个索引是否为局部最小值的索引
fn is_local_minimum(array: &[i32], index: usize) -> bool {
let len = array.len();
if len == 0 {
return false;
}
if len == 1 {
return index == 0;
}

let range = 0..len;
if !range.contains(&index) {
return false;
}

// 1. `0` 位置如果比 `1` 位置数小,则 `0` 位置为局部最小;
if index == 0 && array[0] < array[1] {
return true;
}

// 2. `N-1` 位置如果比 `N-2` 位置数小,则 `N-1` 位置为局部最小;
if index == len - 1 && array[len - 1] < array[len - 2] {
return true;
}

// 3. `i` 位置的数,如果比 `i-1` 和 `i+1` 位置的数逗笑,则 `i` 位置为局部最小。
array[index] < array[index - 1] && array[index] < array[index + 1]
}

// 判断一个数组是否符合要求
fn is_array_valid(array: &[i32]) -> bool {
if array.is_empty() {
return false;
}

let mut last = array[0];

for v in &array[1..] {
if last == *v {
return false;
}

last = *v;
}

true
}

/// 测试代码
#[test]
fn test_get_local_minimum_index() {
for _ in 0..1000 {
// 生成随机数组
let array = loop {
// Cargo.toml里加上rand = "0.8"
let array: [i32; 32] = rand::random();
// 确保数组符合条件
if !is_array_valid(&array) {
continue;
}
break array;
};

if let Some(index) = get_local_minimum_index(&array) {
assert!(is_local_minimum(&array, index));
} else {
assert!(false);
}
}
}