Dispatcher.png
在整个调度流程中涉及的成员如下:
其中
- Dispatcher 对象是分发者,也是生产者(默认在主线程中)
- AsyncCall 对象其实是一个任务即Runnable(内部做了包装异步接口)
// Dispatcher.java
maxRequests = 64 // 最大并发请求数为64
maxRequestsPerHost = 5 //每个主机最大请求数为5
ExecutorService executorService //消费者池(也就是线程池)
Deque<AsyncCall> readyAsyncCalls: // 异步的缓存,正在准备被消费的(用数组实现,可自动扩容,无大小限制)
Deque<AsyncCall> runningAsyncCalls //正在运行的 异步的任务集合,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存
Deque<RealCall> runningSyncCalls //正在运行的,同步的任务集合。仅仅是用来引用正在运行的同步任务以判断并发量
通过将请求任务分发给多个线程,可以显著的减少I/O等待时间
2、OKHttp调度的具体流程分析
(1)同步调度分析
第一步是:是调用了RealCall的execute()方法里面调用executed(this);
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
第二步:在Dispatcher里面的executed执行入队操作
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
第三步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response,并返回Response result 。
第四步:执行client.dispatcher().finished(this)操作
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
这里其实做的是出队操作。至此同步的调度就已经结束了
(2)异步调度分析
AsyncCall类简介
在讲解异步调度之前不得不提到AsyncCall这个类,AsyncCall,他其实是RealCall的内部类
//RealCall.java
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
//执行耗时任务
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
//retryAndFollowUpInterceptor取消了 执行失败
signalledCallback = true;
//回调,注意这里回调是在线程池中,不是主线程
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//一切正常走入正常流程
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最后执行出队
client.dispatcher().finished(this);
}
}
}
第一步 是调用了RealCall的enqueue()方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
在enqueue里面调用了client.dispatcher().enqueue(new AsyncCall(responseCallback));方法
第二步:在Dispatcher里面的enqueue执行入队操作
synchronized void enqueue(AsyncCall call) {
//判断是否满足入队的条件(立即执行)
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//正在运行的异步集合添加call
runningAsyncCalls.add(call);
//执行这个call
executorService().execute(call);
} else {
//不满足入队(立即执行)条件,则添加到等待集合中
readyAsyncCalls.add(call);
}
}
上述代码发现想要入队需要满足下面的条件
(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)
如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。
runningAsyncCalls.size() < maxRequests 表示当前正在运行的AsyncCall是否小于maxRequests = 64
runningCallsForHost(call) < maxRequestsPerHos 表示同一个地址访问的AsyncCall是否小于maxRequestsPerHost = 5;
即 当前正在并发的请求不能超过64且同一个地址的访问不能超过5个
第三步:这里分两种情况
情况1 第三步 可以直接入队
runningAsyncCalls.add(call);
第四步:线程池executorService执行execute()方法
executorService().execute(call);
由于AsyncCall继承于NamedRunnable类,而NamedRunnable类又是Runnable类的实现类,所以走到了AsyncCall的execute()方法里面
第五步:执行AsyncCall的execute()方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
第六步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response。
第七步:如果是正常的获取到Response,则执行responseCallback.onResponse()
第八步:执行client.dispatcher().finished(this)操作 进行出队操作
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
PS:注意这里面第三个参数 同步是false,异步是true,如果是异步则需要进行是否添加继续入队的情景
情况2 第三步 不能直接入队,需要等待
readyAsyncCalls.add(call);
第四步 触发条件
能进入等待则说明当前要么有64条正在进行的并发,要么同一个地址有5个请求,所以要等待。
当有如下条件被满足或者触发的时候则执行promoteCalls操作
- 1 Dispatcher的setMaxRequestsPerHost()方法被调用时
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
//设置的maxRequestsPerHost不能小于1
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
- 2 Dispatcher的setMaxRequests()被调用时
public synchronized void setMaxRequests(int maxRequests) {
//设置的maxRequests不能小于1
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
- 3当有一条请求结束了,执行了finish()的出队操作,这时候会触发promoteCalls()进行调整
if (promoteCalls)
promoteCalls();
第五步 执行Dispatcher的promoteCalls()方法
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
第六步 先判断是否满足 初步入队条件
if (runningAsyncCalls.size() >= maxRequests)
return;
if (readyAsyncCalls.isEmpty())
return; // No ready calls to promote.
如果此时 并发的数量还是大于maxRequests=64则return并继续等待
如果此时,没有等待的任务,则直接return并继续等待
第七步 满足初步的入队条件,进行遍历,然后进行第二轮入队判断
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
进行同一个host是否已经有5请求在了,如果在了,则return返回并继续等待
第八步 此时已经全部满足条件,则从等待队列面移除这个call,然后添加到正在运行的队列中
i.remove();
runningAsyncCalls.add(call);
第九步 线程池executorService执行execute()方法
executorService().execute(call);
第十步:执行AsyncCall的execute()方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
第十一步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response。
第十二步:如果是正常的获取到Response,则执行responseCallback.onResponse()
第十三步:执行client.dispatcher().finished(this)操作 进行出队操作
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
至此 异步任务调度已经结束了
总结:
- 1、异步流程总结
所以简单的描述下异步调度为:如果当前还能可以执行异步任务,则入队,并立即执行,否则加入readyAsyncCalls队列,当一个请求执行完毕后,会调用promoteCalls(),来把readyAsyncCalls队列中的Async移出来并加入到runningAsyncCalls,并开始执行。然后在当前线程中去执行Call的getResponseWithInterceptorChain()方法,直接获取当前的返回数据Response
- 2、对比同步和异步任务,我们会发现:同步请求和异步请求原理都是一样的,都是在getResponseWithInterceptorChain()函数通过Interceptor链条来实现网络请求逻辑,而异步任务则通过ExecutorService来实现的。PS:在Dispatcher中添加一个封装了Callback的Call的匿名内部类AsyncCall来执行当前 的Call。这个AsyncCall是Call的匿名内部类。AsyncCall的execute方法仍然会回调到Call的 getResponseWithInterceptorChain方法来完成请求,同时将返回数据或者状态通过Callback来完成。
四、OKHttp调度的"优雅'之处:
1、采用Dispacher作为调度,与线程池配合实现了高并发,低阻塞的的运行
2、采用Deque作为集合,按照入队的顺序先进先出
3、最精彩的就是在try/catch/finally中调用finished函数,可以主动控制队列的移动。避免了使用锁而wait/notify操作。