OkHttp的任务队列----ArrayDeque

一.前言

OkHttp 的处理网络请求的时候会有两个队列。一个是正在执行的队列,一个是等待执行的队列。

1
2
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

可以看到他们的实现都是 ArrayDeque ,这是一个底层是循环数组的实现,所谓循环数组就是当不需要扩容的时候,数组在逻辑上是首位相连的一个圆,没有固定的头尾,当数组满的时候就进行扩容。

image.png
下面主要就 JDK1.8 的源码分析 ArrayDeque 的原理,后面也会提及JDK 10 。

二.ArrayDeque

1.源码中的介绍

1
2
3
4
5
6
7
8
9
Resizable-array implementation of the {@link Deque} interface.  Array
* deques have no capacity restrictions; they grow as necessary to support
* usage. They are not thread-safe; in the absence of external
* synchronization, they do not support concurrent access by multiple threads.
* Null elements are prohibited. This class is likely to be faster than
* {@link Stack} when used as a stack, and faster than {@link LinkedList}
* when used as a queue.

...

上面对 ArrayDeque 的介绍可以分几点:

  • ArrayDeque 没有内容的限制,当需要的时候可以进行扩容。
  • 是不是线程安全的,在多线程的情况下不支持并发安全。
  • 不允许元素为 null。
  • 在某些情况下,作为栈使用比 Stack 快,作为队列使用比 LinkedList 快。

下面就看看具体的实现。

2.源码分析

内部的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
transient Object[] elements; // non-private to simplify nested class access

//头元素下标
transient int head;

//尾部元素下一个位置的下标
transient int tail;

//最小的初始容量
private static final int MIN_INITIAL_CAPACITY = 8;


//默认初始容量为 16
public ArrayDeque() {
elements = new Object[16];
}

// 当指定的容量不是 2 的倍数的时候,转换为最接近的数
//比如 17 最近的是 16
public ArrayTest(int numElements) {
allocateElements(numElements);
}
private void allocateElements(int numElements) {
int initialCapacity = MIN_INITIAL_CAPACITY;
// Find the best power of two to hold elements.
// Tests "<=" because arrays aren't kept full.
if (numElements >= initialCapacity) {
initialCapacity = numElements;
initialCapacity |= (initialCapacity >>> 1);
initialCapacity |= (initialCapacity >>> 2);
initialCapacity |= (initialCapacity >>> 4);
initialCapacity |= (initialCapacity >>> 8);
initialCapacity |= (initialCapacity >>> 16);
initialCapacity++;

if (initialCapacity < 0) // Too many elements, must back off
initialCapacity >>>= 1; // Good luck allocating 2^30 elements
}
elements = new Object[initialCapacity];
}

上面的源码可以看到一下几点:

  • ArrayDeque 内部是基于数组实现的。维护一个头部小标和一个尾部后一个元素下标。
  • 默认初始量为 16 ,自己指定时也会转换为 2 的倍数。
获取队列头部元素

element(), getFirst(),peek(),peekFirst() 操作,其都是调用 getFirst() 实现的,访问队列头部元素但不删除

1
2
3
4
5
6
7
8
9
10
/**
* @throws NoSuchElementException {@inheritDoc}
*/
public E getFirst() {
@SuppressWarnings("unchecked")
E result = (E) elements[head];
if (result == null)
throw new NoSuchElementException();
return result;
}

删除队列头部

remove(),removeFirst(),poll(),pollFirst() 操作,其都是调用 pollFirst() 实现的

1
2
3
4
5
6
7
8
9
10
11
12
public E pollFirst() {
final Object[] elements = this.elements;
final int h = head;
@SuppressWarnings("unchecked")
E result = (E) elements[h];
// Element is null if deque empty
if (result != null) {
elements[h] = null; // Must null out slot
head = (h + 1) & (elements.length - 1);
}
return result;
}

对上面的步骤做一下解释:

  • 首先取出头部的元素,将其置为空。
  • 然后 head 这个下标就循环后移。

这里的

1
head = (h + 1) & (elements.length - 1);

其实就跟

1
head =  (h + 1) % (elements.length)

是一样的作用,目的就是当 head 到达数组末尾的时候,+1 就能回到数组的起始下标 即 0 ,从而实现循环。之所以能用 & 操作是因为在初始或者扩容的时候都要求数组的最后长度为 2 的倍数。比如默认长度 16 时, h + 1 的二进制为 0001 0000 ,length - 1二进制就为 0000 1111 ,相与就为 0 ,其他情况就正常加 1 .

添加元素

添加元素到队列尾部的操作可以发现 add(E e),offer(E e),offerLast(E e),addLast(E e) 操作都是调用 addLast(E e) 实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 public void addLast(E e) {
if (e == null)
throw new NullPointerException();
elements[tail] = e;
//判断头部下标和尾部下标是否重叠
if ( (tail = (tail + 1) & (elements.length - 1)) == head)
//进行扩容
doubleCapacity();
}

private void doubleCapacity() {
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = a;
head = 0;
tail = n;
}

  • 首先,但添加的元素为 null 的时候就会抛出异常,说明 ArrayDeque 不能添加 null .
  • 添加完一个元素后就会判断 头部和尾部下标时候重叠,如果重叠就说明数组已经满了需要进行扩容。
  • 将数组容量直接 X2 ,然后将原本的元素调用 System.arraycopy 进行复制。
  • 重新指定 head ,tail 。

具体的扩容可见下图:

image.png

JDK 10 中源码

在JDK 10 对 ArrayDeque 的实现有些不同,就是不再直接扩容 X2 倍,而是根据需要 X2 或者增加原来的 1/2。因此对下标的循环加1 也就直接进行判断是否达到数组的尾部,是就直接置 0.

1
2
3
4
static final int inc(int i, int modulus) {
if (++i >= modulus) i = 0;
return i;
}

三.ArrayDeque 与 LinkedList

区别

LinkedList 实现了 List 接口又实现了 Deque 接口(Deque 是 Queue 的子接口),而 LinkedList 的实现是基于双向链表结构的,其容量没有限制,是非并发安全的队列,所以不仅可以当成列表使用,还可以当做双向队列使用,同时也可以当成栈使用(因为还实现了 pop 和 push 方法)。此外 LinkedList 的元素可以为 null 值

ArrayDeque 是一个用循环数组实现的双端队列 Deque,满足可以同时在数组两端插入或删除元素的需求ArrayDeque 是非线程安全的,当多个线程同时使用的时候需要手动同步,不允许放 null 元素,没有索引位置,不能根据索引位置获取数组中的元素。

使用场景

  • 如果只需要 Deque 接口且从两端进行操作则一般来说 ArrayDeque 效率更高一些。
  • 如果同时需要根据索引位置进行操作或经常需要在中间进行插入和删除操作则应该优先选 LinkedList 效率高些。
0%