Dubbo调用超时那些事儿

其实之前很早就看过dubbo源码中关于超时这部分的处理逻辑,但是没有记录下来,最近在某脉上看到有人问了这个问题,想着再回顾一下。

开始

从dubbo的请求开始,看看dubbo(2.6.6)在超时这块是怎么处理的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int) 
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

DefaultFuture

从返回值ResponseFuture类型可以看出,这是一个异步方法(不等同于Dubbo的异步调用)。那么调用超时的关键可以从ResponseFuture来看:

1
2
3
4
5
6
7
8
9
10
public interface ResponseFuture {

Object get() throws RemotingException;

Object get(int timeoutInMillis) throws RemotingException;

void setCallback(ResponseCallback callback);

boolean isDone();
}

可以看到这是一个接口,从request方法可以得知实现类是DefaultFuture,从构造函数入手:

1
2
3
4
5
6
7
8
9
 public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}

可以得知每一个DefaultFuture都有一个id,并且等于requestId,timeout是从url中获取的配置,没有时默认1000ms。

从代码的注释可以看到FUTURES这个map应该就是关键,是一个waiting map。

DefaultFuture中还有一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}

可以看到调用的地方为:

com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received

1
2
3
4
5
6
7
8
@Override
public void received(Channel channel, Object message) throws RemotingException {
//省略一些代码
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
//省略一些代码
}
}

com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse

1
2
3
4
5
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}

回到DefaultFuture.received,可以看到通过Response id从FUTURES中拿了一个DefaultFuture出来,然后调用了doReceived方法,也就是说Response id和Request id 相同。结下来看看doReceived做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}

首先是加锁,然后通过唤醒了阻塞在Condition上的线程。看看什么地方会阻塞在done这个条件上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}

是get方法,get方法确实在request请求后被调用:

1
(Result) currentClient.request(inv, timeout).get()

可以看到get方法的大致逻辑为,先获取锁,然后循环判断isDone,并阻塞等到条件,当条件超时,如果任务完成,或者超过timeout结束循环,接着判断isDone,如果超时抛出TimeoutException。并且通过sent(request请求时间)是否>0()来判断是clientSide还是serverSide超时。

isDone逻辑如下:

1
2
3
4
@Override
public boolean isDone() {
return response != null;
}

如果是正常Response,也有可能是超时的现象,可以看到get方法最后调用了一个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
return res.getResult();
}
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}

TIMEOUT SIDE

SERVER_TIMEOUT(服务端超时): 这个就是正常的我们消费端请求一个RPC接口,服务端由于性能等一些原因处理时间超过了timeout配置时间。

CLIENT_TIMEOUT:我们可以看到是通过sent这个来判断是否clientTimeout,那么这个sent什么时候改变呢?就在发送请求的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.sent(exchangeChannel, message);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
if (message instanceof Request) {
Request request = (Request) message;
//这里会设置sent
DefaultFuture.sent(channel, request);
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}

也就是说handler.sent一旦调用成功返回,那么就不算clientSide Timeout了。那么CLIENT_TIMEOUT大概率就是由于client端网络,系统等原因超时。

作者

太阳当空赵先生

发布于

2022-02-25

更新于

2022-03-03

许可协议

评论