耿腾的博客

常期望安定,还期望即兴。

0%

当递归函数的时间执行函数满足如下的关系式时,可以使用公式法计算时间复杂度:

其中:

  1. $a$ 为递归次数;
  2. $\frac {N} {b}$ 为子问题规模;
  3. $O(N^d)$ 为每次递归完毕之后额外执行的操作的时间复杂度;
  • 如果 $\log_b a < d$,时间复杂度为 $O(N^d)$
  • 如果 $\log_b a > d$,时间复杂度为 $O(N^{\log_b a})$
  • 如果 $\log_b a = d$,时间复杂度为 $O(N^d \times \log N)$

例子

求数组 arr[l..r]中的最大值,用递归方法实现。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RecursiveMax {
public static void main(String[] args) {
Integer[] array = {6, 3, 5, 2, 1, 4, 0, 1, 7};
System.out.println(getMax(array, 0, array.length - 1));
}

public static <T extends Comparable<T>> T getMax(T[] arr, int l, int r) {
if (l == r) {
return arr[l];
}

int mid = l + ((r - l) >> 1);
T left = getMax(arr, l, mid);
T right = getMax(arr, mid + 1, r);
return left.compareTo(right) >= 0 ? left : right;
}
}

其中:

  1. 递归次数 $a$ 为 $2$;
  2. 子问题规模 $\frac {N} {b}$ 为 $\frac {N} {2}$,即 $b$ 为 $2$;
  3. 每次递归完毕之后额外执行的操作的时间复杂度 $O(N^d)$ 为 $O(1)$,即 $d$ 为 $0$。

满足:

所以,该算法复杂度为 $O(N^{\log_2 2}) = O(N)$

1. 环形队列

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class RingBuffer<T> {
// 队列缓冲区
private final Object[] array;

// 队列尺寸
private final int limit;

// 表示即将入队的索引位置
private int pushIndex;

// 表示即将出队的索引位置
private int popIndex;

// 表示当前队列中元素的个数
private int size;

private RingBuffer(int limit) {
if (limit <= 0) {
throw new IllegalArgumentException("limit should be greater than 0");
}
this.limit = limit;
this.array = new Object[limit];
this.popIndex = this.pushIndex = this.size = 0;
}

public static <T> RingBuffer<T> create(int limit) {
return new RingBuffer<>(limit);
}

public Optional<T> pop() {
if (size == 0) {
return Optional.empty();
}

T value = (T) this.array[this.popIndex];
this.array[this.popIndex] = null; // 去掉引用,避免泄漏
this.size -= 1;
this.popIndex = getNextIndex(this.popIndex);
return Optional.of(value);
}

public boolean empty() {
return this.size == 0;
}

public void push(T value) {
if (size == this.limit) {
throw new IllegalArgumentException("The size has reached the limit");
}

this.array[this.pushIndex] = value;
this.size += 1;
this.pushIndex = getNextIndex(this.pushIndex);
}

@Override
public String toString() {
return "RingBuffer{" +
"array=" + Arrays.toString(array) +
", limit=" + limit +
", pushIndex=" + pushIndex +
", popIndex=" + popIndex +
", size=" + size +
'}';
}

private int getNextIndex(int index) {
return index == this.limit - 1 ? 0 : index + 1;
}

public static void main(String[] args) {
RingBuffer<String> rb = RingBuffer.create(4);
System.out.println("new rb: " + rb);
String[] data = {"hello", "world", "are", "you", "ok"};
for (String s: data) {
try {
rb.push(s);
System.out.println("push " + s);
System.out.println("rb: " + rb);
} catch (Exception e) {
System.out.println("Push '" + s + "' error: " + e);
}
}

Optional<String> op = rb.pop();

while (op.isPresent()) {
System.out.println("pop " + op.get());
System.out.println("rb: " + rb);
op = rb.pop();
}
}
}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
new rb: RingBuffer{array=[null, null, null, null], limit=4, pushIndex=0, popIndex=0, size=0}
push hello
rb: RingBuffer{array=[hello, null, null, null], limit=4, pushIndex=1, popIndex=0, size=1}
push world
rb: RingBuffer{array=[hello, world, null, null], limit=4, pushIndex=2, popIndex=0, size=2}
push are
rb: RingBuffer{array=[hello, world, are, null], limit=4, pushIndex=3, popIndex=0, size=3}
push you
rb: RingBuffer{array=[hello, world, are, you], limit=4, pushIndex=0, popIndex=0, size=4}
Push 'ok' error: java.lang.IllegalArgumentException: The size has reached the limit
pop hello
rb: RingBuffer{array=[null, world, are, you], limit=4, pushIndex=0, popIndex=1, size=3}
pop world
rb: RingBuffer{array=[null, null, are, you], limit=4, pushIndex=0, popIndex=2, size=2}
pop are
rb: RingBuffer{array=[null, null, null, you], limit=4, pushIndex=0, popIndex=3, size=1}
pop you
rb: RingBuffer{array=[null, null, null, null], limit=4, pushIndex=0, popIndex=0, size=0}

2. 时间复杂度为 $O(1)$ 的栈中最小值获取方法

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Stack;

public class GetMinStack<E extends Comparable<E>> extends Stack<E> {
private final Stack<E> minStack;

public static void main(String[] args) {
GetMinStack<Integer> stack = new GetMinStack<>();
int[] data = {8, 4, 2, 6, 1, 9, -1, 3};
for (int i: data) {
stack.push(i);
Optional<Integer> min = stack.getMin();
System.out.println("push " + i + ", min: " + min.get() + ", stack" + stack + ", minStack: " + stack.getMinStack());
}

while (!stack.empty()) {
int i = stack.pop();
Optional<Integer> min = stack.getMin();
if (min.isPresent()) {
System.out.println("pop " + i + ", min: " + min.get() + ", stack" + stack + ", minStack: " + stack.getMinStack());
} else {
System.out.println("pop " + i + ", stack is empty");
}
}
}

public GetMinStack() {
minStack = new Stack<>();
}

@Override
public synchronized E pop() {
E item = super.pop();
E min = minStack.peek();
// 如果出栈的元素跟最小栈顶元素相等,则最小栈顶也出栈
if (min == item) {
minStack.pop();
}
return item;
}

@Override
public E push(E item) {
if (!minStack.empty()) {
E min = minStack.peek();
// 如果栈不空,看最小栈顶与入栈元素哪个小;
// 一样大或者入栈元素小,则该元素入最小栈;
// 否则不做任何操作
if (min.compareTo(item) >= 0) {
minStack.push(item);
}
} else {
// 栈空就直接入栈
minStack.push(item);
}
return super.push(item);
}

public Optional<E> getMin() {
if (empty()) {
return Optional.empty();
} else {
return Optional.of(minStack.peek());
}
}

public Collection<E> getMinStack() {
return Collections.unmodifiableCollection(this.minStack);
}
}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
push 8, min: 8, stack[8], minStack: [8]
push 4, min: 4, stack[8, 4], minStack: [8, 4]
push 2, min: 2, stack[8, 4, 2], minStack: [8, 4, 2]
push 6, min: 2, stack[8, 4, 2, 6], minStack: [8, 4, 2]
push 1, min: 1, stack[8, 4, 2, 6, 1], minStack: [8, 4, 2, 1]
push 9, min: 1, stack[8, 4, 2, 6, 1, 9], minStack: [8, 4, 2, 1]
push -1, min: -1, stack[8, 4, 2, 6, 1, 9, -1], minStack: [8, 4, 2, 1, -1]
push 3, min: -1, stack[8, 4, 2, 6, 1, 9, -1, 3], minStack: [8, 4, 2, 1, -1]
pop 3, min: -1, stack[8, 4, 2, 6, 1, 9, -1], minStack: [8, 4, 2, 1, -1]
pop -1, min: 1, stack[8, 4, 2, 6, 1, 9], minStack: [8, 4, 2, 1]
pop 9, min: 1, stack[8, 4, 2, 6, 1], minStack: [8, 4, 2, 1]
pop 1, min: 2, stack[8, 4, 2, 6], minStack: [8, 4, 2]
pop 6, min: 2, stack[8, 4, 2], minStack: [8, 4, 2]
pop 2, min: 4, stack[8, 4], minStack: [8, 4]
pop 4, min: 8, stack[8], minStack: [8]
pop 8, stack is empty

3. 用栈实现队列

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.util.Optional;
import java.util.Stack;

public class StackQueue<E> {
// 用来入队
private final Stack<E> pushStack;
// 用来出队
private final Stack<E> popStack;

public static void main(String[] args) {
StackQueue<String> queue = new StackQueue<>();
String[] data = {"hello", "world", "how", "are", "you"};
for (String s: data) {
queue.push(s);
}
System.out.println(queue);
Optional<String> op = queue.poll();
while (op.isPresent()) {
System.out.println(op.get());
op = queue.poll();
}
System.out.println(queue);
for (String s: data) {
queue.push(s);
}
System.out.println(queue);
op = queue.poll();
for (String s: data) {
queue.push(s);
}
System.out.println(queue);
while (op.isPresent()) {
System.out.println(op.get());
op = queue.poll();
}
System.out.println(queue);
}

public StackQueue() {
pushStack = new Stack<>();
popStack = new Stack<>();
}

public int size() {
return pushStack.size() + popStack.size();
}

public boolean empty() {
return pushStack.empty() && popStack.empty();
}

public boolean contains(Object o) {
return pushStack.contains(o) || popStack.contains(o);
}

public void push(E element) {
pushStack.push(element);
}

public Optional<E> poll() {
if (popStack.empty()) {
while (!pushStack.empty()) {
popStack.push(pushStack.pop());
}
if (popStack.empty()) {
return Optional.empty();
}
}

return Optional.of(popStack.pop());
}

@Override
public String toString() {
// => 表示栈顶
return "StackQueue{" +
"pushStack=" + pushStack +
"=>, popStack=" + popStack +
"=>}";
}
}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
StackQueue{pushStack=[hello, world, how, are, you]=>, popStack=[]=>}
hello
world
how
are
you
StackQueue{pushStack=[]=>, popStack=[]=>}
StackQueue{pushStack=[hello, world, how, are, you]=>, popStack=[]=>}
StackQueue{pushStack=[hello, world, how, are, you]=>, popStack=[you, are, how, world]=>}
hello
world
how
are
you
hello
world
how
are
you
StackQueue{pushStack=[]=>, popStack=[]=>}

4. 用队列实现栈

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;

public class QueueStack<E> {
private Queue<E> data;
private Queue<E> help;

public static void main(String[] args) {
QueueStack<String> stack = new QueueStack<>();
String[] data = {"hello", "world", "how", "are", "you"};
for (String s: data) {
stack.push(s);
}
Optional<String> op = stack.pop();
while (op.isPresent()) {
System.out.println(op.get());
op = stack.pop();
}
}

public QueueStack() {
data = new LinkedList<>();
help = new LinkedList<>();
}

public boolean empty() {
return data.isEmpty() && help.isEmpty();
}

public boolean contains(E object) {
return data.contains(object) || help.contains(object);
}

public int size() {
return data.size() + help.size();
}

public void push(E e) {
data.add(e);
}

public Optional<E> pop() {
int size = data.size();
if (size == 0) {
return Optional.empty();
}

for (int i=0;i<size-1;++i) {
help.add(data.poll());
}

E e = data.poll();

// swap
Queue<E> temp = help;
help = data;
data = temp;

return Optional.of(e);
}
}

输出内容:

1
2
3
4
5
you
are
how
world
hello

单向链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Node<T> {
private final T value;
private Node<T> next;

public static void main(String[] args) {
Node<String> list = Node.build("hello", "world", "are", "you", "ok");
System.out.println("build: " + list);
Node<String> reversed = list.reverse();
System.out.println("reversed: " + reversed);
Node<String> origin = reversed.reverse();
System.out.println("origin: " + origin);
}

public static <T> Node<T> build(T ...values) {
Node<T> list = null;
Node<T> cur = null;

for (T value: values) {
if (cur == null) {
cur = new Node<>(value);
list = cur;
} else {
cur.setNext(new Node<>(value));
cur = cur.getNext();
}
}

return list;
}

public Node(T value) {
this.value = value;
this.next = null;
}

public Node setNext(Node<T> next) {
this.next = next;
return this;
}

public Node getNext() {
return this.next;
}

public T getValue() {
return this.value;
}

public Node<T> reverse() {
Node<T> head = this;
Node<T> pre = null;

while (head != null) {
Node<T> next = head.getNext();

head.setNext(pre);
pre = head;

head = next;
}

return pre;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Node<T> cur = this;
while (cur != null) {
sb.append(cur.getValue());
sb.append(" -> ");
cur = cur.getNext();
}
sb.append("null");
return sb.toString();
}
}

输出内容:

1
2
3
build: hello -> world -> are -> you -> ok -> null
reversed: ok -> you -> are -> world -> hello -> null
origin: hello -> world -> are -> you -> ok -> null

双向链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class DoubleNode<T> {
private final T value;
private DoubleNode<T> previous;
private DoubleNode<T> next;

public static void main(String[] args) {
DoubleNode<String> list = DoubleNode.build("hello", "world", "are", "you", "ok");
System.out.println("build: " + list);
DoubleNode<String> reversed = list.reverse();
System.out.println("reversed: " + reversed);
DoubleNode<String> origin = reversed.reverse();
System.out.println("origin: " + origin);
DoubleNode<String> tail = origin.getTailNode();
System.out.println("back: " + tail.toStringBack());
}

public static <T> DoubleNode<T> build(T ...values) {
DoubleNode<T> list = null;
DoubleNode<T> cur = null;

for (T value: values) {
if (cur == null) {
cur = new DoubleNode<>(value);
list = cur;
} else {
DoubleNode<T> node = new DoubleNode<>(value);
node.setPrevious(cur);
cur.setNext(node);
cur = cur.getNext();
}
}

return list;
}

public DoubleNode(T value) {
this.value = value;
this.previous = this.next = null;
}

public DoubleNode<T> setNext(DoubleNode<T> next) {
this.next = next;
return this;
}

public DoubleNode<T> getNext() {
return this.next;
}

public T getValue() {
return this.value;
}

public DoubleNode<T> getTailNode() {
DoubleNode<T> cur = this;
while (cur.getNext() != null) {
cur = cur.getNext();
}
return cur;
}

public DoubleNode<T> getPrevious() {
return this.previous;
}

public DoubleNode<T> setPrevious(DoubleNode<T> previous) {
this.previous = previous;
return this;
}

public DoubleNode<T> reverse() {
DoubleNode<T> head = this;
DoubleNode<T> pre = null;

while (head != null) {
DoubleNode<T> next = head.getNext();

// 其他都跟单链表一样,指针多设置一个
head.setNext(pre);
head.setPrevious(next);
pre = head;

head = next;
}

return pre;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
DoubleNode<T> cur = this;
while (cur != null) {
sb.append(cur.getValue());
sb.append(" -> ");
cur = cur.getNext();
}
sb.append("null");
return sb.toString();
}

public String toStringBack() {
StringBuilder sb = new StringBuilder("null");
DoubleNode<T> cur = this;
while (cur != null) {
sb.append(" <- ");
sb.append(cur.getValue());
cur = cur.getPrevious();
}
return sb.toString();
}
}

输出内容:

1
2
3
4
build: hello -> world -> are -> you -> ok -> null
reversed: ok -> you -> are -> world -> hello -> null
origin: hello -> world -> are -> you -> ok -> null
back: null <- ok <- you <- are <- world <- hello

运算特性

  1. 两个数进行异或运算,二进制位相同为 0,不同为 1
  2. 满足交换律和结合律;
  3. 异或运算可以认为是无进位相加(二进制);
  4. 任何数与 0 异或仍得这个数,即0 ^ N = N
  5. 任何数与自身异或得 0,即 N ^ N = N

典型应用场景

1. 交换

1
2
3
4
5
6
int a = 1;
int b = 2;

a = a ^ b; // a = 1 ^ 2, b = 2
b = a ^ b; // a = 1 ^ 2, b = 1 ^ 2 ^ 2 = 1
a = a ^ b; // a = 1 ^ 2 ^ 1 = 2, b = 1

数组元素交换时,要确保交换的不是一个空间,可以相等,但不能是同一块内存跟自己进行异或运算:

  • Java 实现
1
2
3
4
5
6
7
8
9
10
void swap(int[] array, int i, int j) {
if (i == j) {
// 无法确保 i != j 一定要加这个检查,否则该位置值变为 0
return;
}

array[i] = array[i] ^ array[j];
array[j] = array[i] ^ array[j];
array[i] = array[i] ^ array[j];
}

2. 找到出现奇数次的数

题目: 一个数组中有一种数出现了奇数次,其他数都出现了偶数次,怎么找到这种数。

  • Java 实现
1
2
3
4
5
6
7
int getOddTimesNumber(int[] array) {
int xor = 0;
for (int i: array) {
xor ^= i;
}
return xor;
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
fn get_odd_times_number_0(array: &[i32]) -> i32 {
let mut xor = 0;
for i in array {
xor ^= *i;
}
xor
}

fn get_odd_times_number_1(array: &[i32]) -> i32 {
array.iter().fold(0, |a, b| a ^ *b)
}

#[test]
fn test_get_odd_times_number() {
use rand::prelude::*;

let mut rng = thread_rng();

for _ in 0..100 {
let mut vec = Vec::new();
let odd_number = rng.gen_range(0..5);
for i in 0..5 {
let times: usize = rng.gen_range(1..3) * 2 + if i == odd_number { 1 } else { 0 };
for _ in 0..times {
vec.push(i);
}
}
vec.shuffle(&mut rng);

let get0 = get_odd_times_number_0(&vec);
let get1 = get_odd_times_number_1(&vec);
assert_eq!(odd_number, get0);
assert_eq!(odd_number, get1);
}
}

3. 提取整型数最右侧的 1

题目: 提取整型数最右侧的 1

例如:

1
2
0b10101000 ----> 0b00001000
^---只留这个 `1`

  • Java 实现
1
2
3
int getRightmostOne(int value) {
return value & (~value + 1);
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn get_rightmost_one(value: i32) -> i32 {
value & (!value + 1)
}

#[test]
fn test_get_rightmost_one() {
for _ in 0..1000 {
let a: i32 = rand::random();
let b = get_rightmost_one(a);
assert_eq!(b >> b.trailing_zeros(), 1);
let bits = b.leading_zeros() + 1 + b.trailing_zeros();
assert_eq!(bits, size_of::<i32>() as u32 * 8);
}
}

4. 找到两个出现奇数次的数

题目: 一个数组中有两种数出现了奇数次,其他数都出现了偶数次,怎么找到这两种数。

  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int[] getOddTimesNumbers(int[] array) {
int xor = 0;
for (int i: array) {
xor ^= i;
}

// xor == a ^ b
// 因为 a != b (两种数),所以 a ^ b != 0,则必然存在为 1 的二进制位
// 不妨就使用最后一个 1,即
int one = xor & (~xor + 1);
// 假设这个 1 在第 N 位
int a = 0;
for (int i: array) {
// 将数组分为两类:1. N 位上为 1 的;2. N 位上为 0 的。
// a 和 b 一定分别属于这两类,不会同属一类,因为 a ^ b 的 N 位是 1
// 这里只把第 1 类异或起来,得到一种数
if ((i & one) != 0) {
a ^= i;
}
}

// 用之前得到的这种数异或 xor,得到另一种
return new int[]{a, a ^ xor};
}
  • Java 实现(函数式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
int[] getOddTimesNumbers(int[] array) {
int xor = Arrays.stream(array).reduce(0, (a, b) -> a ^ b);

int one = xor & (~xor + 1);
int a = Arrays.stream(array).reduce(0, (v1, v2) -> {
if ((v2 & one) != 0) {
return v1 ^ v2;
} else {
return v1;
}
});

return new int[] {a, xor ^ a};
}
  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fn get_odd_times_numbers(array: &[i32]) -> (i32, i32) {
let mut xor = 0;
for i in array {
xor ^= *i;
}

let one = xor & (!xor + 1);
let mut a = 0;
for i in array {
if one & *i != 0 {
a ^= *i;
}
}

(a, xor ^ a)
}
  • Rust 实现(函数式)
1
2
3
4
5
6
7
8
9
10
11
fn get_odd_times_numbers(array: &[i32]) -> (i32, i32) {
let xor = array.iter().fold(0, |a, b| a ^ *b);

let one = xor & (!xor + 1);
let a = array
.iter()
.fold(0, |a, b| if one & b != 0 { a ^ *b } else { a });

(a, xor ^ a)
}

  • Rust 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#[test]
fn test_get_odd_times_numbers() {
use rand::prelude::*;

let mut rng = thread_rng();

for _ in 0..1000 {
let mut vec = Vec::new();
let odd_number_a = rng.gen_range(0..5);
let odd_number_b = loop {
let b = rng.gen_range(0..5);
if b != odd_number_a {
break b;
} else {
continue;
}
};
for i in 0..5 {
let times: usize = rng.gen_range(1..3) * 2
+ if i == odd_number_a || i == odd_number_b {
1
} else {
0
};
for _ in 0..times {
vec.push(i);
}
}
vec.shuffle(&mut rng);

let get = get_odd_times_numbers(&vec);
assert!(get == (odd_number_a, odd_number_b) || get == (odd_number_b, odd_number_a));
}
}

5. 计算某个数为 1 的二进制位数

例如:

1
0b00101100 --> 3
  • Java 实现
1
2
3
4
5
6
7
8
9
10
11
12
int countOnes(int a) {
int count = 0;

while (a != 0) {
count += 1;
int one = a & (~a + 1);
a ^= one; // 把这个 1 给去掉
}

return count;
}

  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn count_ones(mut a: i32) -> u32 {
let mut count = 0;

while a != 0 {
count += 1;
// a ^= a & (!a + 1) 在 debug 编译条件下可能溢出,无法通过测试,release 无所谓;wrapping_add 在 debug 条件下也没问题
a ^= a & (!a).wrapping_add(1);
}

count
}


#[test]
fn test_count_ones() {
for _ in 0..1000 {
let a = rand::random();
assert_eq!(count_ones(a), a.count_ones());
}
}

给你一个数组,长度为N,相邻元素互不相等,请返回任意局部最小值的索引。

局部最小定义:

  1. 0 位置如果比 1 位置数小,则 0 位置为局部最小;
  2. N-1 位置如果比 N-2 位置数小,则 N-1 位置为局部最小;
  3. i 位置的数,如果比 i-1i+1 位置的数都小,则 i 位置为局部最小。

  • Java 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static int getLocalMinimumIndex(int[] array) {
if (array == null || array.length == 0) {
return -1;
}

if (array.length == 1) {
return 0;
}

if (array[0] < array[1]) {
return 0;
}

if (array[array.length - 1] < array[array.length - 2]) {
return array.length - 1;
}

int left = 1;
int right = array.length - 2;
while (left < right) {
int mid = left + ((right - left) >> 1);
if (array[mid] > array[mid - 1]) {
right = mid - 1;
} else if (array[mid] > array[mid + 1]) {
left = mid + 1;
} else {
return mid;
}
}

return left;
}

  • Rust 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
fn get_local_minimum_index(array: &[i32]) -> Option<usize> {
let len = array.len();
if len == 0 {
return None;
}
if len == 1 {
return Some(0);
}

if array[0] < array[1] {
return Some(0);
}

if array[len - 1] < array[len - 2] {
return Some(len - 1);
}

let mut left = 1;
let mut right = len - 2;
while left < right {
let mid = mid(left, right);
if array[mid] > array[mid - 1] {
right = mid - 1;
} else if array[mid] > array[mid + 1] {
left = mid + 1;
} else {
return Some(mid);
}
}

Some(left)
}

/// 计算两个无符号值的中间值
fn mid<T: Ord + Div<Output = T> + Add<Output = T> + Copy + Sub<Output = T> + From<u8>>(
v1: T,
v2: T,
) -> T {
match v1.cmp(&v2) {
Ordering::Less => {
let v = v2 - v1;
v1 + v / T::from(2u8)
}
Ordering::Equal => v1,
Ordering::Greater => {
let v = v1 - v2;
v2 + v / T::from(2u8)
}
}
}

/// 判断一个索引是否为局部最小值的索引
fn is_local_minimum(array: &[i32], index: usize) -> bool {
let len = array.len();
if len == 0 {
return false;
}
if len == 1 {
return index == 0;
}

let range = 0..len;
if !range.contains(&index) {
return false;
}

// 1. `0` 位置如果比 `1` 位置数小,则 `0` 位置为局部最小;
if index == 0 && array[0] < array[1] {
return true;
}

// 2. `N-1` 位置如果比 `N-2` 位置数小,则 `N-1` 位置为局部最小;
if index == len - 1 && array[len - 1] < array[len - 2] {
return true;
}

// 3. `i` 位置的数,如果比 `i-1` 和 `i+1` 位置的数逗笑,则 `i` 位置为局部最小。
array[index] < array[index - 1] && array[index] < array[index + 1]
}

// 判断一个数组是否符合要求
fn is_array_valid(array: &[i32]) -> bool {
if array.is_empty() {
return false;
}

let mut last = array[0];

for v in &array[1..] {
if last == *v {
return false;
}

last = *v;
}

true
}

/// 测试代码
#[test]
fn test_get_local_minimum_index() {
for _ in 0..1000 {
// 生成随机数组
let array = loop {
// Cargo.toml里加上rand = "0.8"
let array: [i32; 32] = rand::random();
// 确保数组符合条件
if !is_array_valid(&array) {
continue;
}
break array;
};

if let Some(index) = get_local_minimum_index(&array) {
assert!(is_local_minimum(&array, index));
} else {
assert!(false);
}
}
}

遵循以下公式:

  • 其中,$N_C$ 为处理器的核的数目,可以通过 Runtime.getRuntime().avaliableProcessors() 得到
  • $U_C$ 是期望的 CPU 利用率(介于 0 到 1 之间)
  • $W / C$ 是等待时间与计算时间的比值

例如,对于 CPU 密集型应用,期望 CPU 利用率为 100%,无等待纯计算,则有:

即工作线程数设置为处理器的核心数最合适。

  1. 开闭原则(Open Close Principle)。对扩展开放,对修改关闭。在程序需要进行拓展的时候,不能去修改原有的代码,实现一个热插拔的效果。简言之,是为了使程序的扩展性好,易于维护和升级。想要达到这样的效果,我们需要使用接口和抽象类,后面的具体设计中我们会提到这点。

  2. 里氏代换原则(Liskov Substitution Principle)。里氏代换原则是面向对象设计的基本原则之一。 里氏代换原则中说,任何基类可以出现的地方,子类一定可以出现。LSP 是继承复用的基石,只有当派生类可以替换掉基类,且软件单位的功能不受到影响时,基类才能真正被复用,而派生类也能够在基类的基础上增加新的行为。里氏代换原则是对开闭原则的补充。实现开闭原则的关键步骤就是抽象化,而基类与子类的继承关系就是抽象化的具体实现,所以里氏代换原则是对实现抽象化的具体步骤的规范。

  3. 依赖倒转原则(Dependence Inversion Principle)。开闭原则的基础,具体内容是针对接口编程,依赖于抽象而不依赖于具体。

  4. 接口隔离原则(Interface Segregation Principle)。使用多个隔离的接口,比使用单个接口要好。它还有另外一个意思是:降低类之间的耦合度。由此可见,其实设计模式就是从大型软件架构出发、便于升级和维护的软件设计思想,它强调降低依赖,降低耦合。

  5. 迪米特法则,又称最少知道原则(Demeter Principle)。一个实体应当尽量少地与其他实体之间发生相互作用,使得系统功能模块相对独立。

  6. 合成复用原则(Composite Reuse Principle)。尽量使用合成/聚合的方式,而不是使用继承。

单例模式

一个单一的类,负责创建自己的对象,同时确保只有单个对象被创建。

单例模式三种最典型的懒加载单例写法:

双重检查锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Singleton {

// 定义为 volatile,防止指令重排序,导致未完成初始化的对象被使用
private static volatile Singleton INSTANCE = null;

public static Singleton getInstance() {
// 基于性能考量,第一次检查,避免每一次都加锁
if (INSTANCE == null) {
// 加锁
synchronized (Singleton1.class) {
// 检查这之前是否有其他线程已经获得过锁并初始化
if (INSTANCE == null) {
INSTANCE = new Singleton();
}
}
}

return INSTANCE;
}
}

静态内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Singleton {

// 外部类被加载时,内部类还不会被加载
private static class SingletonHolder {
private final static Singleton INSTANCE = new Singleton();
}

public static Singleton getInstance() {
// 调用这里时,内部类被加载,实现初始化
// 由JVM保证只加载一次,线程安全
return SingletonHolder.INSTANCE;
}

private Singleton() {}
}

枚举

1
2
3
4
5
6
7
8
// 懒加载,线程安全,还可以防止反序列化
public enum Singleton {
INSTANCE;

public static Singleton getInstance() {
return Singleton.INSTANCE;
}
}

策略模式(Strategy)

策略模式是对算法的包装,是把使用算法的责任和算法本身分割开来,委派给不通过的对象管理。

最典型的例子就是使用比较器(Comparator<T>):

1
2
3
4
5
6
@FunctionalInterface
public interface Comparator<T> {
int compare(T o1, T o2);

// 省略其他...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Arrays;
import java.util.Comparator;

public class Cat {
int weight;

public int GetWeight() {
return weight;
}

public Cat(int weight) {
this.weight = weight;
}

@Override
public String toString() {
return "Cat{" +
"weight=" + weight +
'}';
}

public static void main(String[] args) {
Cat[] cats = {new Cat(12), new Cat(21), new Cat(4), new Cat(8)};

// 这里可以new一个实现了Comparator的类,也可以使用Lambda表达式
Arrays.sort(cats, Comparator.comparingInt(Cat::GetWeight));

System.out.println(Arrays.toString(cats));
}
}

注:

  1. 开闭原则(OCP,Open-Closed Principle),对修改关闭,对扩展开放。
  2. 自定义静态泛型函数时,将类型参数列表写在 static 和返回值类型之间,例如:
    1
    static <T> void sort(T[] array, Comparator c);

抽象工厂(Abstract Factory)

一种为访问类提供一个创建一组相关或相互依赖对象的接口,且访问类无须指定所要产品的具体类就能得到同族的不同等级的产品的模式结构。

例如,农场可以生产动物、蔬菜。农场A生产牛,白菜,农场B生产羊,花椰菜。编码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 农场抽象
public abstract class Farm {
public abstract Animal createAnimal();
public abstract Vegetable createVegetable();
}

// 动物抽象
public interface Animal {
}

// 蔬菜抽象
public interface Vegetable {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 农场A
public class FarmA extends Farm {
@Override
public Animal createAnimal() {
return new Cow();
}

@Override
public Vegetable createVegetable() {
return new Cabbage();
}
}

public class Cow implements Animal {
}

public class Cabbage implements Vegetable {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 农场B
public class FarmB extends Farm {
@Override
public Animal createAnimal() {
return new Sheep();
}

@Override
public Vegetable createVegetable() {
return new Cauliflower();
}
}

public class Sheep implements Animal {
}

public class Cauliflower implements Vegetable {
}
1
2
3
4
5
6
7
8
// 实际调用
public class Main {
public static void main(String[] args) {
Farm farm = new FarmA();
System.out.println(farm.createAnimal().getClass().getSimpleName());
System.out.println(farm.createVegetable().getClass().getSimpleName());
}
}

输出:

1
2
Cow
Cabbage

使用 Rust 语言编写如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use std::fmt::Debug;

// 产品抽象
trait Animal {}
trait Vegetable {}

// 工厂抽象
trait Farm {
type A: Animal;
type V: Vegetable;
const NAME: &'static str;

fn create_animal() -> Self::A;
fn create_vegetable() -> Self::V;
}

#[derive(Debug)]
struct Cow;
impl Animal for Cow {}
#[derive(Debug)]
struct Cabbage;
impl Vegetable for Cabbage {}
struct FarmA;
impl Farm for FarmA {
type A = Cow;
type V = Cabbage;
const NAME: &'static str = "A";

fn create_animal() -> Self::A {
Cow
}

fn create_vegetable() -> Self::V {
Cabbage
}
}

#[derive(Debug)]
struct Sheep;
impl Animal for Sheep {}
#[derive(Debug)]
struct Cauliflower;
impl Vegetable for Cauliflower {}
struct FarmB;
impl Farm for FarmB {
type A = Sheep;
type V = Cauliflower;
const NAME: &'static str = "B";

fn create_animal() -> Self::A {
Sheep
}

fn create_vegetable() -> Self::V {
Cauliflower
}
}

// 实际调用
fn run_a_farm<F>()
where
F: Farm,
F::A: Debug,
F::V: Debug,
{
println!("Farm {}:", F::NAME);
println!("\tAnimal: {:?}", F::create_animal());
println!("\tVegetable: {:?}", F::create_vegetable());
}

fn main() {
run_a_farm::<FarmA>();
run_a_farm::<FarmB>();
}

输出结果为:

1
2
3
4
5
6
Farm A:
Animal: Cow
Vegetable: Cabbage
Farm B:
Animal: Sheep
Vegetable: Cauliflower

典型应用场景

  1. 界面换肤,一款皮肤下,不同的控件的样式;

注:

  1. 增加新的产品族,只需要新增工厂实现,满足开闭原则;
  2. 产品族需要新增产品的时候,需要修改所有工厂,不满足开闭原则。
  3. 当系统只存在一类产品时,抽象工厂退化到工厂方法模式。

外观/门面(Facade)

通过为多个复杂的子系统提供一个一致的接口,而使这些子系统更加容易被访问的模式。

它是迪米特法则的典型应用。降低了子系统与客户端之间的耦合度。

迪米特法则(LOD,Law of Demeter)又叫作最少知识原则(The Least Knowledge Principle),一个类对于其他类知道的越少越好,就是说一个对象应当对其他对象有尽可能少的了解,只和朋友通信,不和陌生人说话。

这是 Withoutboats 在 2019 年 3 月的 Rust Latam 上所做报告的一个整理。这个报告主要介绍他参与开发了一年半的语言特性,包括 Rust 异步 I/O 的发展历程,以及目前已经稳定的零成本抽象的async/await 语法的关键实现原理。

Withoutboats 是就职于 Mozilla 的一名研究员,主要从事 Rust 语言开发。他开发的这个语言特性叫做 async/await,这可能是本年度我们在 Rust 语言上做的最重要的事。这解决了困扰我们很久的问题,即我们如何能在 Rust 中拥有零成本抽象的异步IO。

注:因个人水平有限,翻译和整理难免有错误或疏漏之处,欢迎读者批评指正。

async/await

首先,介绍一下 async/await

async 是一个修饰符,它可以应用在函数上,这种函数不会在调用时一句句运行完成,而是立即返回一个 Future 对象,这个 Future 对象最终将给出这个函数的实际返回结果。而在一个这样的 async 函数中,我们可以使用await运算符,将它用在其它会返回 Future 的函数上,直到那些 Future 返回实际结果。通过这种方法,异步并发开发更加方便了。

1
2
3
4
5
6
7
8
9
let user = await db.get_user("withoutboats");

impl Database {
async fn get_user(&mut self, user: &str) -> User {
let sql = format!("select FROM users WHERE username = {}", user);
let db_response = await self.query(&sql);
User::from(db_response)
}
}

这是一段简短的代码样例,我们具体解释一下 Future 。这段代码基本上做的就是一种类似于 ORM 框架所作的事。你有一个叫 get_user 的函数,它接受一个字符串类型的用户名参数,并通过在数据库中查找对应用户的记录来返回一个User对象。它使用的是异步 I/O ,这意味着它得是一个异步函数,而不是普通函数,因此当你调用它时,你可以异步等待(await)它;然后我们看一下函数的实现,首先是用用户名参数拼接出要执行的 SQL 语句,然后是查询数据库,这就是我们实际执行 I/O 的地方,所以这个查询(query)返回的是 Future ,因为它使用的是异步 I/O 。所以在查询数据库时,你只需要使用异步等待(await)来等待响应,在获得响应后就可以从中解析出用户。这个函数看起来像个玩具,但我想强调的是,它与使用阻塞式 I/O 的唯一区别就是这些注解(指async/await)了,你只需将函数标记为异步(async),并在调用它们时加上 await 就行了,开发的心智负担很小,以至于你会忘了自己是在写异步 I/O 而不是阻塞 I/O 。而 Rust 的这种实现让我尤其感到兴奋的是,它的 async/awaitFuture 都是零成本抽象的。

零成本抽象

零成本抽象是 Rust 比较独特的一项准则,这是使 Rust 与其他许多语言相区别的原因之一。在添加新功能时,我们非常关心这些新功能是不是零成本的。不过这并不是我们想出来的点子,它在 C++ 中也很重要,所以我认为最好的解释是 Bjarne Stroustrup 的这句话:

零成本抽象意味着你不使用的东西,你不用为它付出任何代价,进一步讲,你使用的东西,你无法写出比这更好的代码。

Zero Cost Abstractions: What you don’t use, you don’t pay for. And further: What you do use, you couldn’t hand code any better.

也就是说零成本抽象有两个方面:

  1. 该功能不会给不使用该功能的用户增加成本,因此我们不能为了增加新的特性而增加那些会减慢所有程序运行的全局性开销。
  2. 当你确实要使用该功能时,它的速度不会比不使用它的速度慢。如果你觉得,我想使用这个非常好用的功能把开发工作变得轻松,但是它会使我的程序变慢,所以我打算自己造一个,那么这实际上是带来了更大的痛苦。

所以,我将回顾一下我们如何尝试解决异步 I/O 和 Rust 的问题,以及在我们实现这一目标的过程中,某些未能通过这两项零成本测试的特性。

绿色线程的尝试

我们要解决的问题是 异步 I/O 。通常 I/O 处于阻塞状态,因此当你使用 I/O 时,它会阻塞线程,中止你的程序,然后必须通过操作系统重新调度。阻塞式 I/O 的问题是当你尝试通过同一程序提供大量连接时,它无法真正实现扩展。因此对于真正的大规模网络服务,你需要某种形式的非阻塞的或者说异步的 I/O 。尤其是 Rust 是针对具有真正的高性能要求而设计的语言,它是一种系统编程语言,面向那些真正在乎计算资源的人。要在网络的世界中真正取得成功,我们就需要某种解决方案来解决这个异步 I/O 问题。

但是 异步 I/O 的最大问题是它的工作方式 :在你调用 I/O 时,系统调用会立即返回,然后你可以继续进行其他工作,但你的程序需要决定如何回到调用该异步 I/O 暂停的那个任务线上,这就使得在编码上,异步 I/O 的代码要比阻塞 I/O 的代码复杂得多。所以,很多,尤其是以可扩展的网络服务这类特性为目标的语言,一直在试图解决这个问题。比如,让它不再是最终用户需要解决的问题,而是编程语言的一部分或者某个库的一部分等等。

Rust 最初使用的第一个解决方案是 绿色线程,它已经在许多语言中获得成功。绿色线程基本上就像阻塞式 I/O 一样,使用的时候就像是普通的线程,它们会在执行 I/O 时阻塞,一切看起来就跟你在使用操作系统的原生方式一样。但是,它们被设计为语言运行时的一部分,来对那些需要同时运行成千上万甚至数百万个绿色线程的网络服务用例进行优化。一个使用该模型的典型的成功案例就是 Go 语言,它的绿色线程被称为 goroutine。对于 Go 程序来说,同时运行成千上万个 goroutine 是很正常的,因为与操作系统线程不同,创建它们的成本很低。

操作系统线程 绿色线程
内存开销 较大的堆栈,增加大量内存占用 初始堆栈非常小
CPU开销 上下文切换至操作系统的调度器,成本很高 由程序本身的运行时调度

绿色线程的优点 在于,产生操作系统线程时的内存开销要高得多,因为每个操作系统线程会创建一个很大的堆栈,而绿色线程通常的工作方式是,你将产生一个以很小的堆栈,它只会随着时间的推移而增长,而产生一堆不使用大量内存的新线程并不便宜;并且使用类似操作系统原语的问题还在于你依赖于操作系统调度,这意味着你必须从程序的内存空间切换到内核空间,如果成千上万的线程都在快速切换,上下文切换就会增加很多开销。而将调度保持在同一程序中,你将避免使用这些上下文,进而减少开销。所以我相信绿色线程是一个非常好的模型,适用于许多语言,包括 Go 和 Java。

在很长一段时间内, Rust 都有绿色线程,但是在 1.0 版本之前删掉了。我们删掉它是因为它不是零成本抽象的,准确的说就是我在第一个问题中谈到的,它给那些不需要它的人增加了成本。比如你只想编写一个不是网络服务的屏幕打印的 Rust 程序,你必须引入负责调度所有绿色线程的语言运行时。这种方法,尤其是对于试图把 Rust 集成到一个大的 C 应用程序中的人来说,就成为一个问题。很多 Rust 的采用者拥有一些大型C程序,他们想开始使用 Rust 并将 Rust 集成到他们的程序中,只是一小段 Rust 代码。问题是,如果你必须设置运行时才能调用 Rust ,那么这一小部分的 Rust 程序的成本就太高了。因此从 1.0 开始,我们就从语言中删除了绿色线程,并删除了语言的运行时。现在我们都知道它的运行时与 C 基本上相同,这就使得在 Rust 和 C 之间调用非常容易,而且成本很低,这是使 Rust 真正成功的关键因素之一。删除了绿色线程,我们还是需要某种异步 I/O 解决方案;但是我们意识到 这应该是一个基于库的解决方案,我们需要为异步 I/O 提供良好的抽象,它不是语言的一部分,也不是每个程序附带的运行时的一部分,只是可选的并按需使用的库。

Future 的解决方案

最成功的库解决方案是一个叫做 Future 的概念,在 JavaScript 中也叫做 PromiseFuture 表示一个尚未得出的值,你可以在它被解决(resolved)以得出那个值之前对它进行各种操作。在许多语言中,对 Future 所做的工作并不多,这种实现支持很多特性比如组合器(Combinator),尤其是能让我们在此基础上实现更符合人体工程学的 async/await 语法。

Future 可以表示各种各样的东西,尤其适用于表示异步 I/O :当你发起一次网络请求时,你将立即获得一个 Future 对象,而一旦网络请求完成,它将返回任何响应可能包含的值;你也可以表示诸如“超时”之类的东西,“超时”其实就是一个在过了特定时间后被解决的 Future ;甚至不属于 I/O 的工作或者需要放到某个线程池中运行的CPU密集型的工作,也可以通过一个 Future 来表示,这个 Future 将会在线程池完成工作后被解决。

1
2
3
4
5
trait Future {
type Output;
fn schedule<F>(self, callback: F)
where F: FnOnce(Self::Output);
}

Future 存在的问题 是它在大多数语言中的表示方式是这种基于回调的方法,使用这种方式时,你可以指定在 Future 被解决之后运行什么回调函数。也就是说, Future 负责弄清楚什么时候被解决,无论你的回调是什么,它都会运行;而所有的不便也都建立在此模型上,它非常难用,因为已经有很多开发者进行了大量的尝试,发现他们不得不写很多分配性的代码以及使用动态派发;实际上,你尝试调度的每个回调都必须获得自己独立的存储空间,例如 crate 对象、堆内存分配,这些分配以及动态派发无处不在。这种方法没有满足零成本抽象的第二个原则,如果你要使用它,它将比你自己写要慢很多,那你为什么还要用它。

基于轮询的解决方案

1
2
3
4
5
6
7
8
9
10
11
12
// 基于轮询的 Future

trait Future {
type Output;
fn poll(&mut self, waker: &Waker)
-> Poll<Self::Output>;
}

enum Poll<T> {
Ready(T),
Pending,
}

这个非常出色的基于轮询的新方案——我们编写了这个模型,我归功于 Alex 和 Aaron Turon,是他们提出了这个想法——不是由 Future 来调度回调函数,而是由我们去轮询 Future,所以还有另一个被称为执行器(executor)的组件,它负责实际运行 Future ;执行器的工作就是轮询 Future ,而 Future 可能返回“尚未准备就绪(Pending)”,也可能被解决就返回“已就绪(Ready)”。

该模型有很多优点。其中一个优点是,你可以非常容易地取消 Future ,因为取消 Future 只需要停止持有 Future。而如果采用基于回调的方法,要通过调度来取消并使其停止就没这么容易了。

同时它还能够使我们在程序的不同部分之间建立真正清晰的抽象边界,大多数 Future 库都带有事件循环(event loop),这也是调度你的 Future 执行 I/O 的方法,但你实际上对此没有任何控制权。而在 Rust 中,各组件之间的边界非常整洁,执行器(executor)负责调度你的 Future ,反应器(reactor)处理所有的 I/O ,然后是你的实际代码。因此最终用户可以自行决定使用什么执行器,使用他们想使用的反应器,从而获得更强的控制力,这在系统编程语言中真的很重要。而此模型最重要的真正优势在于,它使我们能够以一种真正零成本的完美方式实现这种状态机式的 Future 。也就是当你编写的 Future 代码被编译成实际的本地(native)代码时,它就像一个状态机;在该状态机中,每次 I/O 的暂停点都有一个变体(variant),而每个变体都保存了恢复执行所需的状态。这表示为一个枚举(enum)结构,即一个包含变体判别式及所有可能状态的联合体(union)。

StateMachines

译者注:报告视频中的幻灯片比较模糊,我对其进行了重绘与翻译,下同。

上面的幻灯片尽可能直观地表示了这个状态机模型。可以看到,你执行了两个 I/O 事件,所以它有这几个状态。对于每个状态它都提供了所需的内存空间,足够你在 I/O 事件后恢复执行。

整个 Future 只需要一次堆内存分配,其大小就是你将这个状态机分配到堆中的大小,并且没有额外的开销。你不需要装箱、回调之类的东西,只有真正零成本的完美模型。这些概念对于很多人来说比较难于理解,所以这是我力求做到最好的幻灯片,直观地呈现这个过程中发生了什么:你创建一个 Future,它被分配到某个内存中特定的位置,然后你可以在执行器(executor)中启动它。

PollWakeCycle1

执行器会轮询 Future,直到最终 Future 需要执行某种 I/O 。

PollWakeCycle2

在这种情况下,该 Future 将被移交给处理 I/O 的反应器,即 Future 会等待该特定 I/O 。最终,在该 I/O 事件发生时,反应器将使用你在轮询它时传递的Waker 参数唤醒 Future ,将其传回执行器;

PollWakeCycle3

然后当需要再次执行I/O时,执行器再将其放回反应器;它将像这样来回穿梭,直到最终被解决(resolved)。在被解决并得出最终结果时,执行器知道它已经完成,就会释放句柄和整个Future,整个调用过程就完成了。

PollWakeCycle4

总结一下:这种模型形成了一种循环,我们轮询 Future ,然后等待 I/O 将其唤醒,然后一次又一次地轮询和唤醒,直到最终整个过程完成为止。

PollWakeCycle

并且这种模型相当高效。

QuiteFast

这是在有关 Future 的第一篇文章中发布的基准测试,与其他语言的许多不同实现进行了对比。柱形越高表示性能越好,我们的 Future 模型在最左侧,所以说我们有了非常出色的零成本抽象,即使是与许多其他语言中最快的异步 I/O 实现相比也是相当有竞争力的。

但是,问题在于,你并不希望手动编写这些状态机,把整个应用程序所有状态写成一个状态机并不是件轻松愉快的事。而这种 Future 抽象的真正有用之处在于,我们可以在其之上构建其他 API 。

其它API

组合器(Combinator)

我们能使用的第一个解决方案是 Future 组合器(Combinator)。你可以通过将这些组合器方法应用于 Future 来构建状态机,它们的工作方式类似于迭代器(Iterator)的适配器(如 filtermap)。

1
2
3
4
5
6
7
8
fn fetch_rust_lang(client: Client)
-> impl Future<Output = String> {
client.get("rust-lang.org").and_then(|response| {
response.concat_body().map(|body| {
String::from_utf8_lossy(body)
})
})
}

这个函数的作用是请求 “rust-lang.org”,然后将响应转换为字符串。它并不返回一个字符串,而是返回一个字符串的 Future ,因为它是一个异步函数。函数体中有这样一个 Future,它包含一些会被调用的 I/O 操作,用 and_thenmap 之类的组合器将这些操作全部组合在一起。我们构建了所有这些用处各异的组合器,例如,and_thenmapfiltermap_error等等。我们已经知道这种方式是有一些缺点的,尤其是诸如嵌套回调之类,可读性非常差。

async / await 的实现

因为组合器有这样的缺点,所以我们开始尝试实现 async / awaitasync / await 的第一个版本并不是 Rust 语言的一部分,而是由该库像语法插件一样提供的。

1
2
3
4
5
6
#[async]
fn fetch_rust_lang(client: Client) -> String {
let response = await!(client.get("rust-lang.org"));
let body = await!(response.concat_body());
String::from_utf9_lossy(body)
}

这个函数与之前那个版本的一样,它只是获取 Rust 官网并将其转换为字符串;但是它使用 async / await 来实现,所以更像是顺序执行,看起来更像普通的阻塞 I/O 的工作方式;就像开头那个实例中呈现的一样,它们唯一区别是注解(指 async / await)。我们已经知道,async 注解会将此函数转换为一个返回 Future 的函数,而不是立即返回结果,并且我们需要异步等待(await)这些在函数内部构造的 Future

1
2
3
4
5
6
7
8
await!($future) => {
loop {
match $future.poll() {
Poll::Pending => yield Poll::Pending,
Poll::Ready(value) => break value,
}
}
}

在我们的轮询模型中,await 是一种语法糖;它会进入上面这种循环,你要做的就是在循环中轮询,在一段时间内你将一直得到“尚未准备就绪(Pending)”,然后一直等到它再次被唤醒,终于你等待的 Future 完成了,然后你使用该值跳出了循环,这就是这些 await 表达式的计算结果。

这似乎是一个非常不错的解决方案,async / await 的写法会被编译成我们超棒的零成本的 Future。不过从已发布的 Future 的使用者的反馈看,我们还是发现了一些问题。

新的问题

基本上所有试图使用 Future 的人都会遇到非常令人困惑的错误消息。在这些消息中,编译器会提示你的Future的生命周期不是静态的('static)或没有实现某个 trait 等等;这些提示你并不真正理解,但编译器想提出有用的建议,你也就跟着这个建议去做,直到编译成功;你可能会给闭包加上 move 关键字,或者把某些值放到引用计数的指针(Rc)中,然后将复制(clone)它;你将所有这些开销添加到了似乎并不必要的事情上,却不明白为什么要这样做,而当你已经疲于处理这些时,代码已经乱成一锅粥了,所以很多人都被 Future 的问题卡住了。

CombinatorChain

而且它对于组合器产生这种非常大的类型也没什么办法,你的整个终端窗口(terminal)将被其中一个组合器链的类型填满。你用了and_then,以及又一个 and_then,然后是 map_err 紧跟一个 TCP 流等等等等,你必须仔细研究一下,才能弄清楚所遇到的实际错误是什么。

“When using Futures, error messages are inscrutable.”
“当使用 Future 时,错误信息难以理解。”
“Having to use RefCell or clone everything for each future leads to overcomplicated code that makes me wish Rust had garbage collection.”
“不得不使用 RefCell 以及为每个 future 克隆所有它需要的值产生了过于复杂的代码,这让我开始期待 Rust 能具备垃圾回收功能了。”

我在 reddit 上发现了这条消息,我认为它确实很好地总结了所有有关 Future 的抱怨。使用 Future 时,错误消息难以理解;不得不使用 RefCell 以及为每个 future 克隆所有它需要的值产生了过于复杂的代码,这让我开始期待 Rust 能具备垃圾回收功能了(观众笑)。是的,这不是什么好反馈。

因此,从大约一年半前的情况来看,很明显,有两个问题需要解决,才能让大家更容易使用。

首先,我们需要更好的错误消息,最简单的方法就是将语法构建到语言中,然后它们就可以在你所有的诊断和错误处理代码中加入钩子,从而使你能够真正拥有良好的 async / await 的错误消息。其次,人们遇到的大多数错误实际上是因为他们被一个晦涩难解的问题卡住了——借用问题。正是因为 Future 的设计方式存在着这种根本的局限性,导致一些很普通的编程范式都无法表达。所谓借用问题,就是在最初的 Future 的设计中你不能跨过异步等待点(await point)进行借用,也就是说,如果你要异步等待(await)某件事,你就不能在那个时候持有任何存活的引用。当人们遇到了这种问题,他们通常会被卡住,最终他们会尝试在 await 的时候进行借用然后发现实际上做不到。所以如果我们能够使这种借用被允许,那么大多数这些错误将消失,一切都将变得更易于使用,你可以使用 asyncawait 编写普通的 Rust 代码,并且一切都会正常进行。

这种跨越 await 的借用是非常普遍的,因为 Rust 的 API 天然就具有引用。当你实际编译 Future 时,它必须能够恢复所有状态,而当你拥有某些其它东西的引用时,它们位于同一栈帧中,最终你会得到一个自引用Future 结构(Self-Referential Future)。这是开头的那个 get_user 方法,我们有这样一个 SQL 字符串,而在使用 SQL 字符串调用 query 方法时,我们传递的是 SQL 字符串的引用。

1
2
let sql: String = format!("SELECT FROM users WHERE username = {}", user);
let db_response = await self.query(&sql);

这里存在的问题是,对 SQL 字符串的引用是对存储在相同 Future 状态中的其他内容的引用,因此它成为一种自引用结构。

SelfReferentialFuture

如果把这个 Future 视作一个真的结构体的话,这就是理论上它所拥有的字段。除了代表数据库句柄的 self 之外,还有 SQL 字符串以及对这个 SQL 字符串的引用,即一个最终指回同一结构体中某个字段的引用。

一些新的潜在结构会成为非常棘手的问题,因为我们没有通用的解决方案。当你移动该结构时,我们不允许你使用自引用,是因为我们会在新位置创建一个原有结构的新副本,旧副本会变为无效,但是在复制时,新副本的引用仍指向旧副本的字段,该指针就变成了悬空指针(dangling pointer),而这正是 Rust 必须避免的内存问题。

DanglingPointer

所以我们不能使用自引用结构,因为如果你移动它们,那么它们将失效。然而,我们实际上并不需要真正移动这些 Future 。如果你还记得在堆中通过句柄使用 Future 的模型,它在反应器和执行器之间来回传递,所以 Future 本身永远不会真正移动;而只要你保证不移动,Future 包含自引用就完全没问题。

所以我们需要通过采用某种方式在 Future 的 API 中表达 “在你轮询时,你不允许随意移动它” 来解决这个问题。如果我们能够表达这一点,我们就可以允许 Future 中出现自引用,进而就可以在异步函数中真正使用这些引用,并且一切都会正常工作。因此我们研究了这个问题,最终开发出了被称为 Pin 的新 API 。

Pin

Pin 是一种围绕其他指针类型的适配器,可以将其它指针变为 固定引用(pinned reference)。除了原有指针的保证外,它还保证这个引用再也不会被移动,所以我们可以确保它将一直处在同一内存位置,直到最终被释放。如果你的 API 中有一些内容已表明必须使用 Pin,那么你就知道了它再也不会被移动,这样就你可以使用那种自引用的结构体了。因此,我们修改了 Future 的工作方式,现在它变成了一个经过装箱(boxed)的 Future ,实际上是一个由 Pin 封装的 Box<Future> ;这样无论在哪里装箱,我们把它放在堆内存中,它可以保证永远不会再移动;而当你轮询 Future 时,不再使用一个普通引用,而是使用一个固定引用,这样 Future 就知道它不会被移动。而使这一切起作用的诀窍是,要从固定引用中获取非固定引用,你只能使用不安全的代码。

1
2
3
4
5
6
7
8
9
10
struct Pin<P>(P);

impl<T> Pin<Box<T>> {
fn new(boxed: Box<T>) -> Pin<Box<T>> { ... }
fn as_mut(self) -> Pin<&mut T> { ... }
}

impl<T> Pin<&mut T> {
unsafe fn get_unchecked_mut(self) -> &mut T { ... }
}

所以 Pin API 大致就像这样。在这里你有一个 Pin 结构,它只是另一个指针类型的封装,它没有任何运行时开销或者其它东西,仅仅是将其划分为一个固定的(pinned)对象,然后一个固定的 Box 指针可以转换为一个固定引用,但是将一个固定引用转换为一个非固定引用的唯一方法是使用一个不安全的(unsafe)函数。

1
2
3
4
5
6
trait Future {
type Output;

fn poll(self: Pin<&mut self>, waker: &Waker)
-> Poll<Self::Output>;
}

这样一来,我们要做的就是修改 Future 的 API ,其轮询方法将不再接受一个普通引用,而是接受一个固定引用,而这其实就是我们将要稳定发布的 Future API。而做了这个修改之后,开头示例的写法就能正常工作了。这样你就可以像写阻塞 I/O 的代码那样编写异步 I/O 的代码了,只需要加上 asyncawait 注解,你就能得到这个出色的零成本抽象的异步实现,而即便你自己手写,这基本上也是你能写出的开销最低的实现了。

async / await 的现状及未来

目前的情况是,Pinning 大约在一个月前的最新版本中稳定了,我们正在稳定 Future 的 API ,因此大概会在 1.35,也可能会推到 1.36 稳定,基本上在两三个月内就会知道了。并且我们希望今年的某个时候我们能够拥有 async / await,希望今年夏末能将其稳定下来,这样人们就可以使用这种类似阻塞 I/O 的语法编写无阻塞的 I/O 网络服务了。除了这些稳定化工作,我们也已经开始研究某些更长期的功能,比如流(Stream),我认为它可能是异步的下一个大功能。我们知道一个 Future 只产生一个值,而一个流可以异步地产生很多值;异步地产生值本质上就像是一个异步迭代器,你能够在一个流上进行异步的循环;这个功能对于许多用例来说非常重要,比如流式传输HTTP、WebSocket 推送请求之类的东西,不用像我们的 RPC 模型那样发出网络请求然后获得单个响应,而是能够使用请求流和响应流,在两者之间来回调用。

目前使用异步仍然存在一个限制,即不能在 trait 中使用 async。有许多编译器开发工作正在进行,使其能够支持这个特性。除了这些功能,有时候我们也希望能拥有生成器(Generator),类似于 Python 或 JavaScript 的生成器,除了拥有可以 return 的函数,还能使用可以 yield 的函数,这样你就可以在 yield 之后再次恢复执行;并且你可以将这些函数作为编写迭代器和流的方式,就像异步函数能够让你像编写普通函数那样编写 Future 一样。

最后,我想回顾一下成就这种零成本异步 I/O 的关键点:首先就是这种基于轮询的 Future,它将这些 Future 编译到这种相当紧凑的状态机中;其次是这种实现 async / await 语法的方式,即因为有了 Pin,我们能够跨越异步等待点使用引用。

谢谢大家。

译者注:报告中计划稳定的特性均已稳定发布,参见 areweasyncyet.rs
按照目前稳定的版本,await 已改为后置运算符 .await,所以本文开头的 get_user 方法应当修改为:

1
2
3
4
5
6
7
8
9
let user = db.get_user("withoutboats").await;

impl Database {
async fn get_user(&mut self, user: &str) -> User {
let sql = format!("select FROM users WHERE username = {}", user);
let db_response = self.query(&sql).await;
User::from(db_response)
}
}