OkHttp线程池和连接池

Quibbler 2021-3-20 2457

OkHttp线程池和连接池


        了解了OkHttp的网络请求流程以及拦截器实现原理,再关注OkHttp中两个重要的:OkHttp的线程池和连接池。



1、OkHttp线程池

        在OkHttp网络请求流程一文中,我们分析了OkHttp异步和同步请求流程。请求最后都在Dispatcher中分发调度处理,最后被ExecutorService执行。


1.1、Dispatcher

        Dispatcher中执行任务的执行器是executorService

  private var executorServiceOrNull: ExecutorService? = null
    
  val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

        Dispatcher有两个构造器:一个默认无参,一个带ExecutorService参数。

  constructor()
  
  constructor(executorService: ExecutorService) : this() {
    this.executorServiceOrNull = executorService
  }


1.2、默认线程池

        如果没有设置OkHttp线程执行器,其内部有一个默认的executorServiceOrNull,会充当Dispatcher的任务执行器,默认实现如下:

        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))

        无核心线程,最大线程数量无上限,线程空闲存活时间60s,关于线程池详见线程池 Executors

        核心线程数量0:保持在线程池中的线程数量,为0代表线程空闲后不会保留。

        Integer.MAX_VALUE:表示线程池可以容纳最大线程数量。

        TimeUnit.SECOND:当线程池中的线程数量大于核心线程时,空闲的线程就会等待60s才会被终止,如果小于,则会立刻停止。

        SynchronousQueue:同步队列,按序排队,先来先服务。

        threadFactory("$okHttpName Dispatcher", false):线程工厂。


1.3、队列限制

        既然默认的executorServiceOrNull线程池无上限,不断的接受网络请求任务会不会导致线程数量太多而引起OOM呢?

        不会的,OkHttp会控制进入线程池执行的任务数量,还记得在OkHttp网络请求流程一文中异步请求涉及到的两个异步队列:

  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

        在promoteAndExecute()方法中,从准备队列readyAsyncCalls取出任务放到执行队列runningAsyncCalls中执行的时候会判断是否达到最大任务数量maxRequests

  private fun promoteAndExecute(): Boolean {
    ...
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
        
        // Max capacity.
        if (runningAsyncCalls.size >= this.maxRequests) break 
        
        // Host max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue 
        ...
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }
    return isRunning
  }

        Dispatcher中定了最大任务数量maxRequests,默认64个并发异步任务。同时还限制了对每个主机同时执行的最大请求数maxRequestsPerHost为5,避免服务端承受太大“压力”。 

  /**
   * The maximum number of requests to execute concurrently. Above this requests queue in memory,
   * waiting for the running calls to complete.
   *
   * If more than [maxRequests] requests are in flight when this is invoked, those requests will
   * remain in flight.
   */
  @get:Synchronized var maxRequests = 64
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }
    
  /**
   * The maximum number of requests for each host to execute concurrently. This limits requests by
   * the URL's host name. Note that concurrent requests to a single IP address may still exceed this
   * limit: multiple hostnames may share an IP address or be routed through the same HTTP proxy.
   *
   * If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests
   * will remain in flight.
   *
   * WebSocket connections to hosts **do not** count against this limit.
   */
  @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }

        为什么没有限制同步任务?因为这得开发者自行使用线程或线程池执行execute(),开发者要合理的使用ThreadPoolExecutor创建应用的线程池。推荐阅读为什么阿里不允许用Executors创建线程池?


1.4、自定义线程池

        开发者可以自行在OkHttpClient.Builder配置dispatcher,自定义OkHttp线程池。

    val executorService = ThreadPoolExecutor(5, 32, 30, TimeUnit.SECONDS, SynchronousQueue())
    val dispatcher: Dispatcher = Dispatcher(executorService)
    dispatcher.maxRequests = 64
    val okHttpClient = OkHttpClient.Builder()
            .dispatcher(dispatcher)
            .build()



2、OkHttp连接池

        再看OkHttp框架中另一个重要的概念:连接池,ConnectionPool


2.1、连接池的初始化

        OkHttp的连接池在OkHttpClient对象创建的时候初始化:

    val connectionPool: ConnectionPool = builder.connectionPool

        通过OkHttpClient.Builder构造,而OkHttpClient.Builder中使用默认连接池。当然开发者可以自行定义连接池的大小等配置。

  class Builder constructor() {
    internal var dispatcher: Dispatcher = Dispatcher()
    internal var connectionPool: ConnectionPool = ConnectionPool()
    ...
  }

        调用ConnectionPool默认构造函数:创建一个默认5个连接大小的连接池,每个连接保活5分钟。

  class ConnectionPool internal constructor(
    internal val delegate: RealConnectionPool) {
    
    constructor(
      maxIdleConnections: Int,
      keepAliveDuration: Long,
      timeUnit: TimeUnit) : this(RealConnectionPool(
        taskRunner = TaskRunner.INSTANCE,
        maxIdleConnections = maxIdleConnections,
        keepAliveDuration = keepAliveDuration,
        timeUnit = timeUnit))
        
    constructor() : this(5, 5, TimeUnit.MINUTES)
    ...
  }

        RealConnectionPoolConnectionPool的代理类,真正实现一个连接池的功能。使用ConcurrentLinkedQueue作为池保存连接。

  /**
   * Holding the lock of the connection being added or removed when mutating this, and check its
   * [RealConnection.noNewExchanges] property. This defends against races where a connection is
   * simultaneously adopted and removed.
   */
  private val connections = ConcurrentLinkedQueue<RealConnection>()


2.2、复用连接池

        在OkHttp拦截器Interceptor一文中,梳理了5个框架默认拦截器。OkHttp处理网络流程在哪一步用到了连接池呢?在那篇博客的第2.4节 ConnectInterceptor中。

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }

        别看ConnectInterceptor代码就这么点,它可完成了连接池复用、和服务器建立连接等。看一下其中RealCallinitExchange()方法:

  /** 查找新的或合并的连接以承载即将到来的请求和响应. */
  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    ...
    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    ...
    return result
  }


        内部调用RealCallExchangeFinder成员的find()方法,等等!exchangeFinder什么时候初始化的?让我们时光倒流,回到RetryAndFollowUpInterceptor拦截器中:

  class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
      val realChain = chain as RealInterceptorChain
      var request = chain.request
      val call = realChain.call
      ...
      var newExchangeFinder = true
      ...
      while (true) {
        call.enterNetworkInterceptorExchange(request, newExchangeFinder)
        ...
      }
  }

        在该拦截器中调用RealCallenterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean)方法:

  fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    ...
    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

        传入的connectionPoll变量,其实就是从OkHttpClientConnectionPoll成员中的代理类RealConnectionPool

  private val connectionPool: RealConnectionPool = client.connectionPool.delegate

        完成了对RealCallexchangeFinder成员的初始化,后面的拦截器中可以使用该成员变量。


        回到RealCallinitExchange()方法中继续,接着调用开启ExchangeFinder的一系列调用链:find -> findHealthyConnection -> findConnection

  fun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    }...
  }

        从连接池中复用连接,如果没有就新建连接放入池中。这其中的逻辑比较复杂,不需要了解每个实现细节,非硬着头皮钻要花很多时间。


2.3、连接池的维护

        最后还有一点,在连接池实现类RealConnectionPool中用Task任务在TaskRunner中执行完成连接池的维护。

  private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
    override fun runOnce() = cleanup(System.nanoTime())
  }

        还记得2.1节连接池初始化时传入的其中一个参数就是TaskRunner实例:

    constructor(
      maxIdleConnections: Int,
      keepAliveDuration: Long,
      timeUnit: TimeUnit) : this(RealConnectionPool(
        taskRunner = TaskRunner.INSTANCE,
        maxIdleConnections = maxIdleConnections,
        keepAliveDuration = keepAliveDuration,
        timeUnit = timeUnit))

        TaskRunnerINSTANCE单例使用Kotlin伴生对象实现,这比Java方便多了。

  companion object {
    val INSTANCE = TaskRunner(RealBackend(threadFactory("$okHttpName TaskRunner", daemon = true)))
  }

       TaskRunner内部持有的Backend接口实现类RealBackend对象,内建了一个执行器和1.2节中说到的OkHttp默认线程池一样。

  class RealBackend(threadFactory: ThreadFactory) : Backend {
    private val executor = ThreadPoolExecutor(
        0, // corePoolSize.
        Int.MAX_VALUE, // maximumPoolSize.
        60L, TimeUnit.SECONDS, // keepAliveTime.
        SynchronousQueue(),
        threadFactory
    )
    ...
  }

       回到RealConnectionPool中,每当对连接池进行操作,比如添加新的连接放入连接池中,都会执行一遍Task:如果闲置连接超过了保持活动限制或闲置连接限制,则移除最长的连接。

  fun put(connection: RealConnection) {
    connection.assertThreadHoldsLock()
    connections.add(connection)
    cleanupQueue.schedule(cleanupTask)
  }

        具体执行TaskRunner执行Task的流程就不展开了,也比较有意思。因为这一个TaskRunner,不仅要执行维护连接池的Task、还负责Cache缓存任务的处理,总之在OkHttp中用的地方也不少。



不忘初心的阿甘
最新回复 (0)
    • 安卓笔记本
      2
        登录 注册 QQ
返回
仅供学习交流,切勿用于商业用途。如有错误欢迎指出:fluent0418@gmail.com