网络请求框架----OkHttp原理

一.前言

在 Android 中,网络请求是一个必不可少的功能,因此就有许多代表网络请求客户端的组件库,具有代表性的有下面三种:

  • Apache 的 HTTP 客户端组件 HttpClient。
  • Java JDK 自带的 HttpURLConnection 标准库。
  • 重写应用层代码的 HTTP 组件库。
HttpClient

这是 Android 6.0 之前经常会使用的 API ,但是因为不支持 HTTP/2,支持 HTTP/2 的版本还处于 beta 阶段,不适合用于 Android APP 中使用,所以 Google 在 6.0 版本里面删除了 HttpClient 相关 API。

HttpURLConnection

这是 Java 自带的一个组件,不需要引入依赖就可以使用,同样这个组件库也无法支持 HTTP/2, 支持的版本也要到 Java 9 后才行。同时这个标准库 封装层次太低,并且支持特性太少,缺乏连接池管理,域名机制控制等特性,因此在 Android 中使用就会相当繁琐。

OkHttp

上述两个组件都是不支持 HTTP/ 2 ,但是 HTTP/2 对于移动客户端而言,
无论是从握手延迟、响应延迟,
还是资源开销看都有相当吸引力,而且 OkHttp 在弱网和无网环境下有自动检测和恢复机制,这使得 OkHttp 成为 Android 最常见的网络请求库。

二.简介

OkHttp 是一个支持 HTTP 和 HTTP/2 的封装的网络请求客户端,适用于 Android 和 java 应用程序。OkHttp 有如下优点:

  • 支持 HTTPS/HTTP2/WebSocket
  • 内部维护任务队列线程池,支持 HTTP/2 的并发访问
  • 内部维护连接池,支持 HTTP/1.x 的 keep-Alive 机制,也支持 HTTP/2 的多路复用, 减少连接创建开销。
  • 通过缓存避免重复的请求
  • 请求失败时自动重试主机的其他ip,自动重定向。

三.原理

1.初始化

OkHttp 的使用初始化有两种方式。

  • 默认方式:
1
OkHttpClient mOkHttpClient = new OkHttpClient();
  • 自定义配置方式:
1
2
3
4
5
6
7
8
9
10
11
12
OkHttpClient.Builder builder = new OkHttpClient.Builder()
.connectTimeout()
.writeTimeout()
.readTimeout()
.cache()
.addInterceptor()
.connectionPool()
.dns()
...
;

OkHttpClient mOkHttpClient = builder.build();

不管是哪种方式,对于 OkHttp 来说都是初始化一些配置,因为这里的参数十分多,所以这里使用的 Builder 设计模式进行简化。Builder 初始化的对象主要有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public OkHttpClient() {
this(new Builder());
}
public Builder newBuilder() {
return new Builder(this);
}

public static final class Builder {
public Builder() {
dispatcher = new Dispatcher(); //请求的调度器
protocols = DEFAULT_PROTOCOLS; // 默认支持的协议
connectionSpecs = DEFAULT_CONNECTION_SPECS; // 默认连接配置
eventListenerFactory = EventListener.factory(EventListener.NONE); // 对于请求和回调的监听
proxySelector = ProxySelector.getDefault(); // 代理服务器的选择
cookieJar = CookieJar.NO_COOKIES; // 默认没有 Cookie
socketFactory = SocketFactory.getDefault(); // Socket 的工厂
hostnameVerifier = OkHostnameVerifier.INSTANCE; //主机名认证
certificatePinner = CertificatePinner.DEFAULT; // 安全认证相关的配置
proxyAuthenticator = Authenticator.NONE; // 安全认证相关的配置
authenticator = Authenticator.NONE; // 安全认证相关的配置
connectionPool = new ConnectionPool(); //连接池
dns = Dns.SYSTEM; // DNS 域名解析系统
followSslRedirects = true; // 允许SSL重定向
followRedirects = true; // 允许重定向
retryOnConnectionFailure = true; // 允许失败重连
connectTimeout = 10_000; // 连接超时 , 10 s
readTimeout = 10_000; // 读取 超时 ,10 s
writeTimeout = 10_000; // 写入超时,10s
pingInterval = 0; //ping 间隔时间,这是 WebSocket 长连接的活性检测的间隔时间
}

上面这些配置都是 OkHttpClient 的默认属性,当然也可以使用自己自定义的属性。而且可以看到每一次初始化都会创建新的的 Builder ,因此也会重新创建一个连接池,调度器等耗资源的类,因此在使用 OkHttpClient 通常使用的单例模式,使得整个系统只有一个 请求调度器和连接池,减少资源的消耗。

2.发起请求

先看一个请求的创建的方式

1
2
3
Request.Builder requestBuilder = new Request.Builder().url(url);
requestBuilder.method("GET", null);
Request request = requestBuilder.build();

可以看到这里同样的是使用 Builder 的模式来创建一个 Request 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class Request {
final HttpUrl url;
final String method;
final Headers headers;
final RequestBody body;
final Object tag;

private volatile CacheControl cacheControl; // Lazily initialized.

Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tag = builder.tag != null ? builder.tag : this;
}

Request 主要是对请求的 Url ,请求方法,请求头,请求体,以及缓存首部字段的一个封装而已。对于一个网络请求的, OkHttp 有两种执行的方式:

  • 同步的:executed,这种方式不能在 主线程中调用。
1
okHttpClient.newCall(request).execute();
  • 异步的 enqueue(responseCallback)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    okHttpClient.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
    });

3.处理请求

在 OkHttp 中一个请求的处理主要是由 dispatcher 分发器负责,先看 Dispatcher 类主要有什么东西。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class Dispatcher {
private int maxRequests = 64; //最大并发的请求数 为 64
private int maxRequestsPerHost = 5; //每个主机最大请求数为 默认为 5
private Runnable idleCallback;

/** Executes calls. Created lazily. */
private ExecutorService executorService; //请求处理的线程池

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); //异步处理的准备队列

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); //异步处理的执行队列

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); //同步处理的执行队列

}

简单来说就是对于同步请求,使用一个 队列进行保存。对于异步的请求,有一个准备的队列和一个正在执行的队列进行保存。因为同步的方式还是在 主线程中运行,因此没有使用到线程池,而对于异步的方式,OkHttp 使用了线程池对异步请求进行管理。

在一个请求发起之后就是对请求的处理,因为处理请求的方式有同步和异步两种,所以具体的实现也有所不同,下面先看 同步的方式:

(1)同步请求的方式
1
2
3
4
 // okHttpClient 类中
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
   // RealCall 类中,实现 Call 接口
final class RealCall implements Call {
...
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this); //添加到请求队列中
Response result = getResponseWithInterceptorChain(); //拦截链,对应的一系列方法调用包括请求,得到响应后返回
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this); //结束这个请求
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  //Call 接口
public interface Call extends Cloneable {

Request request();

Response execute() throws IOException;

void enqueue(Callback responseCallback);

boolean isExecuted();

boolean isCanceled();

Call clone();

interface Factory {
Call newCall(Request request);
}
}

首先将 Request 封装为一个 RealCall, 这个 RealCall 实现了 Call 接口,在 Call 接口中可以看到里面既有 Request 也有 Response ,显然 Call 接口定义的就是一次 网络请求和其对应的响应的抽象。

在 RealCall 类中一个请求的处理步骤主要是分为三步:

  • client.dispatcher().executed(this); 添加到队列。
  • getResponseWithInterceptorChain(),发起拦截链,同时得到响应后返回。
  • client.dispatcher().finished(this);结束这个请求。
dispatcher().executed(this) / client.dispatcher().finished(this)
1
2
3
4
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls(); //对于 executed 方式,这里为false 所以不执行 promoteCalls
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}

if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}

对于同步方式,添加队列就是将请求添加到同步执行的队列,然后就调用拦截器链得到请求后就结束这个 Call 。结束的时候就直接从队列中移除。

(1)异步请求的方式
1
2
3
4
5
6
7
8
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

可以看到这里将回调的接口封装在 AsyncCall 类里面,这个类继承了 NamedRunnable 抽象类,然后就执行 调度器的分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class NamedRunnable implements Runnable {
protected final String name;

public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}

@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}

protected abstract void execute();
}

NamedRunnable 实现了 Runnable 接口,并在 run 方法中调用了抽象方法 execute ,那么也就是说 AsyncCall 的 execute 方法最终会在子线程中执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 // 实现 Runnable 接口
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
this.responseCallback = responseCallback;
}

@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(); //拦截器链
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled")); //失败的回调
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response); //成功的回调
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this); //结束这个 Call
}
}
}

在 AsyncCall 的 方法中,首先就是通过拦截器链得到响应,然后对响应进行判断,如果成功就调用 responseCallback.onResponse ,失败就调用就 responseCallback.onFailure 。

在 RealCall 类中一个异步请求的处理步骤主要是分为三步:

  • 在主线程,client.dispatcher().executed(new AsyncCall(responseCallback)); 将 回调接口封装为 AsyncCall 后添加到队列中。
  • 在run 方法中 getResponseWithInterceptorChain(),发起拦截链,同时得到响应后返回。
  • 在run 方法中 ,client.dispatcher().finished(this);结束这个请求。
dispatcher.enqueue(new AsyncCall(responseCallback))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}

/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}

判断 runningAsyncCalls 正在执行的队列的大小是否小于最大请求数量(最大线程数量、
并发数量)并且 所有的 AsyncCall 请求加起来是否小于最大主机请求限制。

  • 如否

将 AsyncCalls 加入到 readyAsyncCalls,的准备队列

  • 如是

加入到 runningAsyncCalls,正在执行的队列中 ,并加入线程池执行。

最后 client.dispatcher().finished
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}



private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls(); 因为 为 true 所以还要执行 promoteCalls
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}

if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}



private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();

if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}

if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

对于异步请求的结束,首先 判断正在执行的队列 runningAsyncCalls 是否还有请求,有则返回继续请求,没有就判断准备队列 readyAsyncCalls 是否还有请求,没有则返回,有则添加到正在执行的队列,然后执行线程.

image.png

4.拦截器链

OkHttp 基本上所有的核心功能都是由拦截器链完成的,包括缓存,网络请求获取响应等。在前面的代码中可以看到对于请求的响应的获取都是通过下面这行代码实现的。

1
Response response = getResponseWithInterceptorChain();

下面就看拦截器链的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}

其逻辑大致分为两部分:

  • 创建一系列拦截器,包括用户自定义的拦截器,并将其放入一个拦截器List中。
  • 创建一个拦截器链 RealInterceptorChain, 并执行拦截器链的 proceed 方法.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class RealInterceptorChain implements Interceptor.Chain {
...

@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
...
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
....
// Call the next interceptor in the chain.
// 获取下一个拦截器,并调用其 intercept 方法。
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
....
return response;
}
}

在拦截器的 intercept 方法里,以 RetryAndFollowUpInterceptor 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final class RetryAndFollowUpInterceptor implements Interceptor {

@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
// 对请求前的处理
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);

int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}

Response response = null;
boolean releaseConnection = true;
try {
//这里再去调用拦截器链的 proceed ,
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;

}

// 下面就是对 得到响应后的处理,这里就省略。
}

在 RealInterceptorChain 里会去执行拦截器链的 proceed 方法。而在拦截器链中又会执行下一个拦截器的 intercept 方法,在下一个拦截器的 intercept 中又会去执行拦截器链的 proceed ,此时 index + 1 。所以整个执行链就在拦截器与拦截器链中交替执行,最终完成所有拦截器的操作。这也是 OkHttp 拦截器的链式执行逻辑。而一个拦截器的 intercept 方法所执行的逻辑大致分为三部分:

  • 在发起请求前对request进行处理
  • 调用下一个拦截器,获取response
  • 对response进行处理,返回给上一个拦截器

这就是 OkHttp 拦截器机制的核心逻辑。所以一个网络请求实际上就是一个个拦截器执行其intercept 方法的过程。而这其中除了用户自定义的拦截器外还有几个核心拦截器完成了网络访问的核心逻辑,按照先后顺序依次是:

  • RetryAndFollowUpInterceptor
  • BridgeInterceptor
  • CacheInterceptor
  • ConnectIntercetor
  • CallServerInterceptor
(1)RetryAndFollowUpInterceptor
1
2
3
4
5
6
/**
* This interceptor recovers from failures and follows redirects as necessary. It may throw an
* {@link IOException} if the call was canceled.
*/
public final class RetryAndFollowUpInterceptor implements Interceptor {
/**

从英文解释就连可以看出这个拦截器主要负责失败重传和在必要的时候进行重定向,当一个请求由于各种原因失败了,处理以得到新的Request,沿着拦截器链继续新的Request。

(2)BridgeInterceptor
1
2
3
4
5
6
7

/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
*/
public final class BridgeInterceptor implements Interceptor {

这个拦截器作为应用程序模块代码和网络请求模块代码的桥梁,首先会从使用者的 request 构建一个真正的网络请求,然后将这个请求提交给网络请求模块,最后就从网络请求模块返回的数据构建一个 response 给使用者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Override public Response intercept(Chain chain) throws IOException {

Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}


//到这里网络请求前拦截的动作就已经完成,主要有:
// 设置内容长度,内容编码
// 设置gzip压缩
//添加cookie
//设置其他请求头首部,如 User-Agent,Host,Keep-alive 等。其中 Keep-Alive 是实现多路复用的必要步骤
//下面就到下一个拦截器去获取真正的网络响应。

Response networkResponse = chain.proceed(requestBuilder.build());


// 获取网络的响应后,在这里也进行拦截,做一些处理,比如压缩,添加响应头等。
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

BridgeInterceptor 作为客户端和网络请求的桥梁,在这里将 Request 和 Response 做一个处理。主要有:

1.在请求前拦截:
  • 设置内容长度,内容编码
  • 设置gzip压缩
  • 添加cookie
  • 设置其他请求头首部,如 User-Agent,Host,Keep-alive 等。其中 Keep-Alive 是实现多路复用的必要步骤
2.调用下一个拦截器去获取响应
3.获取响应后再次拦截
  • 压缩
  • 添加/删除响应首部字段
(3)CacheInterceptor
1
2
/** Serves requests from the cache and writes responses to the cache. */
public final class CacheInterceptor implements Interceptor {

CacheInterceptor 主要是负责读取缓存和更新缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();

// 获取定义响应读取的策略,分为仅从网络获取响应,仅从缓存获取响应,或者网络和缓存配合。
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}

if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If we're forbidden from using the network and the cache is insufficient, fail.
//如果指定仅从缓存获取但是缓存没有就返回一个 504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// If we don't need the network, we're done.
//如果没有指定从网络获取并且缓存不为空,那么就将缓存返回。
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

//去网络获取响应
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// If we have a cache response too, then we're doing a conditional get.
//必要的时候更新缓存,并返回
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

//如果没有缓存就将这个响应写入缓存。
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}

缓存的主要总结步骤如下:

  • 如果指定仅从缓存获取但是缓存没有就返回一个 504
  • 如果没有指定从网络获取并且缓存不为空,那么就将缓存返回。
  • 去网络获取响应
  • 已经有缓存并且缓存需要更新的时, 更新缓存,并返回
  • 如果没有缓存就将这个响应写入缓存。

缓存使用的是策略模式,将缓存的策略封装在 CacheStrategy ,这个类告诉 CacheInterceptor 是使用缓存还是使用网络请求 。
缓存操作的定义是 接口 InternalCache ,主要操作有 put, get, 和 更新等。而具体的实现类说就是 Cache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final class Cache implements Closeable, Flushable {
....

final InternalCache internalCache = new InternalCache() {
@Override public Response get(Request request) throws IOException {
return Cache.this.get(request);
}

@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);
}

@Override public void remove(Request request) throws IOException {
Cache.this.remove(request);
}

@Override public void update(Response cached, Response network) {
Cache.this.update(cached, network);
}

@Override public void trackConditionalCacheHit() {
Cache.this.trackConditionalCacheHit();
}

@Override public void trackResponse(CacheStrategy cacheStrategy) {
Cache.this.trackResponse(cacheStrategy);
}
};

// 缓存的核心类
final DiskLruCache cache;



}

可以缓存这里的核心类是 DiskLruCache ,Cache 虽然没有实现 InternalCache 接口吗,当时基本上左右的具体的功能,都是由 Cache 结合 InternalCache 完成。

(4)ConnectIntercetor
1
2
/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {

这个拦截器即打开一个连接到目标服务器,并将这个链接提交到下一个拦截器。

1
2
3
4
5
6
7
8
9
10
11
12
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

虽然这个只有这么点代码,但是实际上关于连接池的复用等功能都被上面的类封装起来了。之所以采用复用的原因是
客户端和服务器建立 socket 连接需要经历 TCP 的三次握手和四次挥手,是一种比较消耗资源的动作。Http 中有一种 keepAlive connections 的机制,在和客户端通信结束以后可以保持连接指定的时间。OkHttp3 支持 5 个并发 socket 连接,默认的 keepAlive 时间为 5 分钟。这种复用的模式就是 设计模式中的享元模式。

1.StreamAllocation

这个类协调三个实体之间的关系。

  • Connections:连接远程服务器的物理 Socket 连接
  • Streams : 基于 Connection 的逻辑 Http 请求/响应对
    一个请求/响应 对应一个 Streams . 在 Http1.x,一个 Streams 对应一个 Connections。在 Http2.0,多个 Streams 可对应一个 Connections,进行并发请求。
  • Calls : 逻辑 Stream 序列,也就是请求/响应 队列

image.png

StreamAllocation 会通过 ConnectPool 获取或者新生成一个 RealConnection 来得到一个连接到 Server 的 Connection 连接,
同时会生成一个 HttpCodec 用于下一个 CallServerInterceptor ,以完成最终的请求.在 newStream 方法中

1
2
3
4
5
6
7
8
9
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {

try {
找到一个合适的连接,可能复用已有连接也可能是重新创建的连接,返回的连接由连接池负责决定。
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
....
}

2.ConnectionPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public final class ConnectionPool {
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
......
}
};

private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;
......

/**
*返回符合要求的可重用连接,如果没有返回NULL
*/
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
......
}

/*
* 去除重复连接。主要针对多路复用场景下,一个 address 只需要一个连接
*/
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
......
}

/*
* 将连接加入连接池
*/
void put(RealConnection connection) {
......
}

/*
* 当有连接空闲时唤起cleanup线程清洗连接池
*/
boolean connectionBecameIdle(RealConnection connection) {
......
}

/**
* 扫描连接池,清除空闲连接
*/
long cleanup(long now) {
......
}

/*
* 标记泄露连接
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
......
}
}

ConnectionPool 内部通过一个双端队列 dequeue) 来维护当前所有连接,主要涉及到的操作包括:

  • put:放入新连接
  • get:从连接池中获取连接
  • evictAll:关闭所有连接
  • connectionBecameIdle:连接变空闲后调用清理线程
  • deduplicate:清除重复的多路复用线程

下面就看看一个是如何找到的:

1
2
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);

// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}

// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}

return candidate;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");

// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}

// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}

selectedRoute = route;
}

// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}

RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");

// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;

// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}

// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
Internal.instance.put(connectionPool, result);

// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);

return result;
}

上面找连接的步骤可以总结为:

  • 查看当前 streamAllocation 是否有之前已经分配过的连接,有则直接使用
  • 从连接池中查找可复用的连接,有则返回该连接
  • 配置路由,配置后再次从连接池中查找是否有可复用连接,有则直接返回
  • 新建一个连接,并修改其 StreamAllocation 标记计数,将其放入连接池中
  • 查看连接池是否有重复的多路复用连接,有则清除,一个地址只需要一个连接。

而在连接池中判断一个连接是否可以复用的条件为:

  • 连接没有达到共享上限
  • 非host域必须完全一样
  • 如果此时host域也相同,则符合条件,可以被复用
  • 如果host不相同,在HTTP/2的域名切片场景下一样可以复用.

对于连接的清除,ConnectPool 有一个独立的线程进行清理的工作:

  • 遍历连接池中所有连接,标记泄露连接(即空闲时间即将达到5分钟)
  • 如果被标记的连接满足(空闲 socket 连接超过5个&& keepalive 时间大于5分钟),就将此 连接从 Deque 中移除,并关闭连接,返回 0,也就是将要执行 wait(0),提醒立刻再次扫描
  • 如果(目前还可以塞得下5个连接,但是有可能泄漏的连接(即空闲时间即将达到5分钟)),就返回此连接即将到期的剩余时间,供下次清理
  • 如果(全部都是活跃的连接),就返回默认的keep-alive时间,也就是5分钟后再执行清理。
3.RealConnection

描述一个物理 Socket 连接,连接池中维护多个 RealConnection 实例。由于Http/2支持多路复用,
一个 RealConnection 可以支持多个网络访问请求,所以 OkHttp 又引入了 StreamAllocation 来描述一个实际的网络请求开销(从逻辑上一个Stream对应一个Call,但在实际网络请求过程中一个Call常常涉及到多次请求。如重定向,Authenticate等场景。所以准确地说,一个 Stream 对应一次请求,而一个 Call 对应一组有逻辑关联的 Stream ),一个 RealConnection 对应一个或多个 StreamAllocation ,所以 StreamAllocation 可以看做是 RealConenction 的计数器,当 RealConnection 的引用计数变为 0,且长时间没有被其他请求重新占用就将被释放.

多路复用:
报头压缩:HTTP/2 使用 HPACK 压缩格式压缩请求和响应报头数据,减少不必要流量开销.
请求与响应复用:HTTP/2 通过引入新的二进制分帧层实现了完整的请求和响应复用,客户端和服务器可以将 HTTP 消息分解为互不依赖的帧,然后交错发送,最后再在另一端将其重新组装
指定数据流优先级:将 HTTP 消息分解为很多独立的帧之后,我们就可以复用多个数据流中的帧,
客户端和服务器交错发送和传输这些帧的顺序就成为关键的性能决定因素。为了做到这一点,HTTP/2 标准允许每个数据流都有一个关联的权重和依赖关系
流控制:HTTP/2 提供了一组简单的构建块,这些构建块允许客户端和服务器实现其自己的数据流和连接级流控制.

4.HttpCodec

针对不同的版本,OkHttp 为我们提供了 HttpCodec1(Http1.x)和 HttpCodec2(Http2).他们就是协议的具体实现类。

(5)CallServerInterceptor(forWebSocket)
1
2
/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {

这是整个拦截链的最后一个拦截器,负责和服务器发送请求和从服务器读取响应,
利用 HttpCodec 完成最终请求的发送。

到这里整个拦截链的分析就到这里,大致流程如图,责任链模式在这里就体现得十分清楚:

image.png

0%