diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index 1dab931a9c4..7f0e3a7227c 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -50,20 +50,19 @@ public class DefaultFuture extends CompletableFuture { private static final Map FUTURES = new ConcurrentHashMap<>(); - private static final Map PENDING_TASKS = new ConcurrentHashMap<>(); - public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-future-timeout", true), 30, TimeUnit.MILLISECONDS); // invoke id. - private final long id; + private final Long id; private final Channel channel; private final Request request; private final int timeout; private final long start = System.currentTimeMillis(); private volatile long sent; + private Timeout timeoutCheckTask; private DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; @@ -79,9 +78,8 @@ private DefaultFuture(Channel channel, Request request, int timeout) { * check time out of the future */ private static void timeoutCheck(DefaultFuture future) { - TimeoutCheckTask task = new TimeoutCheckTask(future); - Timeout t = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); - PENDING_TASKS.put(future.getId(), t); + TimeoutCheckTask task = new TimeoutCheckTask(future.getId()); + future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); } /** @@ -140,15 +138,19 @@ public static void closeChannel(Channel channel) { } public static void received(Channel channel, Response response) { + received(channel, response, false); + } + + public static void received(Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { - future.doReceived(response); - Timeout t = PENDING_TASKS.remove(future.getId()); - if (t != null) { + Timeout t = future.timeoutCheckTask; + if (!timeout) { // decrease Time t.cancel(); } + future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) @@ -229,14 +231,15 @@ private String getTimeoutMessage(boolean scan) { private static class TimeoutCheckTask implements TimerTask { - private DefaultFuture future; + private final Long requestID; - TimeoutCheckTask(DefaultFuture future) { - this.future = future; + TimeoutCheckTask(Long requestID) { + this.requestID = requestID; } @Override public void run(Timeout timeout) { + DefaultFuture future = DefaultFuture.getFuture(requestID); if (future == null || future.isDone()) { return; } @@ -246,7 +249,7 @@ public void run(Timeout timeout) { timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. - DefaultFuture.received(future.getChannel(), timeoutResponse); + DefaultFuture.received(future.getChannel(), timeoutResponse, true); } }