OKHttp 源码解析

OKHttp 的使用可以说是非常的简单了,几行代码就可以搞定:

OkHttpClient client = new OkHttpClient();
Request req = new Request.Builder()
    .url("https://192.168.2.141")
    .build();
//同步请求
client.newCall(req).execute();

//异步请求
client.newCall(req).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {

    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});

几行代码搞定,基本可以分为三部分:创建 Client创建 Request同步/异步执行,那么它内部是怎么实现的呢?那就得通过源码来看了。

创建 OKHttpClient

我们通过 new 关键字创建了 Client,直接上代码:

  public OkHttpClient() {
    this(new Builder());
  }

可以看到,在它的内部新建了一个 Builder 对象作为参数调用了另外一个构造方法,先看看这个 Builder 是干什么的:

    public Builder() {
      dispatcher = new Dispatcher();    //【重点】分发器
      protocols = DEFAULT_PROTOCOLS;    //协议
      connectionSpecs = DEFAULT_CONNECTION_SPECS;   //传输版本,连接协议
      eventListenerFactory = EventListener.factory(EventListener.NONE); //客户端事件监听工厂
      proxySelector = ProxySelector.getDefault();   //代理
      cookieJar = CookieJar.NO_COOKIES; //cookie
      socketFactory = SocketFactory.getDefault();   //socket 工厂
      hostnameVerifier = OkHostnameVerifier.INSTANCE;   //确认主机名
      certificatePinner = CertificatePinner.DEFAULT;    //证书
      proxyAuthenticator = Authenticator.NONE;  //代理身份验证
      authenticator = Authenticator.NONE;   // 本地身份验证
      connectionPool = new ConnectionPool();    //连接池
      dns = Dns.SYSTEM; //dns
      followSslRedirects = true;    //安全套接层重定向
      followRedirects = true;   //本地重定向
      retryOnConnectionFailure = true;  //重试连接失败
      connectTimeout = 10_000;  //连接超时
      readTimeout = 10_000; //读超时
      writeTimeout = 10_000;    //写超时 
      pingInterval = 0; //ping超时
    }

很简单,就是给一些变量进行了初始化,例如 cookiednsconnectionTimeoutreadTimeoutwriteTimeout、等等,将这个 Builder 对象作为参数传入 OKHttpClient 当中:

  OkHttpClient(Builder builder) {
  //将 Builder 设置的值赋值给client
    this.dispatcher = builder.dispatcher;
    ...省略
    this.interceptors = Util.immutableList(builder.interceptors);
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
    ...省略
    // 默认不是用SSL
    boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
      this.certificateChainCleaner = builder.certificateChainCleaner;
    } else {
      X509TrustManager trustManager = systemDefaultTrustManager();
      this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }
    //还是赋值
    this.hostnameVerifier = builder.hostnameVerifier;
    ...省略

    //判断有没有设置拦截器
    if (interceptors.contains(null)) {
      throw new IllegalStateException("Null interceptor: " + interceptors);
    }
    if (networkInterceptors.contains(null)) {
      throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
    }
  }

这样,Client 就创建成功了。

创建 Request

前面我们创建 Request 是这么写的:

Request req = new Request.Builder()
    .url("https://192.168.2.141")
    .build();

调用了 Request 内部类 Builder 的 build 方法,看一下 Builder 类:

  public static class Builder {
    HttpUrl url;
    String method;
    Headers.Builder headers;
    RequestBody body;
    Object tag;

    public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
    }
    ...省略
  }

这里就可以看到,OKHttp 默认是使用 GET 方法发起请求的

然后这个 Headers 实际上就是我们发起请求时候的消息头。OKHttp 默认为我们创建一个长度为 20 的 List 来存储这些消息头。关于 HTTP 消息头可以看这里:网络编程总结(二):HTTP 协议

除了 OKHttp 已经为我们设置好的消息头之外,我们还可以自己添加/删除/修改,Bilder 类为我们提供了 addHeaderremoveHeaderheader 方法。

与此同时,Builder 类还提供了一系列的添加 URL 的方法:url(HttpUrl url)url(String url)url(URL url)

当然,HTTP 还有其他的请求方法,例如 POST/PUT/DELETE 等等。自然, OKHTTP 也提供了相应的方法,以 POST 为例:

    public Builder post(RequestBody body) {
      return method("POST", body);
    }

        public Builder method(String method, @Nullable RequestBody body) {
      if (method == null) throw new NullPointerException("method == null");
      if (method.length() == 0) throw new IllegalArgumentException("method.length() == 0");
      if (body != null && !HttpMethod.permitsRequestBody(method)) {
        throw new IllegalArgumentException("method " + method + " must not have a request body.");
      }
      if (body == null && HttpMethod.requiresRequestBody(method)) {
        throw new IllegalArgumentException("method " + method + " must have a request body.");
      }
      this.method = method;
      this.body = body;
      return this;
    }

然后调用 build() 方法,返回一个 Request 对象:

    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }

这次,Request 创建成功。

发起请求

最一开始就已经说过了,发起请求有两种方式:同步和异步,我们依次来说:

同步请求

Client 调用 newCall 方法将 Request 传入:

  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

实际上是调用了 RealCall 的 newCall 方法:

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

返回了 RealCall 对象,然后我们调用的实际上是该对象的 execute 方法:

  @Override public Response execute() throws IOException {
    //先检查该请求已经是否已经执行过了。
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    //用于捕获栈信息
    captureCallStackTrace();
    //事件监听器开始启动 call
    eventListener.callStart(this);
    try {
        //【重点1】,这里会调用 client 的 dispatcher 方法获取 Dispatcher 对象,调用该对象的 executed方法
      client.dispatcher().executed(this);
      //【终点2】getResponseWithInterceptorChain 方法
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
    //【重点3】调用 Dispatcher 的finished 方法
      client.dispatcher().finished(this);
    }
  }

上面的代码标出了重点,主要是 Dispatcher 对象的两个方法和 getResponseWithInterceptorChain,尤其是 getResponseWithInterceptorChain,它直接返回了 Response 对象。

Diapatcher
Dispatcher 什么时候创建的呢?还记得创建 Client 的时候吗?Builder 类中直接创建了 Dispatcher 对象,并将其赋值给了 Client 的属性。

Dispatcher 的 executed 方法又干了什么呢?

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

可以看到,该方法是一个同步方法,在该方法当中将我们的 Call 添加到了 runningSyncCalls 当中,通过名字可以看出来是一个 正在运行的同步 Call 集合,通过代码我们可以看到 runningSyncCalls 是一个 双端队列,关于双端队列,可以看这里:java 集合框架(十五)Deque

private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

在将 RealCall 添加到队列之后,开始执行 getResponseWithInterceptorChain 方法:

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

这个方法实际上是 OKHttp 的核心方法,在这个方法中,我们先是创建了一个有序队列,然后依次添加一些拦截器,首先是我们自己创建的,因为我们没有创建,所以 client 的 interceptor 自然就是空的,然后就是一些系统内置的拦截器了:

  • retryAndFollowUpInterceptor:重试的 interceptor
  • BridgeInterceptor:桥接 interceptor
  • CacheInterceptor:缓存 interceptor
  • ConnectInterceptor:
  • CallServerInterceptor

接下来就是遍历处理这些 Interceptor 了,就是下面那段代码的工作:

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    return chain.proceed(originalRequest);

RealInterceptorChain 类的注释为:

A concrete interceptor chain that carries the entire interceptor chain: all application
interceptors, the OkHttp core, all network interceptors, and finally the network caller.

翻译过来就是:

携带了所有拦截器的拦截器链:所有的应用拦截器,OKHttp 核心拦截器,网络拦截器,最后是网络访问

这个类中最主要的方法是:

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

注释也可以看出来,是访问链条中的下一个拦截器,通过 index+1 来达到访问下一个的目的。

所以之前我们在 ArrayList 当中添加的拦截器都会被访问执行。一直到最后一个 CallServerInterceptor,在最后一个拦截器当中发起网络请求:

public final class CallServerInterceptor implements Interceptor {
    ...省略
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();   

    ...省略
}

这样,一个同步请求就完成了。

当然,省略了很多代码,包括一些重试拦截器、缓存拦截器等等,会在后面讲到。

异步请求
异步请求和同步请求主要的区别是创建 RealCall 之后的执行,异步是调用了 enqueue 方法:

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

同样是调用了 client 的 dispatcher 方法获取 Dispatcher 对象,只不过是调用了该对象的 enqueue 方法:

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

runningAsyncCallsreadyAsyncCalls 同样也是双端队列:

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

系统判断将根据队列的长度判断传入的 ReadCall 是 running 还是 ready

  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;

看一下 executorService 方法:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

在该方法中创建了一个线程池,然后执行了线程池的 execute 方法,我们知道线程池的 execute 方法参数是一个 Runnable 对象,那我们来看一下 enqueue 的方法参数:AsyncCall call

  final class AsyncCall extends NamedRunnable {
    ...省略
  }

  public abstract class NamedRunnable implements Runnable {
    ...省略
  }

可以看到 AsyncCall 同样也是一个 Runnable 对象,看一下 AsyncCall:

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    ...省略

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

可以看到又回到了 getResponseWithInterceptorChain 方法,这个方法上面同步请求的时候分析过了,接下来就是一些接口回调,最后是 finally 代码块中的内容,在同步方法当中我们也调用了 dispatcher 对象的 finished 方法,这个方法是干什么的呢?

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

这个方法最主要的是 promoteCalls 方法的调用:

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

可以看到,会判断正在 运行的异步请求 runningAsyncCalls 是否大于系统的最大请求数,准备运行的异步请求 readyAsyncCalls 是否为空,然后会遍历 readyAsyncCalls,最后调用线程池的 execute 方法去执行这个异步请求。

同步请求和异步请求都会调用 dispatcher 的 finished 方法:

  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

二者的不同支出就是是否会去调用 promoteCalls 方法。

拦截器(interceptor)

前面我们讲过了在 getResponseWithInterceptorChain 方法中系统会创建一个 ArrayList 来存储我们自己创建的拦截器和系统内置的拦截器:

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

通过 proceed 方法,依次调用每个拦截器链接起来:

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
      //判断边界
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

   ...省略

    // 调用链条中的下一个拦截器,通过 index+1 的方式来获取下一个
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    ...省略
    return response;
  }

在上面的代码中,先是获取了新创建了一个 RealInterceptorChain 对象,它的一个重要参数就是 index+1,然后从拦截器列表当中获取到下一个拦截器,紧接着调用下一个拦截器的 intercept 方法,将 RealInterceptorChain 对象作为参数传入。

就以系统默认的拦截器为例,最一开始创建的 RealInterceptorChain 对象参数为 0,因为我们没有添加自定义的拦截器,自然就是系统内置的 retryAndFollowUpInterceptor 拦截器了:

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    ...省略

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      ...省略
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        ...省略
      } catch (IOException e) {
        ...省略
      } finally {
        ...省略
      }

      ...省略

    }
  }

可以看到又执行了 proceed 方法,又回到了之前,只不过这次的 index 是 +1+1,这样又拿到了第二个拦截器,又会执行第二个拦截器的 intercept 方法,以此类推,一直到最后一个。

我们也可以自己创建拦截器:

public class MyIntercept implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        //执行我们自定义的操作
        Log.d("TAG","打一个 Log");
        Response response = chain.proceed(request);
        return response;
    }
}

然后在创建 Client 当中添加我们的拦截器:

OkHttpClient client = new OkHttpClient().newBuilder().addInterceptor(new MyIntercept()).build();
Copyright© 2020-2022 li-xyz 冀ICP备2022001112号-1