OkHttp线程池和连接池
了解了OkHttp的网络请求流程以及拦截器实现原理,再关注OkHttp中两个重要的:OkHttp的线程池和连接池。
1、OkHttp线程池
在OkHttp网络请求流程一文中,我们分析了OkHttp异步和同步请求流程。请求最后都在Dispatcher中分发调度处理,最后被ExecutorService执行。
1.1、Dispatcher
Dispatcher中执行任务的执行器是executorService。
private var executorServiceOrNull: ExecutorService? = null
val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
Dispatcher有两个构造器:一个默认无参,一个带ExecutorService参数。
constructor()
constructor(executorService: ExecutorService) : this() {
this.executorServiceOrNull = executorService
}
1.2、默认线程池
如果没有设置OkHttp线程执行器,其内部有一个默认的executorServiceOrNull,会充当Dispatcher的任务执行器,默认实现如下:
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
无核心线程,最大线程数量无上限,线程空闲存活时间60s,关于线程池详见线程池 Executors。
①核心线程数量0:保持在线程池中的线程数量,为0代表线程空闲后不会保留。
②Integer.MAX_VALUE:表示线程池可以容纳最大线程数量。
③TimeUnit.SECOND:当线程池中的线程数量大于核心线程时,空闲的线程就会等待60s才会被终止,如果小于,则会立刻停止。
④SynchronousQueue:同步队列,按序排队,先来先服务。
⑤threadFactory("$okHttpName Dispatcher", false):线程工厂。
1.3、队列限制
既然默认的executorServiceOrNull线程池无上限,不断的接受网络请求任务会不会导致线程数量太多而引起OOM呢?
不会的,OkHttp会控制进入线程池执行的任务数量,还记得在OkHttp网络请求流程一文中异步请求涉及到的两个异步队列:
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
在promoteAndExecute()方法中,从准备队列readyAsyncCalls取出任务放到执行队列runningAsyncCalls中执行的时候会判断是否达到最大任务数量maxRequests:
private fun promoteAndExecute(): Boolean {
...
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// Max capacity.
if (runningAsyncCalls.size >= this.maxRequests) break
// Host max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
...
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
Dispatcher中定了最大任务数量maxRequests,默认64个并发异步任务。同时还限制了对每个主机同时执行的最大请求数maxRequestsPerHost为5,避免服务端承受太大“压力”。
/**
* The maximum number of requests to execute concurrently. Above this requests queue in memory,
* waiting for the running calls to complete.
*
* If more than [maxRequests] requests are in flight when this is invoked, those requests will
* remain in flight.
*/
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
/**
* The maximum number of requests for each host to execute concurrently. This limits requests by
* the URL's host name. Note that concurrent requests to a single IP address may still exceed this
* limit: multiple hostnames may share an IP address or be routed through the same HTTP proxy.
*
* If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests
* will remain in flight.
*
* WebSocket connections to hosts **do not** count against this limit.
*/
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
为什么没有限制同步任务?因为这得开发者自行使用线程或线程池执行execute(),开发者要合理的使用ThreadPoolExecutor创建应用的线程池。推荐阅读为什么阿里不允许用Executors创建线程池?
1.4、自定义线程池
开发者可以自行在OkHttpClient.Builder配置dispatcher,自定义OkHttp线程池。
val executorService = ThreadPoolExecutor(5, 32, 30, TimeUnit.SECONDS, SynchronousQueue())
val dispatcher: Dispatcher = Dispatcher(executorService)
dispatcher.maxRequests = 64
val okHttpClient = OkHttpClient.Builder()
.dispatcher(dispatcher)
.build()
2、OkHttp连接池
再看OkHttp框架中另一个重要的概念:连接池,ConnectionPool。
2.1、连接池的初始化
OkHttp的连接池在OkHttpClient对象创建的时候初始化:
val connectionPool: ConnectionPool = builder.connectionPool
通过OkHttpClient.Builder构造,而OkHttpClient.Builder中使用默认连接池。当然开发者可以自行定义连接池的大小等配置。
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher()
internal var connectionPool: ConnectionPool = ConnectionPool()
...
}
调用ConnectionPool默认构造函数:创建一个默认5个连接大小的连接池,每个连接保活5分钟。
class ConnectionPool internal constructor(
internal val delegate: RealConnectionPool) {
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit))
constructor() : this(5, 5, TimeUnit.MINUTES)
...
}
RealConnectionPool为ConnectionPool的代理类,真正实现一个连接池的功能。使用ConcurrentLinkedQueue作为池保存连接。
/**
* Holding the lock of the connection being added or removed when mutating this, and check its
* [RealConnection.noNewExchanges] property. This defends against races where a connection is
* simultaneously adopted and removed.
*/
private val connections = ConcurrentLinkedQueue<RealConnection>()
2.2、复用连接池
在OkHttp拦截器Interceptor一文中,梳理了5个框架默认拦截器。OkHttp处理网络流程在哪一步用到了连接池呢?在那篇博客的第2.4节 ConnectInterceptor中。
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
别看ConnectInterceptor代码就这么点,它可完成了连接池复用、和服务器建立连接等。看一下其中RealCall的initExchange()方法:
/** 查找新的或合并的连接以承载即将到来的请求和响应. */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
...
return result
}
内部调用RealCall的ExchangeFinder成员的find()方法,等等!exchangeFinder什么时候初始化的?让我们时光倒流,回到RetryAndFollowUpInterceptor拦截器中:
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
...
var newExchangeFinder = true
...
while (true) {
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
...
}
}
在该拦截器中调用RealCall的enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean)方法:
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
...
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}
传入的connectionPoll变量,其实就是从OkHttpClient的ConnectionPoll成员中的代理类RealConnectionPool。
private val connectionPool: RealConnectionPool = client.connectionPool.delegate
完成了对RealCall中exchangeFinder成员的初始化,后面的拦截器中可以使用该成员变量。
回到RealCall的initExchange()方法中继续,接着调用开启ExchangeFinder的一系列调用链:find -> findHealthyConnection -> findConnection 。
fun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
}...
}
从连接池中复用连接,如果没有就新建连接放入池中。这其中的逻辑比较复杂,不需要了解每个实现细节,非硬着头皮钻要花很多时间。
2.3、连接池的维护
最后还有一点,在连接池实现类RealConnectionPool中用Task任务在TaskRunner中执行完成连接池的维护。
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}
还记得2.1节连接池初始化时传入的其中一个参数就是TaskRunner实例:
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit))
TaskRunner的INSTANCE单例使用Kotlin伴生对象实现,这比Java方便多了。
companion object {
val INSTANCE = TaskRunner(RealBackend(threadFactory("$okHttpName TaskRunner", daemon = true)))
}
TaskRunner内部持有的Backend接口实现类RealBackend对象,内建了一个执行器和1.2节中说到的OkHttp默认线程池一样。
class RealBackend(threadFactory: ThreadFactory) : Backend {
private val executor = ThreadPoolExecutor(
0, // corePoolSize.
Int.MAX_VALUE, // maximumPoolSize.
60L, TimeUnit.SECONDS, // keepAliveTime.
SynchronousQueue(),
threadFactory
)
...
}
回到RealConnectionPool中,每当对连接池进行操作,比如添加新的连接放入连接池中,都会执行一遍Task:如果闲置连接超过了保持活动限制或闲置连接限制,则移除最长的连接。
fun put(connection: RealConnection) {
connection.assertThreadHoldsLock()
connections.add(connection)
cleanupQueue.schedule(cleanupTask)
}
具体执行TaskRunner执行Task的流程就不展开了,也比较有意思。因为这一个TaskRunner,不仅要执行维护连接池的Task、还负责Cache缓存任务的处理,总之在OkHttp中用的地方也不少。