预备知识
通常我们进行HTTP连接网络的时候我们会进行TCP的三次握手,然后传输数据,然后再释放连接。 大量的连接每次连接关闭都要三次握手四次分手的很显然会造成性能低下, 因此http有一种叫做keep-alive connections的机制(HTTP1.1以后默认开启),它可以在传输数据后仍然保持连接, 当客户端需要再次获取数据时,直接使用刚刚空闲下来的连接而不需要再次握手。
OkHttp的复用连接池就是为了复用这些没有断开连接的TCP连接的。
连接池概述
okhttp连接的建立主要是围绕ConnectInterceptor来的,
1、流程为首先通过RealCall的initExchange(chain)创建一个Exchange对象,其中会打开与目标服务器的链接, 并调用 Chain.proceed()方法进入下一个拦截器。
2、initExchange()方法中会先通过 ExchangeFinder 尝试去 RealConnectionPool 中寻找已存在的连接,未找到则会重新创建一个RealConnection 并开始连接,
3、然后将其存入RealConnectionPool,现在已经准备好了RealConnection 对象,
4、然后通过请求协议创建不同的ExchangeCodec 并返回,返回的ExchangeCodec正是创建Exchange对象的一个参数。
ConnectInterceptor 主要目的
ConnectInterceptor 的主要职责是建立与目标服务器的连接,并为后续的拦截器(CallServerInterceptor)提供一个有效的网络通道。这样可以实现发送请求并接收服务器响应的功能。
ConnectInterceptor的代码很简单,主要的功能就是初始化RealCall的Exchange。这个Exchange的功能就是基于RealConnection+ExchangeCodec进行数据交换。
核心类介绍
RealConnection
RealConnection 实现了 Connection接口,其中使用 Socket建立HTTP/HTTPS连接,并且获取 I/O 流,内部持有输入和输出流。
如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路,而且通过RealConnection代表是连接socket链路,RealConnection对象意味着我们已经跟服务端有了一条通信链路了,同一个 Connection 可能会承载多个 HTTP 的请求与响应。
类构造与关键属性:
class RealConnection(val connectionPool: RealConnectionPool,private val route: Route
) : Http2Connection.Listener(), Connection {//和服务器直接通信的socket实例和用于数据读写的输入输出流private var rawSocket: Socket? = nullprivate var source: BufferedSource? = nullprivate var sink: BufferedSink? = null...//引用计数法记录本连接被多少个请求持有,用于回收管理中判断该连接是否空闲val calls = mutableListOf<Reference<RealCall>>()...
}
RealConnectionPool
这是用来存储 RealConnection 的池子,内部使用一个双端队列来进行存储。
在 OkHttp 中,一个连接(RealConnection)用完后不会立马被关闭并释放掉,而且是会存储到连接池(RealConnectionPool)中。 除了缓存连接外,缓存池还负责定期清理过期的连接,在 RealConnection 中会维护一个用来描述该连接空闲时间的字段,每添加一个新的连接到连接池中时都会进行一次检测,遍历所有的连接,找出当前未被使用且空闲时间最长的那个连接,如果该连接空闲时长超出阈值,或者连接池已满,将会关闭该连接。
类构造与关键属性:
class RealConnectionPool(taskRunner: TaskRunner,//每个空闲Socket的最大连接数,默认为5private val maxIdleConnections: Int,keepAliveDuration: Long,timeUnit: TimeUnit
) {//连接保活时间,默认5分钟private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)private val cleanupQueue: TaskQueue = taskRunner.newQueue()private val cleanupTask = object : Task("$okHttpName ConnectionPool") {override fun runOnce() = cleanup(System.nanoTime())}//RealConnection是Socket对象的包装类,connections也就是对这些连接的缓存private val connections = ArrayDeque<RealConnection>()...
}
ExchangeCodec
ExchangeCodec 的功能就是对http报文的编解码,负责对Request 编码及解码 Response,也就是写入请求及读取响应,我们的请求及响应数据都通过它来读写。
其实现类有两个:Http1ExchangeCodec 及 Http2ExchangeCodec,分别对应两种协议版本。
Exchange
功能类似 ExchangeCodec,但它是对应的是单个请求,其在 ExchangeCodec 基础上担负了一些连接管理及事件分发的作用。
具体而言,Exchange 与 Request 以及ExchangeCodec 一一对应,新建一个请求时就会创建一个 Exchange,该 Exchange 负责将这个请求发送出去并读取到响应数据,而发送与接收数据使用的是 ExchangeCodec。
关键流程解析
调用链
--ConnectInterceptor.intercept--RealCall.initExchange--ExchangeFinder.find--ExchangeFinder.findHealthyConnection--ExchangeFinder.findConnection--RealConnectionPool.callAcquirePooledConnection...--RealConnection.newCodec
通过ExchangeFinder找到或者新建连接
class ExchangeFinder(...
) {...fun find(client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec {...val resultConnection = findHealthyConnection(...)return resultConnection.newCodec(client, chain)...}private fun findHealthyConnection(...): RealConnection {...val candidate = findConnection(...)...}private fun findConnection(...): RealConnection {...// 首先查找当前call对应的connection是否可以使用 val callConnection = call.connection if (callConnection != null) {if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {}}// 从连接池里面查找可用连接if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}...// 找不到的话创建新的连接val newConnection = RealConnection(connectionPool, route)...// 连接socketnewConnection.connect(...)...// 将新连接丢到连接池connectionPool.put(newConnection)// 绑定RealCall和连接call.acquireConnectionNoEvents(newConnection)...return newConnection}...
}class RealCall(...) : Call {fun acquireConnectionNoEvents(connection: RealConnection) {...this.connection = connectionconnection.calls.add(CallReference(this, callStackTrace))}
}
我们可以看到这里主要干了:
1.首先从判断当前RealCall是否有对应的Connection,并校验是否可以使用
2.call.connection不存在或者不符合条件,则接着从连接池里面查找可用连接
2.找不到的话创建新的连接RealConnection,连接socket
3.将新连接丢到连接池
4.绑定RealCall和连接RealConnection,也就是将RealConnection内部的引用计数+1
可以用图来描述就是:
从连接池里面查找可用连接
我们详细看一下是如何从连接池获取符合条件的connection并弄清楚连接复用的条件。
connectionPool.callAcquirePooledConnection()
从连接池中获取连接并将call添加到connection 的calls集合中
-> RealConnectionPoolfun callAcquirePooledConnection(address: Address,call: RealCall,routes: List<Route>?,requireMultiplexed: Boolean): Boolean {for (connection in connections) {synchronized(connection) {//当需要进行多路复用且当前的连接不是 HTTP/2 连接时,则放弃当前连接if (requireMultiplexed && !connection.isMultiplexed) return@synchronized// 查找到符合条件的address// 分析1.1.1.1.1.1if (!connection.isEligible(address, routes)) return@synchronized// 找到复用的 connection 并添加callcall.acquireConnectionNoEvents(connection)// realcall.connection = connection// 当前连接承载的所有call// connection.calls.add(CallReference(realCall, callStackTrace))return true}}return false}
分析1.1.1.1.1.1、判断连接是否有效
// 对比连接池中获取的每一个address,找到符合条件的address.
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {assertThreadHoldsLock()...// 连接次数是否已满,在HTTP 1.X的情况下allocationLimit总是为1,即线头阻塞,// 前一个请求完全结束后后一个请求才能复用,否则只能新开Connectionif (calls.size >= allocationLimit || noNewExchanges) return false// 找到符合条件的address ,// 对比:非Host的地址部分是否相等,内部会比较dns、protocols、proxy、sslSocketFactory、port等if (!this.route.address.equalsNonHost(address)) return false// 如果主机名相同则返回 trueif (address.url.host == this.route().address.url.host) {return true // This connection is a perfect match.}// http2 相关 ----------------// 到达这一步,我们没有主机名匹配。但如果满足连接合并的要求,仍然可以承载请求。// 1. 1. 此连接必须是 HTTP/2。if (http2Connection == null) return false// 2. 路由必须共享一个 IP 地址。if (routes == null || !routeMatchesAny(routes)) return false// 3. 此连接的服务器证书必须包含新主机。if (address.hostnameVerifier !== OkHostnameVerifier) return falseif (!supportsUrl(address.url)) return false// 4. 证书锁定必须与主机匹配。try {address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)} catch (_: SSLPeerUnverifiedException) {return false}// http2 调用方的地址可以由此连接承载。return true }
我们可以得到连接池复用的条件为:
1.当前连接可用请求次数已满不可复用,在HTTP 1.X的情况下allocationLimit总是为1,即线头阻塞,前一个请求完全结束后后一个请求才能复用此连接,否则只能新开Connection
2.非Host的地址部分不相等不可复用,内部会比较dns、protocols、proxy、sslSocketFactory、port等
3.前面满足的前提下,host如果相同可复用
4.如果host不相等,但当前http协议是http2那就需要继续判断其他条件决定是否可复用
这里要特别注意
allocationLimit 在HTTP 1.X的情况下allocationLimit总是为1,保证了HTTP 1.X的情况下每次只能跑一个请求, 也就是说必须一个将上次Request的Response完全读取之后才能发送下一次Request。
但http2在多路复用+二进制帧的加持下是允许一个连接同时被多个请求使用的,允许连续发送多个Request 。
Connection引用计数
创建或者复用Connection的时候都会调用到RealCall.acquireConnectionNoEvents,将RealCall的弱引用丢到connection.calls里面,于是就完成了请求对Connection引用计数+1;
class RealCall(...) : Call {fun acquireConnectionNoEvents(connection: RealConnection) {...this.connection = connectionconnection.calls.add(CallReference(this, callStackTrace))}
}
有add就有remove,请求完成后,会调用Exchange.complete方法,最终调到RealCall.releaseConnectionNoEvents将引用从connection.calls里面删掉,于是就完成了请求对Connection引用计数-1;
internal fun releaseConnectionNoEvents(): Socket? {...val calls = connection.callsval index = calls.indexOfFirst { it.get() == this@RealCall }//找到当前请求RealCall对应的Connecton.calls列表中的RealCallcheck(index != -1)calls.removeAt(index)this.connection = null...return null}
Sockt连接的建立
连接的真正建立其实也是通过Socket来完成的,我们可以简单看一下:
--> RealConnectionfun connect(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean,call: Call,eventListener: EventListener) {...while (true) {try {// 检测当前的路由是否通过HTTP代理来实现HTTPS的隧道传输if (route.requiresTunnel()) {connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)if (rawSocket == null) {// 无法建立隧道连接,但正确关闭了资源。break}} else {// 连接socket ,connectSocket(connectTimeout, readTimeout, call, eventListener)}...break} catch (e: IOException) {// 异常情况重置状态... }}}
--> RealConnectionprivate fun connectSocket(connectTimeout: Int,readTimeout: Int,call: Call,eventListener: EventListener) {val proxy = route.proxyval address = route.address// 创建Socket val rawSocket = when (proxy.type()) {Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!else -> Socket(proxy)}this.rawSocket = rawSocketrawSocket.soTimeout = readTimeouttry {// 连接socketPlatform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)} catch (e: ConnectException) {...}// 获取Socket的读写流,分别封装 读source 写sinktry {source = rawSocket.source().buffer()sink = rawSocket.sink().buffer()} catch (npe: NullPointerException) {...}}
Sockt连接的断开
由于我们需要复用连接,因此Socket连接并不是在请求完成后就断开的,如果空闲连接数在允许范围内(默认5个),他会保持空闲存活keep-alive的时间后(默认5分钟)由okhttp的自动清理机制进行清理并关闭。具体分析见下面的空闲连接清理部分。
如何清理空闲连接?
从上面的复用机制我们看到,socket连接在上一次请求完成之后是不会断开的,等待下次请求复用。 如果一直不去断开的话,就会有一个资源占用的问题。
那么OkHttp是在什么时候断开连接的呢?
其实RealConnectionPool内部会有个cleanupTask专门用于连接的清理,它会在RealConnectionPool的put(加入新连接)、connectionBecameIdle(有连接空闲)里面被调用。
private val cleanupQueue: TaskQueue = taskRunner.newQueue()private val cleanupTask = object : Task("$okHttpName ConnectionPool") {override fun runOnce() = cleanup(System.nanoTime())}//加入新连接fun put(connection: RealConnection) {...cleanupQueue.schedule(cleanupTask)}//有连接空闲,对应引用计数的-1,也就是RealCall.releaseConnectionNoEventsfun connectionBecameIdle(connection: RealConnection): Boolean {connection.assertThreadHoldsLock()return if (connection.noNewExchanges || maxIdleConnections == 0) {connection.noNewExchanges = trueconnections.remove(connection)if (connections.isEmpty()) cleanupQueue.cancelAll()true} else {cleanupQueue.schedule(cleanupTask)false}}
cleanupQueue会根据 Task.runOnce的返回值等待一段时间再次调用runOnce, 这样设计是为了在本次执行清理后,拿到最近一个需要清理的连接到期剩余的时间,方便第一时间将过期的连接清理掉。 这里的runOnce实际就是cleanup方法,这里面会查找空闲过久的连接,然后关闭它的socket:
fun cleanup(now: Long): Long {var inUseConnectionCount = 0var idleConnectionCount = 0var longestIdleConnection: RealConnection? = nullvar longestIdleDurationNs = Long.MIN_VALUE// 找到下一次空闲连接超时的时间for (connection in connections) {synchronized(connection) {// 如果这个connection还在使用(Response还没有读完),就计数然后继续搜索if (pruneAndGetAllocationCount(connection, now) > 0) {inUseConnectionCount++} else {idleConnectionCount++// 这个连接已经空闲,计算它空闲了多久,并且保存空闲了最久的连接val idleDurationNs = now - connection.idleAtNsif (idleDurationNs > longestIdleDurationNs) {longestIdleDurationNs = idleDurationNslongestIdleConnection = connection} else {Unit}}}}when {longestIdleDurationNs >= this.keepAliveDurationNs|| idleConnectionCount > this.maxIdleConnections -> {// 如果空闲最久的连接比keepAliveDurationNs这个值要大就回收val connection = longestIdleConnection!!...// 关闭socketconnection.socket().closeQuietly()if (connections.isEmpty()) cleanupQueue.cancelAll()// 我们只回收了空闲超时最久的连接,可能还会有其他连接也超时了,返回0让它立马进行下一次清理return 0L}idleConnectionCount > 0 -> {// 如果有空闲连接,就计算最近的一次空闲超时的时间,去等待return keepAliveDurationNs - longestIdleDurationNs}inUseConnectionCount > 0 -> {// 如果所有连接都在使用,就等待这个超时时间去重新检查清理return keepAliveDurationNs}else -> {// 如果没有连接,就不需要再检查了return -1}}
}
这里面主要是干了这几个事情:
1.遍历缓存池中所有的Connection,根据pruneAndGetAllocationCount计算每个Connection被多少个请求引用决定该Connection是否要进入回收判断逻辑, 如果需要被回收,得到空闲时间最长的Connection和时间
2.对比刚才收集到的最长时间的Connection和keepAlive的时间
3.对比当前空闲的数量和连接池允许的最大的空闲数量
4.对满足23条件的Connection进行Socket断开操作,并且返回0马上进行下一次cleanup回收,因为我们只回收了空闲时间最久的连接
5.如果有空闲连接,但是还没到最大空闲时间,那就返回时间差值,等待这个时间后再次执行cleanup回收
6.如果没有空闲连接,就等待keepAlive时间后再次进行检查
7.如果没有连接,就返回-1不检查了
pruneAndGetAllocationCount返回的是正在占用的请求数,用于检测连接是否空闲,prune有修剪的意思, 除了计算被引用的次数外,内部遍历RealConnection的allocations弱引用列表,修剪并移除掉RealConnection.calls引用计数列表中已经内存回收的RealCall对应的弱引用本身 Refrence,这里巧妙利用了弱引用的原理,类似WeakedHashMap:
RealConnectionPoolprivate fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {connection.assertThreadHoldsLock()val references = connection.callsvar i = 0while (i < references.size) {val reference = references[i]if (reference.get() != null) {//如果得到的为null,说明弱引用指向的对象本身已经发生了内存泄漏i++continue}// We've discovered a leaked call. This is an application bug.val callReference = reference as CallReferenceval message = "A connection to ${connection.route().address.url} was leaked. " +"Did you forget to close a response body?"Platform.get().logCloseableLeak(message, callReference.callStackTrace)references.removeAt(i)connection.noNewExchanges = true// If this was the last allocation, the connection is eligible for immediate eviction.if (references.isEmpty()) {connection.idleAtNs = now - keepAliveDurationNsreturn 0}}return references.size}
什么情况下会发生reference.get()==null的情况呢?
既然前面提到RealCall.releaseConnectionNoEvents中会主动对引用计数进行remove,那什么时候才会发生泄漏的情况呢?比如得到一个Response之后一直不去读取的话实际上它会一直占中这个RealConnection,具体可能是下面的样子:
client.newCall(getRequest()).enqueue(new Callback() {@Overridepublic void onFailure(@NotNull Call call, @NotNull IOException e) {}@Overridepublic void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {// 啥都不干}
});
onResponse传入的response没有人去读取数据,就会一直占用连接,但是由于它在后面又没有人引用就会被GC回收导致这条连接再也不能断开。 pruneAndGetAllocationCount里面就通过弱引用get返回null的方式去检查到这样的异常,进行清理动作。
我们也可以联想到OkHttp常常需要注意的两个问题的原因:
1.Response.string只能调用一次
由于Response.string读取完成之后这次请求其实就已经结束了,而且OkHttp并没有对这个结果做缓存, 所以下次再读取就会出现java.lang.IllegalStateException: closed异常
2.Response必须被及时读取
如果我们得到一个Response之后一直不去读取的话实际上它会一直占中这这个Connect,下次HTTP 1.X的请求就不能复用这套链接,要新建一条Connection
小结: 以上就是对ConnectInterceptor的主要流程分析,主要源于OkHttp源码分析之连接池复用 这篇文章。内容层次清晰,容易懂。
下面是我自己对源码的分析理解:
ConnectInterceptor完整源码分析:
一、ConnectInterceptor拦截器源码分析
ConnectInterceptor 的源码比较简单
object ConnectInterceptor : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChain// 分析1:为即将进行的请求和响应找到一个新的或者从连接池中获取的连接。val exchange = realChain.call.initExchange(chain)val connectedChain = realChain.copy(exchange = exchange)return connectedChain.proceed(realChain.request)}}
分析1、RealCall.initExchange()
---> RealCallinternal fun initExchange(chain: RealInterceptorChain): Exchange {// RetryAndFollowUpInterceptor 的 call.enterNetworkInterceptorExchange 方法中初始化 // RealCall 中的 exchangeFinder // exchangeFinder 是一个用于查找连接和编解码器的对象,它包含了连接池和路由选择等相关逻辑val exchangeFinder = this.exchangeFinder!!// 找到链接并通过连接创建 传输解码器// 方法作用 1、建立有效链接 2、通过建立的链接创建传输编解码器// 通过find获取一个连接的编解码器(codec)。// 分析1.1val codec = exchangeFinder.find(client, chain)// 创建Exchange// Exchange: 传输单个HTTP请求和响应对。这将连接管理和事件分层到ExchangeCodec上,后者处理实际的I/O。// 将 codec 的操作 包装在 Exchange 实体中。// 根据获取的连接和编解码器,创建一个 Exchange 对象,该对象用于管理请求和响应的交换过程。// 分析1.2val result = Exchange(this, eventListener, exchangeFinder, codec)this.interceptorScopedExchange = resultthis.exchange = resultsynchronized(this) {this.requestBodyOpen = truethis.responseBodyOpen = true}if (canceled) throw IOException("Canceled")return result}
find() 方法完成了ConnectInterceptor 的两个功能:
- 找到(创建)有效的连接,并建立连接
- 通过建立的连接创建 Exchange 管理请求和响应的交换过程。
分析1.1、ExchangeFinder.find
--> ExchangeFinderfun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {try {// 1、找到连接// 分析1.1.1val resultConnection = findHealthyConnection(connectTimeout = chain.connectTimeoutMillis,readTimeout = chain.readTimeoutMillis,writeTimeout = chain.writeTimeoutMillis,pingIntervalMillis = client.pingIntervalMillis,connectionRetryEnabled = client.retryOnConnectionFailure,doExtensiveHealthChecks = chain.request.method != "GET")// 2、通过链接创建 Exchange Codec 交换传输编码器// 分析1.1.2return resultConnection.newCodec(client, chain)}
分析1.1.1、寻找连接,建立连接
--> ExchangeFinder@Throws(IOException::class)private fun findHealthyConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean,doExtensiveHealthChecks: Boolean): RealConnection {while (true) {// 找到候选的连接// 分析1.1.1.1 val candidate = findConnection(connectTimeout = connectTimeout,readTimeout = readTimeout,writeTimeout = writeTimeout,pingIntervalMillis = pingIntervalMillis,connectionRetryEnabled = connectionRetryEnabled)// 确认连接是正常的if (candidate.isHealthy(doExtensiveHealthChecks)) {return candidate}// If it isn't, take it out of the pool.candidate.noNewExchanges()...}}
分析1.1.1.1 、寻找和建立连接逻辑
如何找到有效的连接呢?官方给出了寻找连接的逻辑:
作用是为请求获取一个连接。
1、它首先尝试复用已有的连接,
2、然后尝试从连接池获取连接,
3、最后建立新的连接。
在获取连接的过程中,会进行路由选择、连接合并等操作,以提高连接的效率和复用性。
@Throws(IOException::class)private fun findConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean): RealConnection {// 检查请求是否已取消if (call.isCanceled()) throw IOException("Canceled")// 1、尝试复用RealCall已有的连接// 此处的连接可能会在 releaseConnectionNoEvents() 中被修改!val callConnection = call.connection if (callConnection != null) {var toClose: Socket? = nullsynchronized(callConnection) {// 如果连接没有新的交换或者与当前请求的主机和端口不匹配,则释放连接if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {toClose = call.releaseConnectionNoEvents()}}// 如果连接没有被释放,则复用连接if (call.connection != null) {check(toClose == null)return callConnection}// 连接已被释放toClose?.closeQuietly()eventListener.connectionReleased(call, callConnection)}// 需要一个新的连接,重置状态数据信息refusedStreamCount = 0connectionShutdownCount = 0otherFailureCount = 0// 2、尝试从连接池(ConnectionPool)中获取// 分析1.1.1.1.1// private val connections = ConcurrentLinkedQueue<RealConnection>()if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}// 3、连接池中没有可用连接,确定下一个要尝试的路由val routes: List<Route>?val route: Routeif (nextRouteToTry != null) {// 使用之前合并连接的路由routes = nullroute = nextRouteToTry!!nextRouteToTry = null} else if (routeSelection != null && routeSelection!!.hasNext()) {// 使用现有的路由选择中的路由routes = nullroute = routeSelection!!.next()} else {// 计算新的路由选择,这是一个阻塞操作!var localRouteSelector = routeSelectorif (localRouteSelector == null) {localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)this.routeSelector = localRouteSelector}val localRouteSelection = localRouteSelector.next()routeSelection = localRouteSelectionroutes = localRouteSelection.routes// 检查请求是否已取消if (call.isCanceled()) throw IOException("Canceled")// 由于连接合并,现在有了一组 IP地址,再次尝试从连接池获取连接if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}route = localRouteSelection.next()}// 告知请求正在连接的连接,以便异步取消操作生效val newConnection = RealConnection(connectionPool, route)call.connectionToCancel = newConnectiontry {// 分析1.1.1.1.2newConnection.connect(connectTimeout,readTimeout,writeTimeout,pingIntervalMillis,connectionRetryEnabled,call,eventListener)} finally {call.connectionToCancel = null}call.client.routeDatabase.connected(newConnection.route())// 如果与该主机的其他请求同时进行连接,则合并连接if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {val result = call.connection!!nextRouteToTry = routenewConnection.socket().closeQuietly()eventListener.connectionAcquired(call, result)return result}// 添加连接到连接池中synchronized(newConnection) {// 添加连接到连接池中connectionPool.put(newConnection)// RealCall 和 newConnection关联call.acquireConnectionNoEvents(newConnection)}eventListener.connectionAcquired(call, newConnection)return newConnection}
这段代码是一个用于查找连接的函数,其主要逻辑如下:
- 首先检查请求是否已取消,如果已取消,则抛出IOException。
- 尝试重用来自请求的连接,如果存在可重用的连接,则直接返回。
- 如果连接池中存在可用连接,从连接池中获取连接并返回。
- 如果连接池中不存在可用连接,则确定下一个要尝试的路由。
- 如果存在已合并连接的路由,则使用该路由。
- 如果存在已存在的路由选择,并且还有未尝试的路由,则使用该路由。
- 否则,计算新的路由选择,并获取下一个要尝试的路由。
- 进行连接操作,创建新的连接,并进行连接设置。
- 如果与该主机的其他请求同时进行连接,则合并连接。
- 将新连接放入连接池中,并为请求获取该连接。
- 返回新连接。
分析1.1.1.1.1、从连接池中获取连接 connectionPool.callAcquirePooledConnection()
从连接池中获取连接并将call添加到connection 的calls集合中
-> RealConnectionPoolfun callAcquirePooledConnection(address: Address,call: RealCall,routes: List<Route>?,requireMultiplexed: Boolean): Boolean {for (connection in connections) {synchronized(connection) {// http2,如果需要要求使用多路复用的连接(requireMultiplexed 为 true),但该连接不支持多路复用.继续遍历下一个if (requireMultiplexed && !connection.isMultiplexed) return@synchronized// 查找到符合条件的address// 分析1.1.1.1.1.1if (!connection.isEligible(address, routes)) return@synchronized// 找到复用的 connection 并添加callcall.acquireConnectionNoEvents(connection)// realcall.connection = connection// 当前连接承载的所有call// connection.calls.add(CallReference(realCall, callStackTrace))return true}}return false}
分析1.1.1.1.1.1、判断连接是否有效
// 对比连接池中获取的每一个address,找到符合条件的address.
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {assertThreadHoldsLock()...// 找到符合条件的address ,// 对比:this.dns == that.dns &&this.proxyAuthenticator == that.proxyAuthenticator &&this.protocols == that.protocols...if (!this.route.address.equalsNonHost(address)) return false// 如果主机名相同则返回 trueif (address.url.host == this.route().address.url.host) {return true // This connection is a perfect match.}// http2 相关 ----------------// 到达这一步,我们没有主机名匹配。但如果满足连接合并的要求,仍然可以承载请求。// 1. 1. 此连接必须是 HTTP/2。if (http2Connection == null) return false// 2. 路由必须共享一个 IP 地址。if (routes == null || !routeMatchesAny(routes)) return false// 3. 此连接的服务器证书必须包含新主机。if (address.hostnameVerifier !== OkHostnameVerifier) return falseif (!supportsUrl(address.url)) return false// 4. 证书锁定必须与主机匹配。try {address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)} catch (_: SSLPeerUnverifiedException) {return false}// http2 调用方的地址可以由此连接承载。return true }
连接绑定, call 和 connection 绑定赋值,
call.acquireConnectionNoEvents(connection)
--> RealCall
fun acquireConnectionNoEvents(connection: RealConnection) {...// RealCall 和 connection关联this.connection = connection// calls 是 当前connection 承载的所有call 的集合connection.calls.add(CallReference(this, callStackTrace))}
分析1.1.1.1.2、建立连接 findConnection 的connect
--> RealConnectionfun connect(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean,call: Call,eventListener: EventListener) {...while (true) {try {// 检测当前的路由是否通过HTTP代理来实现HTTPS的隧道传输if (route.requiresTunnel()) {connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)if (rawSocket == null) {// 无法建立隧道连接,但正确关闭了资源。break}} else {// 连接socket ,connectSocket(connectTimeout, readTimeout, call, eventListener)}establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)break} catch (e: IOException) {// 异常情况重置状态socket?.closeQuietly()rawSocket?.closeQuietly()socket = nullrawSocket = nullsource = nullsink = nullhandshake = nullprotocol = nullhttp2Connection = nullallocationLimit = 1... }}}
连接Socket
--> RealConnectionprivate fun connectSocket(connectTimeout: Int,readTimeout: Int,call: Call,eventListener: EventListener) {val proxy = route.proxyval address = route.address// 创建Socket val rawSocket = when (proxy.type()) {Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!else -> Socket(proxy)}this.rawSocket = rawSocketeventListener.connectStart(call, route.socketAddress, proxy)rawSocket.soTimeout = readTimeouttry {// 连接socketPlatform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)} catch (e: ConnectException) {throw ConnectException("Failed to connect to ${route.socketAddress}").apply {initCause(e)}}// 获取Socket的读写流,分别封装 读source 写sinktry {source = rawSocket.source().buffer()sink = rawSocket.sink().buffer()} catch (npe: NullPointerException) {if (npe.message == NPE_THROW_WITH_NULL) {throw IOException(npe)}}}
分析1.1.2、创建传输编解码管理器
--> RealConnectioninternal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {val socket = this.socket!!val source = this.source!!val sink = this.sink!!val http2Connection = this.http2Connectionreturn if (http2Connection != null) {// 如果是Http2Http2ExchangeCodec(client, this, chain, http2Connection)} else {// Http1 socket.soTimeout = chain.readTimeoutMillis()source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)Http1ExchangeCodec(client, this, source, sink)}}