Skip to content

Commit

Permalink
some improve
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed May 6, 2020
1 parent 3a8bc15 commit ec8fe5b
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ public ExecutorSelector getExecutorSelector() {
});
}

public com.alipay.remoting.rpc.RpcServer getServer() {
return rpcServer;
}

private static class BoltConnection implements Connection {

private final com.alipay.remoting.Connection conn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ public class GrpcClient implements RpcClient {

private final Map<Endpoint, ManagedChannel> managedChannels = new ConcurrentHashMap<>();
private final Map<String, Message> parserClasses;
private final MarshallerRegistry marshallerRegistry;
private volatile ReplicatorGroup replicatorGroup;

public GrpcClient(Map<String, Message> parserClasses) {
public GrpcClient(Map<String, Message> parserClasses, MarshallerRegistry marshallerRegistry) {
this.parserClasses = parserClasses;
this.marshallerRegistry = marshallerRegistry;
}

@Override
Expand Down Expand Up @@ -154,7 +156,8 @@ private MethodDescriptor<Message, Message> getCallMethod(final Object request) {
//
.setFullMethodName(MethodDescriptor.generateFullMethodName(interest, GrpcRaftRpcFactory.FIXED_METHOD_NAME)) //
.setRequestMarshaller(ProtoUtils.marshaller(reqIns)) //
.setResponseMarshaller(ProtoUtils.marshaller(MarshallerHelper.findRespInstance(interest))) //
.setResponseMarshaller(
ProtoUtils.marshaller(this.marshallerRegistry.findResponseInstanceByRequest(interest))) //
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
*/
package com.alipay.sofa.jraft.rpc.impl;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.util.MutableHandlerRegistry;

import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcServer;
Expand All @@ -29,13 +34,29 @@

/**
* @author nicholas.jxf
* @author jiachun.fjc
*/
@SPI(priority = 1)
public class GrpcRaftRpcFactory implements RaftRpcFactory {

public static String FIXED_METHOD_NAME = "_call";
public static String FIXED_METHOD_NAME = "_call";

private final Map<String, Message> parserClasses = new ConcurrentHashMap<>();
private final MarshallerRegistry defaultMarshallerRegistry = new MarshallerRegistry() {

@Override
public Message findResponseInstanceByRequest(final String reqCls) {
return MarshallerHelper
.findRespInstance(reqCls);
}

private final Map<String, Message> parserClasses = new ConcurrentHashMap<>();
@Override
public void registerResponseInstance(final String reqCls,
final Message respIns) {
MarshallerHelper.registerRespInstance(reqCls,
respIns);
}
};

@Override
public void registerProtobufSerializer(final String className, final Object... args) {
Expand All @@ -44,7 +65,7 @@ public void registerProtobufSerializer(final String className, final Object... a

@Override
public RpcClient createRpcClient(final ConfigHelper<RpcClient> helper) {
final RpcClient rpcClient = new GrpcClient(this.parserClasses);
final RpcClient rpcClient = new GrpcClient(this.parserClasses, getMarshallerRegistry());
if (helper != null) {
helper.config(rpcClient);
}
Expand All @@ -55,7 +76,12 @@ public RpcClient createRpcClient(final ConfigHelper<RpcClient> helper) {
public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<RpcServer> helper) {
final int port = Requires.requireNonNull(endpoint, "endpoint").getPort();
Requires.requireTrue(port > 0 && port < 0xFFFF, "port out of range:" + port);
final RpcServer rpcServer = new GrpcServer(endpoint, this.parserClasses);
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
final Server server = NettyServerBuilder //
.forAddress(new InetSocketAddress(endpoint.getIp(), endpoint.getPort())) //
.fallbackHandlerRegistry(handlerRegistry) //
.build();
final RpcServer rpcServer = new GrpcServer(server, handlerRegistry, this.parserClasses, getMarshallerRegistry());
if (helper != null) {
helper.config(rpcServer);
}
Expand All @@ -66,4 +92,8 @@ public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<Rpc
public boolean isReplicatorPipelineEnabled() {
return false;
}

public MarshallerRegistry getMarshallerRegistry() {
return defaultMarshallerRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package com.alipay.sofa.jraft.rpc.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import io.grpc.util.MutableHandlerRegistry;
Expand All @@ -34,7 +32,6 @@
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.internal.ThrowUtil;
import com.google.protobuf.Message;
Expand All @@ -47,16 +44,19 @@
*/
public class GrpcServer implements RpcServer {

private final Endpoint endpoint;
private final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
private final AtomicBoolean started = new AtomicBoolean(false);
private final Server server;
private final MutableHandlerRegistry handlerRegistry;
private final Map<String, Message> parserClasses;
private final MarshallerRegistry marshallerRegistry;

private Server server;
private final AtomicBoolean started = new AtomicBoolean(false);

public GrpcServer(Endpoint endpoint, Map<String, Message> parserClasses) {
this.endpoint = endpoint;
public GrpcServer(Server server, MutableHandlerRegistry handlerRegistry, Map<String, Message> parserClasses,
MarshallerRegistry marshallerRegistry) {
this.server = server;
this.handlerRegistry = handlerRegistry;
this.parserClasses = parserClasses;
this.marshallerRegistry = marshallerRegistry;
}

@Override
Expand All @@ -65,10 +65,6 @@ public boolean init(final Void opts) {
throw new IllegalStateException("grpc server has started");
}
try {
this.server = NettyServerBuilder //
.forAddress(new InetSocketAddress(this.endpoint.getIp(), this.endpoint.getPort())) //
.fallbackHandlerRegistry(this.handlerRegistry) //
.build();
this.server.start();
} catch (final IOException e) {
ThrowUtil.throwException(e);
Expand Down Expand Up @@ -100,7 +96,7 @@ public void registerProcessor(final RpcProcessor processor) {
.setFullMethodName(
MethodDescriptor.generateFullMethodName(processor.interest(), GrpcRaftRpcFactory.FIXED_METHOD_NAME)) //
.setRequestMarshaller(ProtoUtils.marshaller(reqIns)) //
.setResponseMarshaller(ProtoUtils.marshaller(MarshallerHelper.findRespInstance(interest))) //
.setResponseMarshaller(ProtoUtils.marshaller(this.marshallerRegistry.findResponseInstanceByRequest(interest))) //
.build();

final ServerCallHandler<Message, Message> handler = ServerCalls.asyncUnaryCall(
Expand Down Expand Up @@ -139,4 +135,12 @@ public String getRemoteAddress() {
public int boundPort() {
return this.server.getPort();
}

public Server getServer() {
return server;
}

public MutableHandlerRegistry getHandlerRegistry() {
return handlerRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.google.protobuf.Message;

/**
* Not thread safe.
*
* @author jiachun.fjc
*/
public class MarshallerHelper {

private static Map<String, Message> messages = new HashMap<>();
Expand Down Expand Up @@ -60,4 +65,8 @@ public class MarshallerHelper {
public static Message findRespInstance(final String name) {
return messages.get(name);
}

public static void registerRespInstance(final String name, final Message instance) {
messages.put(name, instance);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc.impl;

import com.google.protobuf.Message;

/**
* @author jiachun.fjc
*/
public interface MarshallerRegistry {

/**
* Find response default instance by request's class name.
*
* @param reqCls request class name
* @return response default instance
*/
Message findResponseInstanceByRequest(final String reqCls);

/**
* Register response default instance.
*
* @param reqCls request class name
* @param respIns response default instance
*/
void registerResponseInstance(final String reqCls, final Message respIns);
}

0 comments on commit ec8fe5b

Please sign in to comment.