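I read through the timeout-handling logic in the Dubbo source code a long time ago but never wrote it down. Recently I saw someone ask about it on a career-networking app, so I decided to go over it again.
Getting Started
Starting from a Dubbo request, let's see how Dubbo (2.6.6) handles timeouts: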
```java
// com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
```
DefaultFuture
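The ResponseFuture return type tells us this is an asynchronous method (which is not the same thing as Dubbo's asynchronous invocation). So the key to call timeouts should lie in ResponseFuture: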
```java
public interface ResponseFuture {
    Object get() throws RemotingException;

    Object get(int timeoutInMillis) throws RemotingException;

    void setCallback(ResponseCallback callback);

    boolean isDone();
}
```
This is an interface, and the request method shows that the implementation is DefaultFuture. Let's start from its constructor:
```java
public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}
```
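So every DefaultFuture has an id equal to the request ID. The timeout passed in is used when it is positive; otherwise it is read from the URL configuration, falling back to the default of 1000 ms when nothing is configured.
Judging from the comments in the source, the FUTURES map is the key piece: it is the map of futures still waiting for a response.
DefaultFuture has one more method worth looking at: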
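To make the idea of a waiting map concrete, here is a minimal sketch. It assumes a plain `ConcurrentHashMap` keyed by request ID and uses `CompletableFuture` as a stand-in for the pending call. `PendingRequests`, `effectiveTimeout`, `register`, `futureOf` and `complete` are hypothetical names for illustration, not Dubbo's API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; this is not Dubbo's DefaultFuture.
class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    // The "waiting map": request ID -> future still waiting for its response.
    private final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Same fallback shape as the constructor: a positive timeout wins,
    // otherwise the configured value is used.
    static int effectiveTimeout(int requested, int configured) {
        return requested > 0 ? requested : configured;
    }

    // Registers a new pending request and returns its ID.
    long register() {
        long id = nextId.getAndIncrement();
        futures.put(id, new CompletableFuture<>());
        return id;
    }

    // Looks up the future that is still waiting under this ID (null if none).
    CompletableFuture<String> futureOf(long id) {
        return futures.get(id);
    }

    // Completes the future registered under this ID.
    // Returns false when nobody is waiting under that ID any more.
    boolean complete(long id, String payload) {
        CompletableFuture<String> future = futures.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }
}
```

Because the entry is removed when it is completed, a second response carrying the same ID finds nothing in the map, and `complete` returns false.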
```java
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
```
It is called from:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
```java
@Override
public void received(Channel channel, Object message) throws RemotingException {
    // ... (earlier branches omitted in this excerpt)
    } else if (message instanceof Response) {
        handleResponse(channel, (Response) message);
    }
    // ...
}
```
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
```java
static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
```
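Back in DefaultFuture.received, the response ID is used to remove the matching DefaultFuture from FUTURES, and doReceived is then called on it. In other words, the response ID is the same as the request ID. Next, let's see what doReceived does: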
```java
private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
```
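It first acquires the lock, stores the response, and then calls done.signal() to wake up the thread blocked on the Condition. So where does a thread block on the done condition?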
```java
@Override
public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}
```
It is the get method, and get is indeed called right after request:
```java
(Result) currentClient.request(inv, timeout).get()
```
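The rough logic of get is as follows. It acquires the lock, then loops while isDone() is false, blocking on the condition with a timed wait. Each time the wait returns (because of a signal or because it timed out), the loop exits if the response has arrived or if more than timeout milliseconds have passed since start. After the loop it checks isDone() once more and throws TimeoutException if there is still no response. The first argument, sent > 0 (sent is the time at which the request was sent), tells whether the timeout is on the server side (true) or the client side (false).
isDone is simply: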
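The wait-with-deadline pattern described above can be sketched in isolation. This version is an illustration under stated assumptions rather than Dubbo's code. `TimedReply` and its methods are hypothetical names, and it uses `Condition.awaitNanos`, which returns the remaining wait time, instead of the `await` plus elapsed-time check that DefaultFuture uses.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not Dubbo's DefaultFuture.
class TimedReply {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile String response; // null until a reply arrives

    boolean isDone() {
        return response != null;
    }

    // Called by the receiving thread: store the reply and wake any waiter.
    void receive(String reply) {
        lock.lock();
        try {
            response = reply;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a reply arrives or timeoutMs elapses.
    String get(long timeoutMs) throws InterruptedException, TimeoutException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // awaitNanos returns the time left, so a spurious wakeup
            // only waits for the remainder instead of the full timeout.
            while (!isDone() && remaining > 0) {
                remaining = done.awaitNanos(remaining);
            }
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            throw new TimeoutException("no reply within " + timeoutMs + " ms");
        }
        return response;
    }
}
```

A caller would invoke `get(timeoutMs)` on one thread while another thread calls `receive(...)`. If no reply arrives in time, `get` throws `java.util.concurrent.TimeoutException`.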
```java
@Override
public boolean isDone() {
    return response != null;
}
```
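Even when a Response does arrive, its status may still indicate a timeout (CLIENT_TIMEOUT or SERVER_TIMEOUT), which is why get finishes by calling this method: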
```java
private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}
```
TIMEOUT SIDE
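SERVER_TIMEOUT (server-side timeout): this is the ordinary case where the consumer calls an RPC interface and the server, for performance or other reasons, takes longer than the configured timeout to process it.
CLIENT_TIMEOUT: as we saw, sent is what decides whether a timeout counts as client-side. So when does sent change? Right where the request is sent, in HeaderExchangeHandler#sent: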
```java
@Override
public void sent(Channel channel, Object message) throws RemotingException {
    Throwable exception = null;
    try {
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            handler.sent(exchangeChannel, message);
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    } catch (Throwable t) {
        exception = t;
    }
    if (message instanceof Request) {
        Request request = (Request) message;
        DefaultFuture.sent(channel, request);
    }
    if (exception != null) {
        if (exception instanceof RuntimeException) {
            throw (RuntimeException) exception;
        } else if (exception instanceof RemotingException) {
            throw (RemotingException) exception;
        } else {
            throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(), exception.getMessage(), exception);
        }
    }
}
```
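In other words, once this sent callback has run for a Request, DefaultFuture.sent has recorded the send time and a later timeout no longer counts as a client-side timeout. Note that in the code above DefaultFuture.sent is called even when handler.sent throws, because the exception is only rethrown afterwards. A CLIENT_TIMEOUT therefore most likely means the request did not get out in time because of network or system problems on the client.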
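The client-side versus server-side decision comes down to whether a send timestamp has been recorded. A minimal sketch of that rule follows, using hypothetical names (`SendTracker`, `markSent`, `timeoutSide`) that are not part of Dubbo:

```java
// Hypothetical tracker for illustration; names are assumptions, not Dubbo's API.
class SendTracker {
    enum Side { CLIENT, SERVER }

    private volatile long sent; // stays 0 until the request has been written out

    // Would be invoked from the transport's "sent" callback.
    void markSent(long nowMillis) {
        sent = nowMillis;
    }

    // Mirrors the `sent > 0` check: once the request has been sent,
    // a timeout is attributed to the server side.
    Side timeoutSide() {
        return sent > 0 ? Side.SERVER : Side.CLIENT;
    }
}
```

Before `markSent` is called, `timeoutSide()` reports `CLIENT`. Afterwards it reports `SERVER`.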