From 347f154b7ffe83c3d652c4bf7af6df66cd935864 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Fri, 11 Jan 2019 23:28:14 +0800 Subject: [PATCH] Async enhancement (#3184) * Provider async enhancement, user doesn't have to use 'async=true' on provider side. * only keep zero-args constructor * stop router tag from being transferred in the RPC chain * Code review: use 'this.future= new Future()''; remove setAsyncContext() from RpcContext * revert getInternalFuture() * Fix UT --- .../java/org/apache/dubbo/rpc/AsyncContext.java | 10 ---------- .../java/org/apache/dubbo/rpc/AsyncContextImpl.java | 11 ++++------- .../main/java/org/apache/dubbo/rpc/RpcContext.java | 13 ++++--------- .../org/apache/dubbo/rpc/filter/ContextFilter.java | 5 ++++- .../dubbo/rpc/proxy/AbstractProxyInvoker.java | 3 ++- .../java/org/apache/dubbo/rpc/RpcContextTest.java | 12 +++--------- .../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 6 ------ 7 files changed, 17 insertions(+), 43 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java index cc9ff1a2d9a..1f51a54d4ff 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java @@ -16,8 +16,6 @@ */ package org.apache.dubbo.rpc; -import java.util.concurrent.CompletableFuture; - /** * AsyncContext works like {@see javax.servlet.AsyncContext} in the Servlet 3.0. * An AsyncContext is stated by a call to {@link RpcContext#startAsync()}. @@ -27,14 +25,6 @@ */ public interface AsyncContext { - /** - * get the internal future which is binding to this async context - * - * @return the internal future - */ - // FIXME - CompletableFuture getInternalFuture(); - /** * write value and complete the async context. * diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java index 77cf0bd17b8..f8f67433658 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java @@ -34,10 +34,6 @@ public class AsyncContextImpl implements AsyncContext { private RpcContext storedServerContext; public AsyncContextImpl() { - } - - public AsyncContextImpl(CompletableFuture future) { - this.future = future; this.storedContext = RpcContext.getContext(); this.storedServerContext = RpcContext.getServerContext(); } @@ -68,7 +64,9 @@ public boolean stop() { @Override public void start() { - this.started.set(true); + if (this.started.compareAndSet(false, true)) { + this.future = new CompletableFuture<>(); + } } @Override @@ -78,8 +76,7 @@ public void signalContextSwitch() { // Restore any other contexts in here if necessary. } - @Override - public CompletableFuture getInternalFuture() { + public CompletableFuture getInternalFuture() { return future; } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java index a9000ecb001..aaa92c875c7 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java @@ -731,12 +731,11 @@ public void asyncCall(Runnable runnable) { @SuppressWarnings("unchecked") public static AsyncContext startAsync() throws IllegalStateException { RpcContext currentContext = getContext(); - if (currentContext.asyncContext != null) { - currentContext.asyncContext.start(); - return currentContext.asyncContext; - } else { - throw new IllegalStateException("This service does not support asynchronous operations, you should open async explicitly before use."); + if (currentContext.asyncContext == null) { + currentContext.asyncContext = new AsyncContextImpl(); } + currentContext.asyncContext.start(); + return currentContext.asyncContext; } public boolean isAsyncStarted() { @@ -750,10 +749,6 @@ public boolean stopAsync() { return asyncContext.stop(); } - public void setAsyncContext(AsyncContext asyncContext) { - this.asyncContext = asyncContext; - } - public AsyncContext getAsyncContext() { return asyncContext; } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java index 7dd5522096a..db68ea1a7f3 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java @@ -49,7 +49,10 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept attachments.remove(Constants.DUBBO_VERSION_KEY); attachments.remove(Constants.TOKEN_KEY); attachments.remove(Constants.TIMEOUT_KEY); - attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain. + // Remove async property to avoid being passed to the following invoke chain. + attachments.remove(Constants.ASYNC_KEY); + attachments.remove(Constants.TAG_KEY); + attachments.remove(Constants.FORCE_USE_TAG); } RpcContext.getContext() .setInvoker(invoker) diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java index 0f5a5df6252..6a562d6f6fd 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.rpc.AsyncContextImpl; import org.apache.dubbo.rpc.AsyncRpcResult; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; @@ -86,7 +87,7 @@ public Result invoke(Invocation invocation) throws RpcException { if (RpcUtils.isReturnTypeFuture(invocation)) { return new AsyncRpcResult((CompletableFuture) obj); } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back. - return new AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture()); + return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture()); } else { return new RpcResult(obj); } diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java index 281f96df37e..0d4d95f8158 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java @@ -17,12 +17,12 @@ package org.apache.dubbo.rpc; import org.apache.dubbo.common.URL; + import org.junit.Assert; import org.junit.Test; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; public class RpcContextTest { @@ -142,20 +142,14 @@ public void testObject() { @Test public void testAsync() { - CompletableFuture future = new CompletableFuture<>(); - AsyncContext asyncContext = new AsyncContextImpl(future); - RpcContext rpcContext = RpcContext.getContext(); Assert.assertFalse(rpcContext.isAsyncStarted()); - rpcContext.setAsyncContext(asyncContext); - Assert.assertFalse(rpcContext.isAsyncStarted()); - - RpcContext.startAsync(); + AsyncContext asyncContext = RpcContext.startAsync(); Assert.assertTrue(rpcContext.isAsyncStarted()); asyncContext.write(new Object()); - Assert.assertTrue(future.isDone()); + Assert.assertTrue(((AsyncContextImpl)asyncContext).getInternalFuture().isDone()); rpcContext.stopAsync(); Assert.assertTrue(rpcContext.isAsyncStarted()); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index d3ac2201012..86eb013f985 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -34,7 +34,6 @@ import org.apache.dubbo.remoting.exchange.ExchangeServer; import org.apache.dubbo.remoting.exchange.Exchangers; import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; -import org.apache.dubbo.rpc.AsyncContextImpl; import org.apache.dubbo.rpc.AsyncRpcResult; import org.apache.dubbo.rpc.Exporter; import org.apache.dubbo.rpc.Invocation; @@ -105,11 +104,6 @@ public CompletableFuture reply(ExchangeChannel channel, Object message) } } RpcContext rpcContext = RpcContext.getContext(); - boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false); - if (supportServerAsync) { - CompletableFuture future = new CompletableFuture<>(); - rpcContext.setAsyncContext(new AsyncContextImpl(future)); - } rpcContext.setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv);