Skip to content

Commit

Permalink
feat/never discard the callback task (#413)
Browse files Browse the repository at this point in the history
* insure never discard the callback task

* configurable for rejected_execution_policy, and caller_handle_exception as default
  • Loading branch information
fengjiachun committed Apr 1, 2020
1 parent e565e16 commit 0c592b5
Showing 1 changed file with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.Executor;

import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.RejectedExecutionPolicy;
import com.alipay.remoting.Url;
import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.remoting.rpc.RpcAddressParser;
Expand All @@ -41,13 +42,14 @@
*/
public class BoltRpcClient implements RpcClient {

public static final String BOLT_ADDRESS_PARSER = "BOLT_ADDRESS_PARSER";
public static final String BOLT_CTX = "BOLT_CTX";
public static final String BOLT_ADDRESS_PARSER = "BOLT_ADDRESS_PARSER";
public static final String BOLT_CTX = "BOLT_CTX";
public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY";

private final com.alipay.remoting.rpc.RpcClient rpcClient;

private com.alipay.remoting.InvokeContext defaultInvokeCtx;
private RpcAddressParser defaultAddressParser = new RpcAddressParser();
private RpcAddressParser defaultAddressParser = new RpcAddressParser();

public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) {
this.rpcClient = Requires.requireNonNull(rpcClient, "rpcClient");
Expand Down Expand Up @@ -106,7 +108,7 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv
final RpcAddressParser addressParser = getAddressParser(ctx);
try {
final Url url = addressParser.parse(endpoint.toString());
this.rpcClient.invokeWithCallback(url, request, getBoltInvokeCtx(ctx), getBoltCallback(callback),
this.rpcClient.invokeWithCallback(url, request, getBoltInvokeCtx(ctx), getBoltCallback(callback, ctx),
(int) timeoutMs);
} catch (final com.alipay.remoting.rpc.exception.InvokeTimeoutException e) {
throw new InvokeTimeoutException(e);
Expand Down Expand Up @@ -140,6 +142,11 @@ private RpcAddressParser getAddressParser(final InvokeContext ctx) {
this.defaultAddressParser);
}

private RejectedExecutionPolicy getRejectedPolicy(final InvokeContext ctx) {
return ctx == null ? RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION : ctx.getOrDefault(
BOLT_REJECTED_EXECUTION_POLICY, RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION);
}

private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext ctx) {
if (ctx == null) {
return this.defaultInvokeCtx;
Expand All @@ -161,16 +168,19 @@ private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext c
return boltCtx;
}

private BoltCallback getBoltCallback(final InvokeCallback callback) {
return new BoltCallback(callback);
private BoltCallback getBoltCallback(final InvokeCallback callback, final InvokeContext ctx) {
Requires.requireNonNull(callback, "callback");
return new BoltCallback(callback, getRejectedPolicy(ctx));
}

private static class BoltCallback implements com.alipay.remoting.InvokeCallback {
private static class BoltCallback implements com.alipay.remoting.RejectionProcessableInvokeCallback {

private final InvokeCallback callback;
private final InvokeCallback callback;
private final RejectedExecutionPolicy rejectedPolicy;

private BoltCallback(final InvokeCallback callback) {
private BoltCallback(final InvokeCallback callback, final RejectedExecutionPolicy rejectedPolicy) {
this.callback = callback;
this.rejectedPolicy = rejectedPolicy;
}

@Override
Expand All @@ -187,5 +197,10 @@ public void onException(final Throwable err) {
public Executor getExecutor() {
return this.callback.executor();
}

@Override
public RejectedExecutionPolicy rejectedExecutionPolicy() {
return this.rejectedPolicy;
}
}
}

0 comments on commit 0c592b5

Please sign in to comment.