Basic usage of interceptors

To define a custom interceptor, implement the Interceptor interface, which is defined as follows:
```java
public interface Interceptor {
  // Intercepts the request carried by the chain and returns a response.
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    // The request currently being processed.
    Request request();

    // Passes the request on to the next interceptor and returns its response.
    Response proceed(Request request) throws IOException;

    @Nullable Connection connection();

    Call call();

    int connectTimeoutMillis();

    Chain withConnectTimeout(int timeout, TimeUnit unit);

    int readTimeoutMillis();

    Chain withReadTimeout(int timeout, TimeUnit unit);

    int writeTimeoutMillis();

    Chain withWriteTimeout(int timeout, TimeUnit unit);
  }
}
```
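```java
public class MyInterceptor implements Interceptor {
  @Override
  public Response intercept(Chain chain) throws IOException {
    // 1. Get the request from the chain and inspect or modify it here.
    Request request = chain.request();

    // 2. Hand the request to the next interceptor and wait for the response.
    Response response = chain.proceed(request);

    // 3. Inspect or modify the response here, then return it.
    return response;
  }
}
```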
```java
OkHttpClient client = new OkHttpClient.Builder()
    .addInterceptor(new MyInterceptor())
    .build();

// or, as a network interceptor:
OkHttpClient client = new OkHttpClient.Builder()
    .addNetworkInterceptor(new MyInterceptor())
    .build();
```
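With this in place, okhttp calls our custom interceptor as it walks the interceptor chain to process a request. So how do addInterceptor(Interceptor) and addNetworkInterceptor(Interceptor) differ? The first adds an application interceptor and the second adds a network interceptor; the main difference is when they are invoked. Application interceptors sit at the very top of the chain, while network interceptors run just before the request is written to the server. For the other differences, see the official wiki page Okhttp-wiki 之 Interceptors 拦截器. For everyday application development, addInterceptor(Interceptor) is enough.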
RealCall :: getResponseWithInterceptorChain()

The previous article showed that RealCall's getResponseWithInterceptorChain() is where the request is processed and sent and where the response is returned. Let's look at its source again:
```java
Response getResponseWithInterceptorChain() throws IOException {
  // 1. Build the list of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());           // application interceptors
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());  // network interceptors
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  // 2. Construct the first Chain, starting at index 0.
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  boolean calledNoMoreExchanges = false;
  try {
    // 3. Let the chain process the request.
    Response response = chain.proceed(originalRequest);
    // ...
    return response;
  }
  // ... catch/finally blocks omitted in this excerpt
}
```
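1. Add the interceptors to the interceptors list

Besides our custom interceptors, the default interceptors are added as well: RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor, ConnectInterceptor and CallServerInterceptor.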
2. Construct the first Chain

Chain is an inner interface of Interceptor, and its implementation is RealInterceptorChain. Keep the first six constructor arguments in mind:
```java
public final class RealInterceptorChain implements Interceptor.Chain {
  // ...

  public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
      @Nullable Exchange exchange, int index, Request request, Call call,
      int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;  // the full interceptor list
    this.transmitter = transmitter;
    this.exchange = exchange;
    this.index = index;                // position of the interceptor this chain will call
    this.request = request;
    this.call = call;
    // ...
  }
}
```
3. Call the Chain's proceed(Request) method to process the request

This is really RealInterceptorChain's proceed(Request) method:
```java
public final class RealInterceptorChain implements Interceptor.Chain {
  // ...

  @Override
  public Response proceed(Request request) throws IOException {
    return proceed(request, transmitter, exchange);
  }

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();
    // ...

    // Create the next Chain, pointing at index + 1.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);

    // Fetch the interceptor at the current index and let it handle the request.
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    // ...

    return response;
  }
}
```
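Inside proceed, a new Chain is created first, with index + 1 passed as a constructor argument. The interceptor at position index is then fetched from the interceptors list and its intercept method is called with the newly created Chain. Recall the intercept template shown earlier: once an interceptor has finished its request logic, it calls proceed(Request) on the Chain it was given, which runs the same logic in proceed again. Because index has already been incremented, this Chain fetches the next interceptor and calls its intercept(Chain) method, and so on. In this way the interceptors are linked one after another through Chains. The Request is passed down the chain until it is handled and a Response is produced, and the Response then travels back up the same chain.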
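The index + 1 hand-off described above can be reproduced in a few lines without okhttp. The following is only a minimal sketch: MiniInterceptor, MiniChain and ChainDemo are hypothetical stand-ins (not okhttp classes), and requests and responses are plain strings so that the path through the chain is visible in the result.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Interceptor: turns a request into a response. */
interface MiniInterceptor {
    String intercept(MiniChain chain);
}

/** Hypothetical stand-in for RealInterceptorChain: a list, an index and the current request. */
final class MiniChain {
    private final List<MiniInterceptor> interceptors;
    private final int index;
    private final String request;

    MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    /** Calls the interceptor at index, handing it a new chain that points at index + 1. */
    String proceed(String request) {
        if (index >= interceptors.size()) throw new AssertionError("no interceptor left");
        MiniChain next = new MiniChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}

final class ChainDemo {
    /** Builds a three-link chain and returns the response after it has travelled back up. */
    static String run(String request) {
        List<MiniInterceptor> interceptors = new ArrayList<>();
        // The first two tag the request on the way down and the response on the way up.
        interceptors.add(chain -> chain.proceed(chain.request() + ">A") + "<A");
        interceptors.add(chain -> chain.proceed(chain.request() + ">B") + "<B");
        // The last one produces the response and never calls proceed().
        interceptors.add(chain -> "[" + chain.request() + "]");
        return new MiniChain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("req")); // prints "[req>A>B]<B<A"
    }
}
```

The result shows the request passing A and then B on the way down, and the response passing B and then A on the way back, which is the same shape as the real chain.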
RetryAndFollowUpInterceptor

As mentioned in the custom interceptor section, Interceptor's intercept(Chain) method is where an interceptor does its work. RetryAndFollowUpInterceptor's intercept(Chain) method is as follows:
```java
@Override
public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Transmitter transmitter = realChain.transmitter();

  int followUpCount = 0;
  Response priorResponse = null;
  while (true) {
    // Prepare the connection-related objects for this request.
    transmitter.prepareToConnect(request);

    if (transmitter.isCanceled()) {
      throw new IOException("Canceled");
    }

    Response response;
    boolean success = false;
    try {
      // Pass the request on to the next interceptor.
      response = realChain.proceed(request, transmitter, null);
      success = true;
    } catch (RouteException e) {
      // Connecting via a route failed; the request was not sent. Retry if recoverable.
      if (!recover(e.getLastConnectException(), transmitter, false, request)) {
        throw e.getFirstConnectException();
      }
      continue;
    } catch (IOException e) {
      // Communication with the server failed; the request may have been sent. Retry if recoverable.
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      if (!recover(e, transmitter, requestSendStarted, request)) throw e;
      continue;
    } finally {
      // An exception escaped without a retry: release resources.
      if (!success) {
        transmitter.exchangeDoneDueToException();
      }
    }

    // ... (priorResponse handling and the lookup of `route` are omitted in this excerpt)

    // Work out whether a follow-up request (redirect, authentication, ...) is needed.
    Request followUp = followUpRequest(response, route);

    // No follow-up needed: return the response.
    if (followUp == null) {
      // ...
      return response;
    }

    // ...

    if (++followUpCount > MAX_FOLLOW_UPS) {
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    // Loop again with the follow-up request.
    request = followUp;
    priorResponse = response;
  }
}
```
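The shape of this loop (retry on a recoverable failure, follow redirects up to a limit, otherwise return) can be sketched without okhttp. Everything below is an illustrative assumption rather than okhttp code: FakeTransport stands in for the rest of the chain, a "REDIRECT:" prefix stands in for a 3xx response, and a simple retry budget stands in for recover().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ProtocolException;

/** Hypothetical stand-in for "send this request down the rest of the chain". */
interface FakeTransport {
    String send(String url) throws IOException;
}

final class FollowUpDemo {
    static final int MAX_FOLLOW_UPS = 20; // the limit okhttp's RetryAndFollowUpInterceptor uses

    /** Retries recoverable failures and follows redirects until a final response arrives. */
    static String execute(String url, FakeTransport transport, int maxRetries) throws IOException {
        int followUpCount = 0;
        int retryCount = 0;
        while (true) {
            String response;
            try {
                response = transport.send(url);
            } catch (IOException e) {
                // Stand-in for recover(): give up once the retry budget is spent.
                if (++retryCount > maxRetries) throw e;
                continue; // retry the same request
            }
            // Stand-in for followUpRequest(): "REDIRECT:" plays the role of a 3xx response.
            String followUp = response.startsWith("REDIRECT:")
                    ? response.substring("REDIRECT:".length()) : null;
            if (followUp == null) return response; // nothing to follow: done
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            url = followUp; // loop again with the follow-up request
        }
    }

    /** Scripted run: the first attempt fails, the second redirects, the third succeeds. */
    static String demo() {
        int[] calls = {0};
        try {
            String body = execute("/a", url -> {
                calls[0]++;
                if (calls[0] == 1) throw new IOException("connection reset");
                return url.equals("/a") ? "REDIRECT:/b" : "ok:" + url;
            }, 1);
            return body + " after " + calls[0] + " calls";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A transport that redirects forever must trip the follow-up limit. */
    static boolean redirectLoopIsRejected() {
        try {
            execute("/loop", url -> "REDIRECT:/loop", 0);
            return false;
        } catch (ProtocolException e) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

In the scripted run, FollowUpDemo.demo() returns "ok:/b after 3 calls": one failed attempt, one redirect and one successful request.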
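1. Transmitter

A Transmitter appears throughout this method. It is the bridge between okhttp's application layer and network layer, and it manages the relationships between all the connections, requests, responses and IO streams of a single Call. It is created right after the RealCall is created: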
```java
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  // Create the Transmitter and attach it to the call.
  call.transmitter = new Transmitter(client, call);
  return call;
}
```
```java
public final class Transmitter {
  private final OkHttpClient client;
  private final RealConnectionPool connectionPool;  // the connection pool
  public RealConnection connection;                 // the connection used by this call
  private ExchangeFinder exchangeFinder;            // finds a connection for an exchange
  private @Nullable Exchange exchange;              // carries one request/response pair
  private final Call call;
  // ... (other fields such as request, eventListener and timeout are omitted)

  public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
  }

  public void prepareToConnect(Request request) {
    if (this.request != null) {
      if (sameConnection(this.request.url(), request.url()) && exchangeFinder.hasRouteToTry()) {
        return;  // Same connection and routes left to try: keep the current ExchangeFinder.
      }
      // ...
    }

    this.request = request;
    // Create an ExchangeFinder for the new request.
    this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
        call, eventListener);
  }
}
```
BridgeInterceptor

BridgeInterceptor's intercept(Chain) method is as follows:
```java
@Override
public Response intercept(Chain chain) throws IOException {
  // ---- Request side: turn the user's request into a network request ----
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();

  RequestBody body = userRequest.body();
  if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }

    long contentLength = body.contentLength();
    if (contentLength != -1) {
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
  }

  // Add gzip support only if the user did not set Accept-Encoding or Range.
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }

  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }

  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }

  // Pass the network request on to the next interceptor.
  Response networkResponse = chain.proceed(requestBuilder.build());

  // ---- Response side: turn the network response into a user response ----
  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);

  // Decompress the body if we added gzip ourselves and the server used it.
  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
  }

  return responseBuilder.build();
}
```
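BridgeInterceptor has the simplest logic of all the default interceptors. It mainly processes the headers of the Request and the Response: it converts the Request built by the user into the Request that is sent to the server, and converts the Response returned by the server into a user-friendly Response. For the Request, for example, when the developer has not set Accept-Encoding, it automatically adds Accept-Encoding: gzip to indicate that the client supports gzip. For the Response, when Content-Encoding is gzip and the gzip support was added automatically by the client, it removes Content-Encoding and Content-Length and then decompresses the response body.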
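The two halves of this behaviour can be tried out with the JDK alone. The sketch below is not okhttp code: BridgeDemo and its methods are hypothetical, headers are a plain case-insensitive map, the request is assumed to carry a body, and java.util.zip replaces okio's GzipSource.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class BridgeDemo {
    /**
     * Fills in headers the user left out (a request body is assumed to exist).
     * Returns true when gzip support was added on the user's behalf.
     */
    static boolean addDefaultHeaders(Map<String, String> headers, String host, long contentLength) {
        if (contentLength != -1) {
            headers.put("Content-Length", Long.toString(contentLength));
            headers.remove("Transfer-Encoding");
        } else {
            headers.put("Transfer-Encoding", "chunked");
            headers.remove("Content-Length");
        }
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");
        boolean transparentGzip =
                !headers.containsKey("Accept-Encoding") && !headers.containsKey("Range");
        if (transparentGzip) headers.put("Accept-Encoding", "gzip");
        return transparentGzip;
    }

    /** Decompresses the body only if gzip was added by us and the server actually used it. */
    static byte[] decodeBody(boolean transparentGzip, Map<String, String> responseHeaders, byte[] body) {
        if (!transparentGzip || !"gzip".equalsIgnoreCase(responseHeaders.get("Content-Encoding"))) {
            return body;
        }
        // The headers describe the compressed bytes, so they no longer apply.
        responseHeaders.remove("Content-Encoding");
        responseHeaders.remove("Content-Length");
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            for (int n; (n = in.read(buffer)) != -1; ) out.write(buffer, 0, n);
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Test helper: plays the role of a server that compresses its response. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

Callers are expected to pass a map such as new TreeMap<>(String.CASE_INSENSITIVE_ORDER), since HTTP header names are case-insensitive. If the user sets Accept-Encoding themselves, addDefaultHeaders returns false and decodeBody leaves the body untouched, which mirrors why okhttp only decompresses when it added gzip itself.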
CacheInterceptor

CacheInterceptor's intercept(Chain) method is as follows:
```java
@Override
public Response intercept(Chain chain) throws IOException {
  // Look up a cached response for this request, if a cache is configured.
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();

  // Create the cache strategy from the request and the cached response.
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  // ...

  // A cached response exists but the strategy says it cannot be used: close it.
  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body());
  }

  // networkRequest == null and cacheResponse == null: fail with 504.
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // networkRequest == null and cacheResponse != null: serve from the cache.
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

  // networkRequest != null: go to the network via the next interceptor.
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // ...
  }

  // networkRequest != null and cacheResponse != null: this was a conditional request.
  if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      // 304: the cached response is still valid; merge headers and update the cache.
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      // The cached response is out of date: discard it.
      closeQuietly(cacheResponse.body());
    }
  }

  // Use the network response.
  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) {
    // Store the response if it has a body and is cacheable.
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    // Methods that invalidate the cache remove the stored entry.
    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}
```
1. Cache: the cache implementation

Cache is okhttp's cache implementation. Internally it uses a DiskLruCache:
```java
public final class Cache implements Closeable, Flushable {
  // ...

  final DiskLruCache cache;

  // Internal view of the cache used by okhttp itself; every method delegates to Cache.
  final InternalCache internalCache = new InternalCache() {
    @Override public @Nullable Response get(Request request) throws IOException {
      return Cache.this.get(request);
    }

    @Override public @Nullable CacheRequest put(Response response) throws IOException {
      return Cache.this.put(response);
    }

    @Override public void remove(Request request) throws IOException {
      Cache.this.remove(request);
    }

    @Override public void update(Response cached, Response network) {
      Cache.this.update(cached, network);
    }

    @Override public void trackConditionalCacheHit() {
      Cache.this.trackConditionalCacheHit();
    }

    @Override public void trackResponse(CacheStrategy cacheStrategy) {
      Cache.this.trackResponse(cacheStrategy);
    }
  };

  // Public constructor: cache directory and maximum size in bytes.
  public Cache(File directory, long maxSize) {
    this(directory, maxSize, FileSystem.SYSTEM);
  }

  Cache(File directory, long maxSize, FileSystem fileSystem) {
    this.cache = DiskLruCache.create(fileSystem, directory, VERSION, ENTRY_COUNT, maxSize);
  }

  // Method bodies are omitted in this excerpt.
  @Nullable Response get(Request request) { /* ... */ }

  @Nullable CacheRequest put(Response response) { /* ... */ }

  void remove(Request request) throws IOException { /* ... */ }

  void update(Response cached, Response network) { /* ... */ }

  synchronized void trackConditionalCacheHit() { /* ... */ }

  synchronized void trackResponse(CacheStrategy cacheStrategy) { /* ... */ }

  @Override public void flush() throws IOException { /* ... */ }

  @Override public void close() throws IOException { /* ... */ }
}
```
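Cache holds an internal implementation of the InternalCache interface in its internalCache field. As the name suggests, it is used by okhttp internally. The interface's methods have the same names as the methods of Cache, and every method of the implementation simply calls the corresponding method of Cache, so the two behave identically. The difference is that Cache exposes additional methods to callers, such as flush() and close(), which give more control over the cache, whereas InternalCache only offers the basic cache operations such as get, put, remove and update. All of these are built on the DiskLruCache inside Cache; for details, see how DiskLruCache is implemented.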
```java
// Cache directory
File cacheDir = new File(Constant.PATH_NET_CACHE);
// Cache with a maximum size of 10 MB
Cache cache = new Cache(cacheDir, 1024 * 1024 * 10);

OkHttpClient client = new OkHttpClient.Builder()
    .cache(cache)
    .build();
```
```java
public final class CacheInterceptor implements Interceptor {
  final @Nullable InternalCache cache;

  public CacheInterceptor(@Nullable InternalCache cache) {
    this.cache = cache;
  }

  // ...
}
```
As you can see, the cache field is an InternalCache instance. It is assigned from the client when the interceptors are added in getResponseWithInterceptorChain():
```java
Response getResponseWithInterceptorChain() throws IOException {
  // ...
  interceptors.add(new CacheInterceptor(client.internalCache()));
  // ...
}
```
Note the call new CacheInterceptor(client.internalCache()). The client's internalCache method is as follows:
```java
@Nullable InternalCache internalCache() {
  // Prefer the InternalCache of the configured Cache, if there is one.
  return cache != null ? cache.internalCache : internalCache;
}
```
2. CacheStrategy: the cache strategy

CacheStrategy implements okhttp's cache strategy, which follows the HTTP caching rules. Some background on HTTP caching is therefore needed first; see HTTP 协议缓存机制详解. With that in mind, here is CacheStrategy:
```java
public final class CacheStrategy {
  // The request to send over the network, or null if the network is not used.
  public final @Nullable Request networkRequest;
  // The cached response to return or validate, or null if the cache is not used.
  public final @Nullable Response cacheResponse;

  CacheStrategy(Request networkRequest, Response cacheResponse) {
    this.networkRequest = networkRequest;
    this.cacheResponse = cacheResponse;
  }

  public static class Factory {
    final long nowMillis;
    final Request request;
    final Response cacheResponse;
    // ...

    public Factory(long nowMillis, Request request, Response cacheResponse) {
      this.nowMillis = nowMillis;
      this.request = request;
      this.cacheResponse = cacheResponse;
      // ...
    }

    public CacheStrategy get() {
      CacheStrategy candidate = getCandidate();
      // ...
      return candidate;
    }

    // ...
  }
}
```
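CacheStrategy is created through the factory pattern. It has two main fields, networkRequest and cacheResponse, and CacheInterceptor's intercept method decides which strategy to apply from the combination of the two: whether networkRequest is null decides whether the network is used, and whether cacheResponse is null decides whether the cache is used. The four combinations map to the following strategies, as the intercept code above shows. If both are null, the request fails with a 504 response. If networkRequest is null and cacheResponse is not, the cached response is returned directly. If networkRequest is not null and cacheResponse is null, a network request is made. If neither is null, a conditional request is made: on 304 the cached response is used and updated, otherwise the network response is used.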
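The four combinations can be written down as a small decision function. This is only a sketch of the branching in CacheInterceptor.intercept; CacheDecisionDemo and the Action names are hypothetical, and the two booleans stand for "networkRequest != null" and "cacheResponse != null".

```java
final class CacheDecisionDemo {
    /** Hypothetical labels for the four outcomes. */
    enum Action { FAIL_504, USE_CACHE, USE_NETWORK, CONDITIONAL_REQUEST }

    /** Mirrors the order of the checks in CacheInterceptor.intercept. */
    static Action decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
        if (!hasNetworkRequest && !hasCacheResponse) return Action.FAIL_504;  // only-if-cached miss
        if (!hasNetworkRequest) return Action.USE_CACHE;                      // cache is fresh enough
        if (!hasCacheResponse) return Action.USE_NETWORK;                     // nothing usable cached
        return Action.CONDITIONAL_REQUEST;                                    // ask the server: 304 or new data
    }

    public static void main(String[] args) {
        for (boolean network : new boolean[] {false, true}) {
            for (boolean cached : new boolean[] {false, true}) {
                System.out.println(network + ", " + cached + " -> " + decide(network, cached));
            }
        }
    }
}
```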
networkRequest and cacheResponse are assigned through the constructor when a CacheStrategy is created. So where is the CacheStrategy created? Calling new CacheStrategy.Factory(long, Request, Response).get() returns a CacheStrategy instance, so it is created in Factory's get method:
```java
public CacheStrategy get() {
  CacheStrategy candidate = getCandidate();
  // ...
  return candidate;
}
```
```java
private CacheStrategy getCandidate() {
  // No cached response: go to the network.
  if (cacheResponse == null) {
    return new CacheStrategy(request, null);
  }

  // HTTPS request but the cached response lacks a handshake: go to the network.
  if (request.isHttps() && cacheResponse.handshake() == null) {
    return new CacheStrategy(request, null);
  }

  // The response should not have been cached: go to the network.
  if (!isCacheable(cacheResponse, request)) {
    return new CacheStrategy(request, null);
  }

  // The request forbids caching or already carries conditions: go to the network.
  CacheControl requestCaching = request.cacheControl();
  if (requestCaching.noCache() || hasConditions(request)) {
    return new CacheStrategy(request, null);
  }

  // Forced caching: decide from age and freshness whether the cache can be used as-is.
  CacheControl responseCaching = cacheResponse.cacheControl();

  long ageMillis = cacheResponseAge();
  long freshMillis = computeFreshnessLifetime();

  if (requestCaching.maxAgeSeconds() != -1) {
    freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
  }

  long minFreshMillis = 0;
  if (requestCaching.minFreshSeconds() != -1) {
    minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
  }

  long maxStaleMillis = 0;
  if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
    maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
  }

  if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
    Response.Builder builder = cacheResponse.newBuilder();
    if (ageMillis + minFreshMillis >= freshMillis) {
      builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
    }
    long oneDayMillis = 24 * 60 * 60 * 1000L;
    if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
      builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
    }
    // The cache is still usable: no network request.
    return new CacheStrategy(null, builder.build());
  }

  // Conditional caching: the cache has expired, so build a conditional request.
  String conditionName;
  String conditionValue;
  if (etag != null) {
    conditionName = "If-None-Match";
    conditionValue = etag;
  } else if (lastModified != null) {
    conditionName = "If-Modified-Since";
    conditionValue = lastModifiedString;
  } else if (servedDate != null) {
    conditionName = "If-Modified-Since";
    conditionValue = servedDateString;
  } else {
    // No validator available: make a regular network request.
    return new CacheStrategy(request, null);
  }

  Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
  Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);

  Request conditionalRequest = request.newBuilder()
      .headers(conditionalRequestHeaders.build())
      .build();
  // Both are non-null: send the conditional request and keep the cached response for a 304.
  return new CacheStrategy(conditionalRequest, cacheResponse);
}
```
getCandidate() decides the combination of networkRequest and cacheResponse according to the HTTP caching rules. From the method we can see that HTTP caching comes in two kinds:
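1. Forced caching: the client decides whether the cache can still be used. On the first request, the server returns the cache's expiry information in Expires or Cache-Control. On later requests the client checks that expiry: if the cache has not expired it is used directly; otherwise it is not used and the server is asked again.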
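2. Conditional (comparison) caching: the server decides whether the cache can still be used. On the first request, the server returns a cache validator together with the data: Last-Modified (sent back as If-Modified-Since) or Etag (sent back as If-None-Match). On later requests the client sends the validator to the server, which uses it to check the cache. If the resource has not changed, the server returns 304 and the client keeps using its cache; otherwise the client must use the new response returned by the server.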
Forced caching also takes priority over conditional caching. A diagram in HTTP 协议缓存机制详解 illustrates this flow, which corresponds to steps 1 to 5 of getCandidate().
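The priority of forced caching over conditional caching can be sketched as a single function built around the same inequality getCandidate() uses. FreshnessDemo and its string results are hypothetical, and the sketch deliberately ignores no-cache, must-revalidate and the heuristic-expiration warning.

```java
final class FreshnessDemo {
    /**
     * Forced caching is checked first; only when the cached copy is too old does the
     * validator (ETag or Last-Modified) come into play. All durations are in milliseconds.
     */
    static String decide(long ageMillis, long freshMillis, long minFreshMillis,
                         long maxStaleMillis, String etag, String lastModified) {
        // Forced caching: the client alone decides from age and freshness.
        if (ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
            boolean stale = ageMillis + minFreshMillis >= freshMillis;
            return stale ? "cache (Warning: 110 stale)" : "cache";
        }
        // Conditional caching: let the server decide, using the best validator available.
        if (etag != null) return "conditional: If-None-Match: " + etag;
        if (lastModified != null) return "conditional: If-Modified-Since: " + lastModified;
        // No validator at all: a plain network request is the only option.
        return "network";
    }

    public static void main(String[] args) {
        System.out.println(decide(10_000, 60_000, 0, 0, "v1", null)); // still fresh
        System.out.println(decide(70_000, 60_000, 0, 0, "v1", null)); // expired, has ETag
    }
}
```

With a 60 second freshness lifetime, a 10 second old response is served from the cache even though an ETag is available, while a 70 second old one triggers a conditional request; a max-stale of 30 seconds would allow the 70 second old copy to be served with a stale warning.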
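3. The caching mechanism

Back in CacheInterceptor's intercept method, its four branches are exactly the four combinations of CacheStrategy's networkRequest and cacheResponse. Each combination corresponds to one cache strategy, the strategies follow the HTTP caching rules hard-coded in getCandidate(), and the local storage is provided by Cache. We can therefore conclude: okhttp's caching mechanism = the Cache implementation + an HTTP-based cache strategy.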
ConnectInterceptor

ConnectInterceptor's intercept(Chain) method is as follows:
```java
@Override
public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  Transmitter transmitter = realChain.transmitter();

  // Non-GET requests get more extensive connection health checks.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");

  // Ask the Transmitter for a new Exchange (this finds or creates a connection).
  Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

  // Pass the request on, now with an Exchange attached.
  return realChain.proceed(request, transmitter, exchange);
}
```
```java
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  // ...

  // Use the ExchangeFinder to find a connection and get a codec for it.
  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);

  // Wrap the codec in an Exchange.
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

  // ...
  return result;
}
```
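1. RealConnection: the connection implementation

This is the actual implementation of a connection. It implements the Connection interface and uses a Socket internally to establish the connection: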
```java
public interface Connection {
  // The route used by this connection.
  Route route();

  // The socket used by this connection.
  Socket socket();

  // The TLS handshake, or null if this is not an HTTPS connection.
  @Nullable Handshake handshake();

  // The negotiated protocol (HTTP/1.1, HTTP/2, ...).
  Protocol protocol();
}

public final class RealConnection extends Http2Connection.Listener implements Connection {
  public final RealConnectionPool connectionPool;
  private final Route route;
  private Socket rawSocket;   // the low-level TCP socket
  private Socket socket;      // the application-level socket (SSLSocket or rawSocket)
  private Handshake handshake;
  private Protocol protocol;
  private Http2Connection http2Connection;
  private BufferedSource source;  // input stream of the socket
  private BufferedSink sink;      // output stream of the socket

  public RealConnection(RealConnectionPool connectionPool, Route route) {
    this.connectionPool = connectionPool;
    this.route = route;
  }

  public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
      EventListener eventListener) {
    // ...
  }

  // ...
}
```
```java
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
    EventListener eventListener) {
  if (protocol != null) throw new IllegalStateException("already connected");

  RouteException routeException = null;
  List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
  ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

  // Validate the route before connecting.
  if (route.address().sslSocketFactory() == null) {
    if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication not enabled for client"));
    }
    String host = route.address().url().host();
    if (!Platform.get().isCleartextTrafficPermitted(host)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication to " + host + " not permitted by network security policy"));
    }
  } else {
    if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
      throw new RouteException(new UnknownServiceException(
          "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
    }
  }

  while (true) {
    try {
      if (route.requiresTunnel()) {
        // HTTPS through an HTTP proxy: establish a tunnel first.
        connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
        if (rawSocket == null) {
          break;
        }
      } else {
        // Otherwise open a plain socket connection.
        connectSocket(connectTimeout, readTimeout, call, eventListener);
      }
      // Establish the protocol (TLS handshake, HTTP/2 setup, ...).
      establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
      break;
    }
    // ... catch block omitted in this excerpt
  }

  // ...

  if (http2Connection != null) {
    synchronized (connectionPool) {
      allocationLimit = http2Connection.maxConcurrentStreams();
    }
  }
}
```
```java
private void connectSocket(int connectTimeout, int readTimeout, Call call,
    EventListener eventListener) throws IOException {
  Proxy proxy = route.proxy();
  Address address = route.address();

  // Create the socket according to the proxy type.
  rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
      ? address.socketFactory().createSocket()
      : new Socket(proxy);

  eventListener.connectStart(call, route.socketAddress(), proxy);
  rawSocket.setSoTimeout(readTimeout);
  try {
    // 1. Establish the socket connection.
    Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
  }
  // ... catch block omitted in this excerpt

  try {
    // Obtain the socket's input and output streams.
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  }
  // ... catch block omitted in this excerpt
}
```
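The call to focus on is Platform.get().connectSocket(...). Platform is a compatibility class that okhttp implements to cover the differences between platforms and Android versions, and we will not go into it here. Its connectSocket method ends up calling connect() on rawSocket to establish the socket connection. Once the connection is established, the input and output streams source and sink can be obtained from the socket, and okhttp can read from source and write to sink. source and sink are of type BufferedSource and BufferedSink, which come from the okio library. okio wraps java.io and java.nio, and okhttp relies on it for all reading and writing at the bottom layer. For what makes Okio good, see the article Okio好在哪.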
2. RealConnectionPool: the connection pool

The connection pool manages RealConnection objects:
```java
public final class RealConnectionPool {
  // Thread pool used to run the cleanup task.
  private static final Executor executor = new ThreadPoolExecutor(
      0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */,
      60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<>(),
      Util.threadFactory("OkHttp ConnectionPool", true));

  boolean cleanupRunning;

  // The cleanup task.
  private final Runnable cleanupRunnable = () -> {
    while (true) {
      // Clean up and get the delay until the next cleanup.
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            // Wait until it is time for the next cleanup.
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };

  // Deque that holds the pooled connections.
  private final Deque<RealConnection> connections = new ArrayDeque<>();

  void put(RealConnection connection) {
    if (!cleanupRunning) {
      cleanupRunning = true;
      // Start the cleanup task on the first put.
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

  long cleanup(long now) {
    // ...
  }

  // ...
}
```
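RealConnectionPool maintains a thread pool internally to run the connection cleanup task cleanupRunnable, and a deque, connections, to cache the connections that have been created. Creating a connection requires a TCP handshake, plus a TLS handshake for HTTPS, and both are time-consuming, so connections are cached in connections for reuse. On the other hand, a connection that has finished its work but is never released wastes resources, so cleanupRunnable periodically removes connections that are no longer needed. By default okhttp keeps at most 5 idle connections, each with a keepAlive of 5 minutes; keepAlive is how long a connection is kept alive after it becomes idle.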
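The first time put is called on RealConnectionPool to cache a new connection, if cleanupRunnable is not yet running, it is first submitted to the thread pool, and then the new connection is added to the deque. cleanupRunnable calls cleanup to clean up connections. That method returns the interval until the next cleanup, and the task then calls wait to sleep for that long before calling cleanup again, and so on in a loop. Here is the cleanup logic: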
```java
long cleanup(long now) {
  int inUseConnectionCount = 0;    // connections currently in use
  int idleConnectionCount = 0;     // idle connections
  RealConnection longestIdleConnection = null;
  long longestIdleDurationNs = Long.MIN_VALUE;

  synchronized (this) {
    // 1. Walk all connections and find the one that has been idle the longest.
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
      RealConnection connection = i.next();

      // Still in use: count it and move on.
      if (pruneAndGetAllocationCount(connection, now) > 0) {
        inUseConnectionCount++;
        continue;
      }

      idleConnectionCount++;

      long idleDurationNs = now - connection.idleAtNanos;
      if (idleDurationNs > longestIdleDurationNs) {
        longestIdleDurationNs = idleDurationNs;
        longestIdleConnection = connection;
      }
    }

    // 2. Decide what to do with it.
    if (longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections) {
      // 2.1 Idle for too long, or too many idle connections: evict it.
      connections.remove(longestIdleConnection);
    } else if (idleConnectionCount > 0) {
      // 2.2 Not due yet: come back when it is.
      return keepAliveDurationNs - longestIdleDurationNs;
    } else if (inUseConnectionCount > 0) {
      // 2.3 Everything is in use: check again after the keep-alive duration.
      return keepAliveDurationNs;
    } else {
      // 2.4 No connections at all: stop the cleanup task.
      cleanupRunning = false;
      return -1;
    }
  }

  // Close the evicted connection's socket and clean up again immediately.
  closeQuietly(longestIdleConnection.socket());
  return 0;
}
```
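2.1 If longestIdleConnection has been idle for at least the maximum idle duration, or the number of idle connections exceeds the maximum number of idle connections, the connection is removed from the deque, its socket is closed, and 0 is returned so that cleanup runs again immediately.
2.2 If there are idle connections but their number does not exceed the maximum (5 by default), and longestIdleConnection has not yet reached the maximum idle duration, the time remaining until it expires is returned and cleanup runs again then.
2.3 If there are no idle connections and all connections are still in use, the default keepAlive duration is returned and cleanup runs again in 5 minutes.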
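The return values of cleanup can be checked with a small stand-alone model. PoolCleanupDemo below is a hypothetical simplification, not okhttp's class: a connection is reduced to an in-use flag plus the time it became idle, time is a plain long, and no sockets or threads are involved.

```java
import java.util.ArrayList;
import java.util.List;

final class PoolCleanupDemo {
    /** Hypothetical connection: only what cleanup needs to know. */
    static final class Conn {
        final boolean inUse;
        final long idleAtNanos;

        Conn(boolean inUse, long idleAtNanos) {
            this.inUse = inUse;
            this.idleAtNanos = idleAtNanos;
        }
    }

    final List<Conn> connections = new ArrayList<>();
    final long keepAliveNanos;
    final int maxIdle;

    PoolCleanupDemo(long keepAliveNanos, int maxIdle) {
        this.keepAliveNanos = keepAliveNanos;
        this.maxIdle = maxIdle;
    }

    /** Returns 0 after an eviction, a positive delay until the next check, or -1 to stop. */
    long cleanup(long now) {
        int inUse = 0;
        int idle = 0;
        Conn longestIdle = null;
        long longestIdleNanos = Long.MIN_VALUE;

        // Find the connection that has been idle the longest.
        for (Conn c : connections) {
            if (c.inUse) {
                inUse++;
                continue;
            }
            idle++;
            long idleFor = now - c.idleAtNanos;
            if (idleFor > longestIdleNanos) {
                longestIdleNanos = idleFor;
                longestIdle = c;
            }
        }

        if (longestIdleNanos >= keepAliveNanos || idle > maxIdle) {
            connections.remove(longestIdle);          // 2.1: evict, then clean up again at once
            return 0;
        } else if (idle > 0) {
            return keepAliveNanos - longestIdleNanos; // 2.2: wait until it is due
        } else if (inUse > 0) {
            return keepAliveNanos;                    // 2.3: everything is busy
        } else {
            return -1;                                // no connections: stop cleaning
        }
    }
}
```

With a keep-alive of 100 and one idle plus one busy connection, cleanup(40) returns 60, cleanup(100) evicts the idle connection and returns 0, the next call returns 100 because only the busy connection is left, and an empty pool returns -1.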
3. Creating connections (the connection mechanism)

ExchangeFinder's find method is as follows:
```java
public ExchangeCodec find(
    OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  // ... (timeouts and retry settings are read from chain and client)
  try {
    // Find a healthy connection.
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
    // Create a codec for that connection.
    return resultConnection.newCodec(client, chain);
  }
  // ... catch blocks omitted in this excerpt
}

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException {
  while (true) {
    // Find a connection.
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);

    // ...

    // Unhealthy: mark it so it is not reused, and look for another one.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      candidate.noNewExchanges();
      continue;
    }

    return candidate;
  }
}
```
```java
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  RealConnection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");
    hasStreamFailure = false;

    // Release the current connection if it can no longer carry new exchanges.
    releasedConnection = transmitter.connection;
    toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;

    // 1. Reuse the connection already allocated to this call.
    if (transmitter.connection != null) {
      result = transmitter.connection;
      releasedConnection = null;
    }

    if (result == null) {
      // 2. Try to get a connection from the pool (first attempt, without routes).
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      } else if (nextRouteToTry != null) {
        selectedRoute = nextRouteToTry;
        nextRouteToTry = null;
      } else if (retryCurrentRoute()) {
        selectedRoute = transmitter.connection.route();
      }
    }
  }
  closeQuietly(toClose);

  // ...

  if (result != null) {
    // Found an allocated or pooled connection: done.
    return result;
  }

  // Select a set of routes if none has been selected yet.
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  }

  List<Route> routes = null;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");

    if (newRouteSelection) {
      // 3. Try the pool again, this time with the list of routes.
      routes = routeSelection.getAll();
      if (connectionPool.transmitterAcquirePooledConnection(
          address, transmitter, routes, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      }
    }

    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection.next();
      }

      // 4. Nothing in the pool: create a new connection.
      result = new RealConnection(connectionPool, selectedRoute);
      connectingConnection = result;
    }
  }

  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
    return result;
  }

  // 5. Connect: TCP handshake plus TLS handshake if needed (blocking).
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
  connectionPool.routeDatabase.connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    connectingConnection = null;
    // 6. Last pool attempt, for multiplexed connections only.
    if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
      // Another connection to the same address won the race: discard ours.
      result.noNewExchanges = true;
      socket = result.socket();
      result = transmitter.connection;
    } else {
      // 7. Put the new connection into the pool and allocate it to this call.
      connectionPool.put(result);
      transmitter.acquireConnectionNoEvents(result);
    }
  }
  closeQuietly(socket);

  eventListener.connectionAcquired(call, result);
  return result;
}
```
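Stripped of routes, locking and HTTP/2 multiplexing, the lookup order in findConnection comes down to three steps: reuse the connection the call already holds, otherwise take one from the pool, otherwise create one and pool it. The sketch below shows just that order; FindConnectionDemo and its fields are hypothetical, and the healthy flag only loosely stands in for the isHealthy and noNewExchanges checks.

```java
import java.util.ArrayList;
import java.util.List;

final class FindConnectionDemo {
    /** Hypothetical connection identified only by the address it points at. */
    static final class Conn {
        final String address;
        boolean healthy = true;

        Conn(String address) {
            this.address = address;
        }
    }

    final List<Conn> pool = new ArrayList<>();
    Conn allocated;   // the connection already held by the current call, if any
    int created = 0;  // how many connections had to be created

    Conn find(String address) {
        // 1. Reuse the connection the call already holds.
        if (allocated != null && allocated.healthy && allocated.address.equals(address)) {
            return allocated;
        }
        // 2. Otherwise look for a pooled connection to the same address.
        for (Conn c : pool) {
            if (c.healthy && c.address.equals(address)) {
                allocated = c;
                return c;
            }
        }
        // 3. Otherwise create a new connection and pool it for later calls.
        Conn fresh = new Conn(address);
        created++;
        pool.add(fresh);
        allocated = fresh;
        return fresh;
    }
}
```

Only the third step pays the cost of the TCP and TLS handshakes in the real implementation, which is why the allocated connection and the pool are consulted first.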
CallServerInterceptor

CallServerInterceptor's intercept(Chain) method is as follows:
```java
@Override
public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Exchange exchange = realChain.exchange();
  Request request = realChain.request();

  // ... (sentRequestMillis is recorded here)

  // 1. Write the request headers.
  exchange.writeRequestHeaders(request);

  boolean responseHeadersStarted = false;
  Response.Builder responseBuilder = null;
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // ... ("Expect: 100-continue" handling omitted)

    // 2. Write the request body.
    if (responseBuilder == null) {
      if (request.body().isDuplex()) {
        exchange.flushRequest();
        BufferedSink bufferedRequestBody = Okio.buffer(
            exchange.createRequestBody(request, true));
        request.body().writeTo(bufferedRequestBody);
      } else {
        BufferedSink bufferedRequestBody = Okio.buffer(
            exchange.createRequestBody(request, false));
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      }
    } else {
      // ...
    }
  } else {
    // No request body.
    exchange.noRequestBody();
  }

  // ...

  // 3. Read the response headers.
  if (responseBuilder == null) {
    responseBuilder = exchange.readResponseHeaders(false);
  }

  Response response = responseBuilder
      .request(request)
      .handshake(exchange.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  // ... (`code` is read from the response here)

  // 4. Attach the response body.
  if (forWebSocket && code == 101) {
    // WebSocket upgrade: return an empty body.
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    response = response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build();
  }

  // ...

  return response;
}
```
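To make the order "write headers, write the body if the method permits one, read the response headers" more concrete, here is a rough sketch of what those steps amount to on the wire for HTTP/1.1. Http1WireDemo is hypothetical and works on strings only; in okhttp the actual encoding is done by the ExchangeCodec behind Exchange, and HTTP/2 uses binary frames instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class Http1WireDemo {
    /** GET and HEAD requests carry no body; other methods may. */
    static boolean permitsRequestBody(String method) {
        return !(method.equals("GET") || method.equals("HEAD"));
    }

    /** Steps 1 and 2: request line and headers first, then the body if one is permitted. */
    static String writeRequest(String method, String path, Map<String, String> headers, String body) {
        StringBuilder wire = new StringBuilder();
        wire.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            wire.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n");
        }
        wire.append("\r\n"); // blank line: end of headers
        if (permitsRequestBody(method) && body != null) {
            wire.append(body);
        }
        return wire.toString();
    }

    /** Step 3 starts with the status line, for example "HTTP/1.1 200 OK". */
    static int parseStatusCode(String statusLine) {
        String[] parts = statusLine.split(" ", 3);
        if (parts.length < 2 || !parts[0].startsWith("HTTP/")) {
            throw new IllegalArgumentException("Unexpected status line: " + statusLine);
        }
        return Integer.parseInt(parts[1]);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Host", "example.com");
        headers.put("Content-Length", "2");
        System.out.print(writeRequest("POST", "/submit", headers, "hi"));
    }
}
```

A status code of 101 parsed this way is the case the interceptor treats specially for WebSockets, where the response is returned with an empty body.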
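Conclusion

Together with the previous article, we now have an in-depth understanding of okhttp. A request starts by creating a Call instance and calling its execute() or enqueue() method, both of which end up in getResponseWithInterceptorChain(). That method builds a chain of responsibility out of interceptors, and the request passes in turn through the user's application interceptors, the retry interceptor, the bridge interceptor, the cache interceptor, the connect interceptor, the user's network interceptors and the call-server interceptor to obtain a response, which is handed back to the user. The request flow, the caching mechanism and the connection mechanism are the key parts of okhttp, and reading the source taught us a lot along the way. Next time we will analyze its companion, Retrofit.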