diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java index 51466e1db..302c8e3fb 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java @@ -107,7 +107,7 @@ protected boolean initRpcClient(final int rpcProcessorThreadPoolSize) { final RaftRpcFactory factory = RpcFactoryHelper.rpcFactory(); this.rpcClient = factory.createRpcClient(factory.defaultJRaftClientConfigHelper(this.rpcOptions)); configRpcClient(this.rpcClient); - this.rpcClient.init(null); + this.rpcClient.init(this.rpcOptions); this.rpcExecutor = ThreadPoolUtil.newBuilder() // .poolName("JRaft-RPC-Processor") // .enableMetric(true) // diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRaftRpcFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRaftRpcFactory.java index 9fda8b055..f6e9df5ed 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRaftRpcFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRaftRpcFactory.java @@ -20,10 +20,8 @@ import org.slf4j.LoggerFactory; import com.alipay.remoting.CustomSerializerManager; -import com.alipay.remoting.InvokeContext; import com.alipay.remoting.rpc.RpcConfigManager; import com.alipay.remoting.rpc.RpcConfigs; -import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.ProtobufSerializer; import com.alipay.sofa.jraft.rpc.RaftRpcFactory; import com.alipay.sofa.jraft.rpc.RpcClient; @@ -76,16 +74,6 @@ public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper defaultJRaftClientConfigHelper(final RpcOptions opts) { - return ins -> { - final BoltRpcClient client = (BoltRpcClient) ins; - final InvokeContext ctx = new InvokeContext(); - ctx.put(InvokeContext.BOLT_CRC_SWITCH, opts.isEnableRpcChecksum()); - client.setDefaultInvokeCtx(ctx); - }; - } - @Override public void ensurePipeline() { // enable `bolt.rpc.dispatch-msg-list-in-default-executor` system property diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java index 9903ffe12..2ba5c6a7c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java @@ -22,7 +22,6 @@ import com.alipay.remoting.ConnectionEventType; import com.alipay.remoting.RejectedExecutionPolicy; import com.alipay.remoting.config.BoltClientOption; -import com.alipay.remoting.config.switches.GlobalSwitch; import com.alipay.sofa.jraft.ReplicatorGroup; import com.alipay.sofa.jraft.error.InvokeTimeoutException; import com.alipay.sofa.jraft.error.RemotingException; @@ -45,7 +44,8 @@ public class BoltRpcClient implements RpcClient { public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY"; private final com.alipay.remoting.rpc.RpcClient rpcClient; - private com.alipay.remoting.InvokeContext defaultInvokeCtx; + + private RpcOptions opts; public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) { this.rpcClient = Requires.requireNonNull(rpcClient, "rpcClient"); @@ -53,7 +53,8 @@ public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) { @Override public boolean init(final RpcOptions opts) { - rpcClient.option(BoltClientOption.NETTY_FLUSH_CONSOLIDATION, true); + this.opts = opts; + this.rpcClient.option(BoltClientOption.NETTY_FLUSH_CONSOLIDATION, true); this.rpcClient.initWriteBufferWaterMark(BoltRaftRpcFactory.CHANNEL_WRITE_BUF_LOW_WATER_MARK, BoltRaftRpcFactory.CHANNEL_WRITE_BUF_HIGH_WATER_MARK); this.rpcClient.enableReconnectSwitch(); @@ -122,25 +123,20 @@ public com.alipay.remoting.rpc.RpcClient getRpcClient() { return rpcClient; } - public com.alipay.remoting.InvokeContext getDefaultInvokeCtx() { - return defaultInvokeCtx; - } - - public void setDefaultInvokeCtx(com.alipay.remoting.InvokeContext defaultInvokeCtx) { - this.defaultInvokeCtx = defaultInvokeCtx; - } - private RejectedExecutionPolicy getRejectedPolicy(final InvokeContext ctx) { return ctx == null ? RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION : ctx.getOrDefault( BOLT_REJECTED_EXECUTION_POLICY, RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION); } private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext ctx) { + com.alipay.remoting.InvokeContext boltCtx; if (ctx == null) { - return this.defaultInvokeCtx; + boltCtx = new com.alipay.remoting.InvokeContext(); + boltCtx.put(com.alipay.remoting.InvokeContext.BOLT_CRC_SWITCH, this.opts.isEnableRpcChecksum()); + return boltCtx; } - com.alipay.remoting.InvokeContext boltCtx = ctx.get(BOLT_CTX); + boltCtx = ctx.get(BOLT_CTX); if (boltCtx != null) { return boltCtx; } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/rpc/ExtSerializerSupports.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/rpc/ExtSerializerSupports.java index 08999b904..4b33edfde 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/rpc/ExtSerializerSupports.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/rpc/ExtSerializerSupports.java @@ -24,14 +24,10 @@ */ public final class ExtSerializerSupports { - private static final InvokeContext INVOKE_CONTEXT = new InvokeContext(); - - public static byte PROTO_STUFF = 2; + public static byte PROTO_STUFF = 2; static { SerializerManager.addSerializer(PROTO_STUFF, ProtostuffSerializer.INSTANCE); - INVOKE_CONTEXT.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, PROTO_STUFF); - INVOKE_CONTEXT.put(InvokeContext.BOLT_CRC_SWITCH, false); } public static void init() { @@ -39,7 +35,10 @@ public static void init() { } public static InvokeContext getInvokeContext() { - return INVOKE_CONTEXT; + final InvokeContext ctx = new InvokeContext(); + ctx.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, PROTO_STUFF); + ctx.put(InvokeContext.BOLT_CRC_SWITCH, false); + return ctx; } private ExtSerializerSupports() {