OkHttp4.3源码解析之 - 发起请求

1.OkHttp4.3源码解析之 - 发起请求

什么是OkHttp?

OkHttp在Android开发领域里面应该是无人不知了吧。它是一个由Square公司开源的第三方库。主要用于处理网络请求。

OkHttp Github地址:https://github.com/square/okhttp/

关于Square公司,大家可以去看看他们的其他开源库,质量都挺高的,不仅包括Android相关的,还有Go,Ruby,JS,Kotlin等等

Square官方网站:https://square.github.io/

到目前为止,OkHttp的最新版本是4.3.1,里面一部分代码也已经用Kotlin来重写了,而大部分分析OkHttp源码的文章还停留在以前旧的版本,因此在写这系列源码分析的文章同时,也是学习Kotlin的一个好机会。

从一个简单的请求说起

如果有做过Android开发的话,相信以下代码大家都很熟悉了:

1
2
3
4
5
6
7
8
9
10
11
12
13
val client = OkHttpClient()
val request = Request.Builder().url("https://www.baidu.com").build()

val response = client.newCall(request).enqueue(object: Callback{
override fun onFailure(call: Call, e: IOException) {
println("bluelzy --- ${e.message}")
}

override fun onResponse(call: Call, response: Response) {
println("bluelzy --- ${response.body.toString()}")
}

})

这段代码做的事情很简单:

  1. 创建一个OkHttpClient对象

  2. 使用建造者模式创建一个Request对象

  3. 使用OkHttpClient.newCall(request).enqueue() 发起请求,并处理回调

但是这短短的几行代码里面,就已经有很多我们可以学习的东西了。

首先是建造者模式,也叫Builder模式,它的作用是让用户自由组合需要的参数,来实现不同的需求。具体实现方法可以通过接口,也可以像OkHttp一样,使用内部类。这里我们不详细展开阐述,有兴趣的读者可以去看看《大话设计模式》或者《Android源码设计模式分析与实战》。

然后,通过val这种写法来定义变量也不太优雅,因为这些变量其实声明一次就够了,我们可以改进一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 定义Request
Request.Builder().url("https://www.baidu.com").build().let { request ->
// 定义OkHttpClient
OkHttpClient().newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
println("bluelzy --- ${e.message}")
}

override fun onResponse(call: Call, response: Response) {
println("bluelzy --- ${response.body.toString()}")
}

})
}

把声明的三个变量都干掉,这样代码看起来是不是简洁多了?

上面就是一个最简单的OkHttp发起GET请求的方式,下面我们一起来看看OkHttp是怎么做到的。

1.创建OkHttpClient

这个类的作用就是发起HTTP请求和接收响应

首先OkHttpClient有两个构造方法,一个是无参构造方法,另外一个是传入参数为Builder的构造方法

OkHttpClient的构造方法

无参构造方法

1
2
> constructor() : this(Builder())
>

有参构造方法

1
2
3
4
>  open class OkHttpClient internal constructor(
> builder: Builder
> )
>

OkHttpClient.Builder类

可以看到,无参构造方法其实也是调用了Builder为参数的构造方法,这里传入的Builder() 就是默认的实现,继续看看Builder里面是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher() // 调度器
internal var connectionPool: ConnectionPool = ConnectionPool() // 连接池
internal val interceptors: MutableList<Interceptor> = mutableListOf()//拦截器
// 网络拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
// 事件监听
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true
internal var followSslRedirects = true
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
// 缓存
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM
internal var proxy: Proxy? = null
// 代理选择器
internal var proxySelector: ProxySelector? = null
// 代理身份验证
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
// Socket工厂
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
// SSL Socket工厂,用于HTTPS
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
// 协议
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
// 主机名字确认
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
// 证书链
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
// 验证确认响应链,用于HTTPS
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
internal var pingInterval = 0

internal constructor(okHttpClient: OkHttpClient) : this() {
this.dispatcher = okHttpClient.dispatcher
this.connectionPool = okHttpClient.connectionPool
this.interceptors += okHttpClient.interceptors
this.networkInterceptors += okHttpClient.networkInterceptors
this.eventListenerFactory = okHttpClient.eventListenerFactory
this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
this.authenticator = okHttpClient.authenticator
this.followRedirects = okHttpClient.followRedirects
this.followSslRedirects = okHttpClient.followSslRedirects
this.cookieJar = okHttpClient.cookieJar
this.cache = okHttpClient.cache
this.dns = okHttpClient.dns
this.proxy = okHttpClient.proxy
this.proxySelector = okHttpClient.proxySelector
this.proxyAuthenticator = okHttpClient.proxyAuthenticator
this.socketFactory = okHttpClient.socketFactory
this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull
this.x509TrustManagerOrNull = okHttpClient.x509TrustManager
this.connectionSpecs = okHttpClient.connectionSpecs
this.protocols = okHttpClient.protocols
this.hostnameVerifier = okHttpClient.hostnameVerifier
this.certificatePinner = okHttpClient.certificatePinner
this.certificateChainCleaner = okHttpClient.certificateChainCleaner
this.callTimeout = okHttpClient.callTimeoutMillis
this.connectTimeout = okHttpClient.connectTimeoutMillis
this.readTimeout = okHttpClient.readTimeoutMillis
this.writeTimeout = okHttpClient.writeTimeoutMillis
this.pingInterval = okHttpClient.pingIntervalMillis
}

Builder是OkHttpClient的内部类,然后Builder声明了很多的参数,这些参数的作用我会在后面的文章中详细分析。

到这里为止,我们就完成了第一步,创建一个OkHttpClient对象,在构造方法里面创建了Builder()对象。然后看第二步:Request.Builder() 创建Request对象

2.创建Request对象

Request的构造方法

Request的构造方法需要4个参数

1
2
3
4
5
6
7
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl, // 请求的url
@get:JvmName("method") val method: String, // 请求方法类型
@get:JvmName("headers") val headers: Headers, // 请求头
@get:JvmName("body") val body: RequestBody?, // 请求体
internal val tags: Map<Class<*>, Any>
)

Request.Builder类

Request同样使用了Builder模式,我们直接看Request.Builder类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
open class Builder {
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null

/** A mutable map of tags, or an immutable empty map if we don't have any. */
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()

// 无参构造方法
constructor() {
// 默认为GET
this.method = "GET"
this.headers = Headers.Builder()
}

// 有参构造方法
internal constructor(request: Request) {
this.url = request.url
this.method = request.method
this.body = request.body
this.tags = if (request.tags.isEmpty()) {
mutableMapOf()
} else {
request.tags.toMutableMap()
}
this.headers = request.headers.newBuilder()
}

open fun url(url: HttpUrl): Builder = apply {
this.url = url
}
// 省略代码
}

可以看到,Builder默认的请求方法是GET,并且初始化了一个大小为20的ArrayList作为Headers的容器

除了method和haders之外,还有两个关键的参数,一个是url,另外一个就是RequestBody。这几个参数都提供了set方法,支持链式调用,例如url:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
open fun url(url: String): Builder {
// Silently replace web socket URLs with HTTP URLs.
val finalUrl: String = when {
url.startsWith("ws:", ignoreCase = true) -> {
"http:${url.substring(3)}"
}
url.startsWith("wss:", ignoreCase = true) -> {
"https:${url.substring(4)}"
}
else -> url
}

return url(finalUrl.toHttpUrl())
}

这里做了一点处理,如果是基于web socket协议的url,会被替换成http,而wws则是加密的web socket协议。

设置完了url / requestBody / mothod / headers 之后,我们都会调用build()方法:

1
2
3
4
5
6
7
8
9
open fun build(): Request {
return Request(
checkNotNull(url) { "url == null" },
method,
headers.build(),
body,
tags.toImmutableMap()
)
}

其实就是把刚刚设置的作为参数,调用Request的有参构造方法,创建一个Request对象。到这里为止,第二步也就完成了。

3.发起请求

做完前面两步之后,终于到了激动人心的时刻了,我们通过OkHttpClient.newCall(request).enqueue()来发起请求

到源码里面看看newCall和enqueue这两个方法分别做了什么

newCall()方法

1
2
3
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, forWebSocket = false)
}

调用RealCall.newRealCall方法,并且传入了OkHttpClient和Request作为参数

再看看newRealCall:

1
2
3
4
5
6
7
8
9
10
11
12
companion object {
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this) // okhttp和网络层的中介
}
}
}

其实就是创建了一个RealCall对象,注意这里同时创建了一个Transmitter对象。它后面会再出现的,暂时先忽略,我们先回到发起请求这个过程中来。

enqueue()方法

上一步创建了RealCall对象是吧,传入了OkHttpClient和Request对象是吧,那么enqueue()方法又做了什么呢?

这个enqueue()方法其实是Call接口的其中一个方法,除了enqueue之外,还有request(), execute()等其他方法。我们先看这个enqueue()方法:

1
2
3
4
5
6
7
8
9
10
11
12
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}

override fun cancel() {
transmitter.cancel()
}

可以看到,首先这个方法有一个Callback作为参数,而这个Callback接口里面又有两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
interface Callback {
/**
* Called when the request could not be executed due to cancellation, a connectivity problem or
* timeout. Because networks can fail during an exchange, it is possible that the remote server
* accepted the request before the failure.
*/
fun onFailure(call: Call, e: IOException)

/**
* Called when the HTTP response was successfully returned by the remote server. The callback may
* proceed to read the response body with [Response.body]. The response is still live until its
* response body is [closed][ResponseBody]. The recipient of the callback may consume the response
* body on another thread.
*
* Note that transport-layer success (receiving a HTTP response code, headers and body) does not
* necessarily indicate application-layer success: `response` may still indicate an unhappy HTTP
* response code like 404 or 500.
*/
@Throws(IOException::class)
fun onResponse(call: Call, response: Response)
}

分别用于处理失败和成功两种情况的回调。

在enqueue的方法体内,首先通过一个synchronized关键字,确保这个方法只会执行一次,

然后通过dispatcher.enqueue来执行异步请求

1
2
3
4
5
6
7
8
9
10
11
12
13
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)

// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}

继续看看promoteAndExecute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()

val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()

if (runningAsyncCalls.size >= this.maxRequests) break // 最大请求数为64
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // 相同的Host最大同时请求数为5

i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall) // 把请求加入到list中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}

for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService) //
}

return isRunning
}

首先判断是不是超过了最大请求数或者是相同Host的最大请求数,如果是的话就直接return

否则就执行asyncCall.excuteOn方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()

var success = false
try {
executorService.execute(this) // 执行请求
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}

AsyncCall是RealCall里面的一个内部类,因此在这里已经持有了OkHttpClient对象,也就持有了Dispatcher,而这个executorService又是什么呢?

其实这是一个线程池执行器,在Dispatcher中定义为一个变量

1
2
3
4
5
6
7
8
9
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorServiceOrNull!!
}

而 executorService.execute(this) 中的 this 指的就是AsyncCall对象,一个实现了Runnable接口的对象

因此,上面的excuteOn方法,其实就是执行AsyncCall的run()方法啊。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
val response = getResponseWithInterceptorChain() // 1
signalledCallback = true
responseCallback.onResponse(this@RealCall, response) //2
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}

这里又用到了Kotlin的内联函数,相当于在方法体外面多加了一层try…finally,关于内联函数,大家可以看看官方文档的说明:Kotlin中文网 - 内联函数

1
2
3
4
5
6
7
8
9
10
inline fun threadName(name: String, block: () -> Unit) {
val currentThread = Thread.currentThread()
val oldName = currentThread.name
currentThread.name = name
try {
block()
} finally {
currentThread.name = oldName
}
}

首先我们来看注释1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}

这里用到了另外一个设计模式:责任链模式,用来处理整个网络请求中不同的部分,例如失败重试,缓存,连接等等。这些不同的部分都通过拦截器的方式来实现。

chain.proceed(originalRequest)方法中,其实就是调用了RealInterceptorChain.proceed()方法

这个方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()

calls++

// If we already have a stream, confirm that the incoming request will use it.
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}

// Call the next interceptor in the chain.
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]

@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")

// Confirm that the next interceptor made its required call to chain.proceed().
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}

check(response.body != null) { "interceptor $interceptor returned a response with no body" }

return response
}

可以看到,通过index + 1的方法,我们取出下一个拦截器,然后执行里面的intercept方法,这就是proceed方法主要进行的工作。最终把Response返回。得到的这个Response又通过Callback.onResponse回调方法,使得我们可以获取到这个Response对象。而如果中途抛出异常,那么则会回调onFailure方法。

至于我们在上面加入的那些拦截器,详细的说明会放到下一篇文章中讲解,我们也会加入自定义拦截器的例子。毕竟这种情况在实际开发中还是经常会遇到的,例如自定义ConverterFactory来解析后台返回的数据。

总结

最后,让我们再来回顾一下OkHttp是如何发起请求的:

  • 构造一个OkHttpClient对象,这里有许多的变量用于控制请求时候的参数
  • 构造一个Request对象,这里主要是4个要素:url, method, header, body
  • 调用OkHttpClient.newCall(request).enqueue()方法发起请求,在RealCall类中,实现了Call接口,并且持有OkHttpClient和Request对象,还有一个AsyncCall的内部类,这个内部类就是用来发起异步请求的,这个类同时也实现了Runnable接口,它的getResponseWithInterceptorChain()方法通过责任链的模式,把请求相关的拦截器一个个加入到List中,然后再通过RealInterceptorChain的proceed()方法来执行这些不同的拦截器所定义的方法。
  • 最后成功则回调Callback.onResponse, 失败回调Callback.onFailure

还有,到目前为止我们已经发现了OkHttp使用了两个设计模式,分别是:

  • Builder模式
  • 责任链模式

有兴趣的童鞋可以自行上网查找相关资料~

好了,一个简单的请求大致上就是这么个流程,下一篇文章我们继续深入了解OkHttp里面的拦截器~

本文结束啦感谢您的阅读