OKHttp源码解析(三)--中阶之线程池和消息队列
上篇文章已经说明了OKHttp有两种调用方式,一种是阻塞的同步请求,一种是异步的非阻塞的请求。今天主要是讲解的是异步的请求。其中涉及了Dispatcher这个类,其他的类基本已经在上篇文章介绍过了。所以本片文章的大体思路如下:
在讲解线程池和消息队列的时候有必要讲下线程池的基本概念
android的异步任务一般都是用Thread+Handler或者AsyncTask来实现,其中笔者当初经历过各种各样坑,特别是内存泄漏,当初笔者可是相当的欲死欲仙啊!所以现在很少有开发者还在用这一套来做异步任务,现在一般都是Rxjava为主,当然还有自己自定义的异步任务框架(比如笔者),像RxJava都帮我们写好了对应场景的线程池,这是为什么?
我对线程池的理解是有两个层次,一种是狭隘的,一种是广义的,那么咱们各自都说下
线程池是一种多线程处理形式,处理过程中将任务添加到队列中,后面再创建线程去处理这些任务,线程池里面的线程都是后台线程,每个线程都是默认的优先级下运。如果某个线程处于空闲中,将添加一个任务进来,让空闲线程去处理任务。如果所有线程都很繁忙,消息队列会挂起,等待某个线程池空闲后再处理任务。这样可以保证线程数量不能超多最大数量。
多线程技术主要是解决处理器单元内多个线程执行的问题,它可以显著减少处理的单元闲置时间,增加处理器单元的吞吐能力。如果对多线程应用不当,会增加对单个任务的的处理时间。
举例说明: 假如一个服务器完成一项任务的时间为T:
T1 创建线程的时间 T2 在线程中执行任务的时间,包括线程同步所需要的时间 T3 线程销毁的时间
显然 T= T1+T2+T3. 注意:这是一个理想化的情况
可以看出,T1,T3是多线程自身带来的开销(在Java中,通过映射pThread,并进一步通过SystemCall实现native线程),我们渴望减少T1和T3的时间,从而减少T的时间。但是一些线程的使用者并没有注意到这一点,所以在线程中频繁的创建或者销毁线程,这导致T1和T3在T中占有相当比例。这显然突出的线程池的弱点(T1,T3),而不是有点(并发性)。 取自IBM知识库
所以线程池的技术正是如何关注缩短或调整T1,T3时间的技术,从而提高服务器程序的性能。
在平时我们可以通过线程工厂来创建线程池来尽量避免上述的问题。
Dispatcher负责请求的分发
/** Executes calls. Created lazily. */ private ExecutorService executorService; public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
由上面代码可以得出Dispatcher内部实现了懒加载的无边界限制的线程池。参数解析
(1)SynchronousQueue每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量为0,严格说并不是一种容器,由于队列没有容量,因此不能调用peek等操作,因此只有移除元素才有元素,显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入者(生产者)传递给移除者(消费者),这在多任务队列中最快的处理任务方式。对于高频请求场景,无疑是最合适的。
(2)在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。
整个框架主要通过Call来封装每一次的请求。同时Call持有OkHttpClient和一份Request。而每一次的同步或者异步请求都会有Dispatcher的参与。
Dispatcher在执行同步的Call:直接加入到runningSyncCall队列中,实际上并没有执行该Call,而是交给外部执行
/** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
将Call加入队列:如果当前正在执行的call的数量大于maxRequest(64),或者该call的Host上的call超过maxRequestsPerHos(5),则加入readyAsyncCall排队等待,否则加入runningAsyncCalls并执行
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
从ready到running,在每个call结束的时候都会调用finished
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); //每次remove完后,执行promoteCalls来轮转。 if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } //线程池为空时,执行回调 if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } } private void promoteCalls() { //如果当前执行的线程大于maxRequests(64),则不操作 if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
通过上面代码,大家可以知道finished先执行calls.remove(call)删除call,然后执行promoteCalls(),在promoteCalls()方法里面:如果当前线程大于maxRequest则不操作,如果小于maxRequest则遍历readyAsyncCalls,取出一个call,并把这个call放入runningAsyncCalls,然后执行execute。在遍历过程中如果runningAsyncCalls超过maxRequest则不再添加,否则一直添加。所以可以这样说:
具体的执行请求则在RealCall里面实现的,同步的在RealCall的execute里面实现的,而异步的则在AsyncCall的execute里面实现的。里面都是调用RealCall的getResponseWithInterceptorChain的方法来实现责任链的调用。
在OKHttp中,它使用Dispatcher作为任务的调度器。
如下图所示
Dispatcher.png
在整个调度流程中涉及的成员如下: 其中
// Dispatcher.java maxRequests = 64 // 最大并发请求数为64 maxRequestsPerHost = 5 //每个主机最大请求数为5 ExecutorService executorService //消费者池(也就是线程池) Deque<AsyncCall> readyAsyncCalls: // 异步的缓存,正在准备被消费的(用数组实现,可自动扩容,无大小限制) Deque<AsyncCall> runningAsyncCalls //正在运行的 异步的任务集合,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存 Deque<RealCall> runningSyncCalls //正在运行的,同步的任务集合。仅仅是用来引用正在运行的同步任务以判断并发量
通过将请求任务分发给多个线程,可以显著的减少I/O等待时间
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished(this); } }
/** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
void finished(RealCall call) { finished(runningSyncCalls, call, false); }
这里其实做的是出队操作。至此同步的调度就已经结束了
在讲解异步调度之前不得不提到AsyncCall这个类,AsyncCall,他其实是RealCall的内部类
//RealCall.java final class AsyncCall extends NamedRunnable { private final Callback responseCallback; AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; } String host() { return originalRequest.url().host(); } Request request() { return originalRequest; } RealCall get() { return RealCall.this; } @Override protected void execute() { boolean signalledCallback = false; try { //执行耗时任务 Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { //retryAndFollowUpInterceptor取消了 执行失败 signalledCallback = true; //回调,注意这里回调是在线程池中,不是主线程 responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { //一切正常走入正常流程 signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { //最后执行出队 client.dispatcher().finished(this); } } }
@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
在enqueue里面调用了client.dispatcher().enqueue(new AsyncCall(responseCallback));方法
synchronized void enqueue(AsyncCall call) { //判断是否满足入队的条件(立即执行) if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { //正在运行的异步集合添加call runningAsyncCalls.add(call); //执行这个call executorService().execute(call); } else { //不满足入队(立即执行)条件,则添加到等待集合中 readyAsyncCalls.add(call); } }
上述代码发现想要入队需要满足下面的条件
(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)
如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。 runningAsyncCalls.size() < maxRequests 表示当前正在运行的AsyncCall是否小于maxRequests = 64 runningCallsForHost(call) < maxRequestsPerHos 表示同一个地址访问的AsyncCall是否小于maxRequestsPerHost = 5; 即 当前正在并发的请求不能超过64且同一个地址的访问不能超过5个
第三步:这里分两种情况
runningAsyncCalls.add(call);
executorService().execute(call);
由于AsyncCall继承于NamedRunnable类,而NamedRunnable类又是Runnable类的实现类,所以走到了AsyncCall的execute()方法里面
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }
void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); }
PS:注意这里面第三个参数 同步是false,异步是true,如果是异步则需要进行是否添加继续入队的情景
readyAsyncCalls.add(call);
能进入等待则说明当前要么有64条正在进行的并发,要么同一个地址有5个请求,所以要等待。
当有如下条件被满足或者触发的时候则执行promoteCalls操作
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) { //设置的maxRequestsPerHost不能小于1 if (maxRequestsPerHost < 1) { throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost); } this.maxRequestsPerHost = maxRequestsPerHost; promoteCalls(); }
public synchronized void setMaxRequests(int maxRequests) { //设置的maxRequests不能小于1 if (maxRequests < 1) { throw new IllegalArgumentException("max < 1: " + maxRequests); } this.maxRequests = maxRequests; promoteCalls(); }
if (promoteCalls) promoteCalls();
private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
if (runningAsyncCalls.size() >= maxRequests) return; if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
如果此时 并发的数量还是大于maxRequests=64则return并继续等待 如果此时,没有等待的任务,则直接return并继续等待
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. }
进行同一个host是否已经有5请求在了,如果在了,则return返回并继续等待
i.remove(); runningAsyncCalls.add(call);
executorService().execute(call);
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }
void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); }
至此 异步任务调度已经结束了
1、采用Dispacher作为调度,与线程池配合实现了高并发,低阻塞的的运行 2、采用Deque作为集合,按照入队的顺序先进先出 3、最精彩的就是在try/catch/finally中调用finished函数,可以主动控制队列的移动。避免了使用锁而wait/notify操作。