Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix connect bug #812

Merged
merged 1 commit into from
Apr 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected boolean initRpcClient(final int rpcProcessorThreadPoolSize) {
final RaftRpcFactory factory = RpcFactoryHelper.rpcFactory();
this.rpcClient = factory.createRpcClient(factory.defaultJRaftClientConfigHelper(this.rpcOptions));
configRpcClient(this.rpcClient);
this.rpcClient.init(null);
this.rpcClient.init(this.rpcOptions);
this.rpcExecutor = ThreadPoolUtil.newBuilder() //
.poolName("JRaft-RPC-Processor") //
.enableMetric(true) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.slf4j.LoggerFactory;

import com.alipay.remoting.CustomSerializerManager;
import com.alipay.remoting.InvokeContext;
import com.alipay.remoting.rpc.RpcConfigManager;
import com.alipay.remoting.rpc.RpcConfigs;
import com.alipay.sofa.jraft.option.RpcOptions;
import com.alipay.sofa.jraft.rpc.ProtobufSerializer;
import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
Expand Down Expand Up @@ -76,16 +74,6 @@ public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<Rpc
return rpcServer;
}

@Override
public ConfigHelper<RpcClient> defaultJRaftClientConfigHelper(final RpcOptions opts) {
return ins -> {
final BoltRpcClient client = (BoltRpcClient) ins;
final InvokeContext ctx = new InvokeContext();
ctx.put(InvokeContext.BOLT_CRC_SWITCH, opts.isEnableRpcChecksum());
client.setDefaultInvokeCtx(ctx);
};
}

@Override
public void ensurePipeline() {
// enable `bolt.rpc.dispatch-msg-list-in-default-executor` system property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.RejectedExecutionPolicy;
import com.alipay.remoting.config.BoltClientOption;
import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.sofa.jraft.ReplicatorGroup;
import com.alipay.sofa.jraft.error.InvokeTimeoutException;
import com.alipay.sofa.jraft.error.RemotingException;
Expand All @@ -45,15 +44,17 @@ public class BoltRpcClient implements RpcClient {
public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY";

private final com.alipay.remoting.rpc.RpcClient rpcClient;
private com.alipay.remoting.InvokeContext defaultInvokeCtx;

private RpcOptions opts;

public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) {
this.rpcClient = Requires.requireNonNull(rpcClient, "rpcClient");
}

@Override
public boolean init(final RpcOptions opts) {
rpcClient.option(BoltClientOption.NETTY_FLUSH_CONSOLIDATION, true);
this.opts = opts;
this.rpcClient.option(BoltClientOption.NETTY_FLUSH_CONSOLIDATION, true);
this.rpcClient.initWriteBufferWaterMark(BoltRaftRpcFactory.CHANNEL_WRITE_BUF_LOW_WATER_MARK,
BoltRaftRpcFactory.CHANNEL_WRITE_BUF_HIGH_WATER_MARK);
this.rpcClient.enableReconnectSwitch();
Expand Down Expand Up @@ -122,25 +123,20 @@ public com.alipay.remoting.rpc.RpcClient getRpcClient() {
return rpcClient;
}

public com.alipay.remoting.InvokeContext getDefaultInvokeCtx() {
return defaultInvokeCtx;
}

public void setDefaultInvokeCtx(com.alipay.remoting.InvokeContext defaultInvokeCtx) {
this.defaultInvokeCtx = defaultInvokeCtx;
}

private RejectedExecutionPolicy getRejectedPolicy(final InvokeContext ctx) {
return ctx == null ? RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION : ctx.getOrDefault(
BOLT_REJECTED_EXECUTION_POLICY, RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION);
}

private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext ctx) {
com.alipay.remoting.InvokeContext boltCtx;
if (ctx == null) {
return this.defaultInvokeCtx;
boltCtx = new com.alipay.remoting.InvokeContext();
boltCtx.put(com.alipay.remoting.InvokeContext.BOLT_CRC_SWITCH, this.opts.isEnableRpcChecksum());
return boltCtx;
}

com.alipay.remoting.InvokeContext boltCtx = ctx.get(BOLT_CTX);
boltCtx = ctx.get(BOLT_CTX);
if (boltCtx != null) {
return boltCtx;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,21 @@
*/
public final class ExtSerializerSupports {

private static final InvokeContext INVOKE_CONTEXT = new InvokeContext();

public static byte PROTO_STUFF = 2;
public static byte PROTO_STUFF = 2;

static {
SerializerManager.addSerializer(PROTO_STUFF, ProtostuffSerializer.INSTANCE);
INVOKE_CONTEXT.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, PROTO_STUFF);
INVOKE_CONTEXT.put(InvokeContext.BOLT_CRC_SWITCH, false);
}

public static void init() {
// Will execute the code first of the static block
}

public static InvokeContext getInvokeContext() {
return INVOKE_CONTEXT;
final InvokeContext ctx = new InvokeContext();
ctx.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, PROTO_STUFF);
ctx.put(InvokeContext.BOLT_CRC_SWITCH, false);
return ctx;
}

private ExtSerializerSupports() {
Expand Down