Skip to content

Commit

Permalink
Async enhancement (#3184)
Browse files Browse the repository at this point in the history
* Provider async enhancement, user doesn't have to use 'async=true' on provider side.

* only keep zero-args constructor

* stop router tag from being transferred in the RPC chain

* Code review: use 'this.future= new Future()''; remove setAsyncContext() from RpcContext

* revert getInternalFuture()

* Fix UT
  • Loading branch information
chickenlj authored and beiwei30 committed Jan 11, 2019
1 parent a8af5ce commit 347f154
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.dubbo.rpc;

import java.util.concurrent.CompletableFuture;

/**
* AsyncContext works like {@see javax.servlet.AsyncContext} in the Servlet 3.0.
* An AsyncContext is stated by a call to {@link RpcContext#startAsync()}.
Expand All @@ -27,14 +25,6 @@
*/
public interface AsyncContext {

/**
* get the internal future which is binding to this async context
*
* @return the internal future
*/
// FIXME
CompletableFuture getInternalFuture();

/**
* write value and complete the async context.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ public class AsyncContextImpl implements AsyncContext {
private RpcContext storedServerContext;

public AsyncContextImpl() {
}

public AsyncContextImpl(CompletableFuture<Object> future) {
this.future = future;
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}
Expand Down Expand Up @@ -68,7 +64,9 @@ public boolean stop() {

@Override
public void start() {
this.started.set(true);
if (this.started.compareAndSet(false, true)) {
this.future = new CompletableFuture<>();
}
}

@Override
Expand All @@ -78,8 +76,7 @@ public void signalContextSwitch() {
// Restore any other contexts in here if necessary.
}

@Override
public CompletableFuture getInternalFuture() {
public CompletableFuture<Object> getInternalFuture() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,11 @@ public void asyncCall(Runnable runnable) {
@SuppressWarnings("unchecked")
public static AsyncContext startAsync() throws IllegalStateException {
RpcContext currentContext = getContext();
if (currentContext.asyncContext != null) {
currentContext.asyncContext.start();
return currentContext.asyncContext;
} else {
throw new IllegalStateException("This service does not support asynchronous operations, you should open async explicitly before use.");
if (currentContext.asyncContext == null) {
currentContext.asyncContext = new AsyncContextImpl();
}
currentContext.asyncContext.start();
return currentContext.asyncContext;
}

public boolean isAsyncStarted() {
Expand All @@ -750,10 +749,6 @@ public boolean stopAsync() {
return asyncContext.stop();
}

public void setAsyncContext(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
}

public AsyncContext getAsyncContext() {
return asyncContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
// Remove async property to avoid being passed to the following invoke chain.
attachments.remove(Constants.ASYNC_KEY);
attachments.remove(Constants.TAG_KEY);
attachments.remove(Constants.FORCE_USE_TAG);
}
RpcContext.getContext()
.setInvoker(invoker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.AsyncContextImpl;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -86,7 +87,7 @@ public Result invoke(Invocation invocation) throws RpcException {
if (RpcUtils.isReturnTypeFuture(invocation)) {
return new AsyncRpcResult((CompletableFuture<Object>) obj);
} else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
return new AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
} else {
return new RpcResult(obj);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc;

import org.apache.dubbo.common.URL;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class RpcContextTest {

Expand Down Expand Up @@ -142,20 +142,14 @@ public void testObject() {
@Test
public void testAsync() {

CompletableFuture<Object> future = new CompletableFuture<>();
AsyncContext asyncContext = new AsyncContextImpl(future);

RpcContext rpcContext = RpcContext.getContext();
Assert.assertFalse(rpcContext.isAsyncStarted());

rpcContext.setAsyncContext(asyncContext);
Assert.assertFalse(rpcContext.isAsyncStarted());

RpcContext.startAsync();
AsyncContext asyncContext = RpcContext.startAsync();
Assert.assertTrue(rpcContext.isAsyncStarted());

asyncContext.write(new Object());
Assert.assertTrue(future.isDone());
Assert.assertTrue(((AsyncContextImpl)asyncContext).getInternalFuture().isDone());

rpcContext.stopAsync();
Assert.assertTrue(rpcContext.isAsyncStarted());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import org.apache.dubbo.rpc.AsyncContextImpl;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
Expand Down Expand Up @@ -105,11 +104,6 @@ public CompletableFuture<Object> reply(ExchangeChannel channel, Object message)
}
}
RpcContext rpcContext = RpcContext.getContext();
boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
if (supportServerAsync) {
CompletableFuture<Object> future = new CompletableFuture<>();
rpcContext.setAsyncContext(new AsyncContextImpl(future));
}
rpcContext.setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);

Expand Down

0 comments on commit 347f154

Please sign in to comment.