Skip to content

Commit

Permalink
fix connect bug (#812)
Browse files Browse the repository at this point in the history
  • Loading branch information
Block authored Apr 16, 2022
1 parent ed92628 commit 0929136
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected boolean initRpcClient(final int rpcProcessorThreadPoolSize) {
final RaftRpcFactory factory = RpcFactoryHelper.rpcFactory();
this.rpcClient = factory.createRpcClient(factory.defaultJRaftClientConfigHelper(this.rpcOptions));
configRpcClient(this.rpcClient);
this.rpcClient.init(null);
this.rpcClient.init(this.rpcOptions);
this.rpcExecutor = ThreadPoolUtil.newBuilder() //
.poolName("JRaft-RPC-Processor") //
.enableMetric(true) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.slf4j.LoggerFactory;

import com.alipay.remoting.CustomSerializerManager;
import com.alipay.remoting.InvokeContext;
import com.alipay.remoting.rpc.RpcConfigManager;
import com.alipay.remoting.rpc.RpcConfigs;
import com.alipay.sofa.jraft.option.RpcOptions;
import com.alipay.sofa.jraft.rpc.ProtobufSerializer;
import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
Expand Down Expand Up @@ -76,16 +74,6 @@ public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<Rpc
return rpcServer;
}

@Override
public ConfigHelper<RpcClient> defaultJRaftClientConfigHelper(final RpcOptions opts) {
return ins -> {
final BoltRpcClient client = (BoltRpcClient) ins;
final InvokeContext ctx = new InvokeContext();
ctx.put(InvokeContext.BOLT_CRC_SWITCH, opts.isEnableRpcChecksum());
client.setDefaultInvokeCtx(ctx);
};
}

@Override
public void ensurePipeline() {
// enable `bolt.rpc.dispatch-msg-list-in-default-executor` system property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.RejectedExecutionPolicy;
import com.alipay.remoting.config.BoltClientOption;
import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.sofa.jraft.ReplicatorGroup;
import com.alipay.sofa.jraft.error.InvokeTimeoutException;
import com.alipay.sofa.jraft.error.RemotingException;
Expand All @@ -45,15 +44,17 @@ public class BoltRpcClient implements RpcClient {
public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY";

private final com.alipay.remoting.rpc.RpcClient rpcClient;
private com.alipay.remoting.InvokeContext defaultInvokeCtx;

private RpcOptions opts;

public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) {
this.rpcClient = Requires.requireNonNull(rpcClient, "rpcClient");
}

@Override
public boolean init(final RpcOptions opts) {
rpcClient.option(BoltClientOption.NETTY_FLUSH_CONSOLIDATION, true);
this.opts = opts;
this.rpcClient.option(BoltClientOption.NETTY_FLUSH_CONSOLIDATION, true);
this.rpcClient.initWriteBufferWaterMark(BoltRaftRpcFactory.CHANNEL_WRITE_BUF_LOW_WATER_MARK,
BoltRaftRpcFactory.CHANNEL_WRITE_BUF_HIGH_WATER_MARK);
this.rpcClient.enableReconnectSwitch();
Expand Down Expand Up @@ -122,25 +123,20 @@ public com.alipay.remoting.rpc.RpcClient getRpcClient() {
return rpcClient;
}

public com.alipay.remoting.InvokeContext getDefaultInvokeCtx() {
return defaultInvokeCtx;
}

public void setDefaultInvokeCtx(com.alipay.remoting.InvokeContext defaultInvokeCtx) {
this.defaultInvokeCtx = defaultInvokeCtx;
}

private RejectedExecutionPolicy getRejectedPolicy(final InvokeContext ctx) {
return ctx == null ? RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION : ctx.getOrDefault(
BOLT_REJECTED_EXECUTION_POLICY, RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION);
}

private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext ctx) {
com.alipay.remoting.InvokeContext boltCtx;
if (ctx == null) {
return this.defaultInvokeCtx;
boltCtx = new com.alipay.remoting.InvokeContext();
boltCtx.put(com.alipay.remoting.InvokeContext.BOLT_CRC_SWITCH, this.opts.isEnableRpcChecksum());
return boltCtx;
}

com.alipay.remoting.InvokeContext boltCtx = ctx.get(BOLT_CTX);
boltCtx = ctx.get(BOLT_CTX);
if (boltCtx != null) {
return boltCtx;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,21 @@
*/
public final class ExtSerializerSupports {

private static final InvokeContext INVOKE_CONTEXT = new InvokeContext();

public static byte PROTO_STUFF = 2;
public static byte PROTO_STUFF = 2;

static {
SerializerManager.addSerializer(PROTO_STUFF, ProtostuffSerializer.INSTANCE);
INVOKE_CONTEXT.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, PROTO_STUFF);
INVOKE_CONTEXT.put(InvokeContext.BOLT_CRC_SWITCH, false);
}

public static void init() {
// Will execute the code first of the static block
}

public static InvokeContext getInvokeContext() {
return INVOKE_CONTEXT;
final InvokeContext ctx = new InvokeContext();
ctx.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, PROTO_STUFF);
ctx.put(InvokeContext.BOLT_CRC_SWITCH, false);
return ctx;
}

private ExtSerializerSupports() {
Expand Down

0 comments on commit 0929136

Please sign in to comment.