Skip to content

Commit

Permalink
[v1.0] 支持多个服务的注册
Browse files Browse the repository at this point in the history
  • Loading branch information
CN-GuoZiyang committed Jun 17, 2020
1 parent 0a6d7ec commit 8467b19
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.Serializable;

/**
* 测试用api的实体
* @author ziyang
*/
@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package top.guoziyang.rpc.api;

/**
* 测试用api的接口
* @author ziyang
*/
public interface HelloService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import lombok.Getter;

/**
* RPC调用过程中的错误
* @author ziyang
*/
@AllArgsConstructor
@Getter
public enum RpcError {

SERVICE_INVOCATION_FAILURE("服务调用出现失败"),
SERVICE_CAN_NOT_BE_NULL("注册的服务不得为空");
SERVICE_NOT_FOUND("找不到对应的服务"),
SERVICE_NOT_IMPLEMENT_ANY_INTERFACE("注册的服务未实现接口");

private final String message;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import top.guoziyang.rpc.enumeration.RpcError;

/**
* RPC调用异常
* @author ziyang
*/
public class RpcException extends RuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.guoziyang.rpc.entity.RpcRequest;
import top.guoziyang.rpc.entity.RpcResponse;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package top.guoziyang.rpc.registry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.guoziyang.rpc.enumeration.RpcError;
import top.guoziyang.rpc.exception.RpcException;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* 默认的服务注册表
* @author ziyang
*/
public class DefaultServiceRegistry implements ServiceRegistry {

private static final Logger logger = LoggerFactory.getLogger(DefaultServiceRegistry.class);

private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private final Set<String> registeredService = ConcurrentHashMap.newKeySet();

@Override
public synchronized <T> void register(T service) {
String serviceName = service.getClass().getCanonicalName();
if(registeredService.contains(serviceName)) return;
registeredService.add(serviceName);
Class<?>[] interfaces = service.getClass().getInterfaces();
if(interfaces.length == 0) {
throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
}
for(Class<?> i : interfaces) {
serviceMap.put(i.getCanonicalName(), service);
}
logger.info("向接口: {} 注册服务: {}", interfaces, serviceName);
}

@Override
public synchronized Object getService(String serviceName) {
Object service = serviceMap.get(serviceName);
if(service == null) {
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
return service;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package top.guoziyang.rpc.registry;

/**
* 服务注册表通用接口
* @author ziyang
*/
public interface ServiceRegistry {

/**
* 将一个服务注册进注册表
* @param service 待注册的服务实体
* @param <T> 服务实体类
*/
<T> void register(T service);

/**
* 根据服务名称获取服务实体
* @param serviceName 服务名称
* @return 服务实体
*/
Object getService(String serviceName);

}
39 changes: 10 additions & 29 deletions rpc-core/src/main/java/top/guoziyang/rpc/server/RequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,28 @@
import top.guoziyang.rpc.entity.RpcResponse;
import top.guoziyang.rpc.enumeration.ResponseCode;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

/**
* 实际进行过程调用的工作线程
* 进行过程调用的处理器
* @author ziyang
*/
public class RequestHandler implements Runnable {
public class RequestHandler {

private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

private Socket socket;
private Object service;

public RequestHandler(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}

@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object returnObject = invokeMethod(rpcRequest);
objectOutputStream.writeObject(RpcResponse.success(returnObject));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException | IllegalAccessException | InvocationTargetException e) {
public Object handle(RpcRequest rpcRequest, Object service) {
Object result = null;
try {
result = invokeTargetMethod(rpcRequest, service);
logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("调用或发送时有错误发生:", e);
}
} return result;
}

private Object invokeMethod(RpcRequest rpcRequest) throws IllegalAccessException, InvocationTargetException, ClassNotFoundException {
Class<?> clazz = Class.forName(rpcRequest.getInterfaceName());
if(!clazz.isAssignableFrom(service.getClass())) {
return RpcResponse.fail(ResponseCode.CLASS_NOT_FOUND);
}
private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws IllegalAccessException, InvocationTargetException {
Method method;
try {
method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package top.guoziyang.rpc.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.guoziyang.rpc.entity.RpcRequest;
import top.guoziyang.rpc.entity.RpcResponse;
import top.guoziyang.rpc.registry.ServiceRegistry;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
* 处理RpcRequest的工作线程
* @author ziyang
*/
public class RequestHandlerThread implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

private Socket socket;
private RequestHandler requestHandler;
private ServiceRegistry serviceRegistry;

public RequestHandlerThread(Socket socket, RequestHandler requestHandler, ServiceRegistry serviceRegistry) {
this.socket = socket;
this.requestHandler = requestHandler;
this.serviceRegistry = serviceRegistry;
}

@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
String interfaceName = rpcRequest.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(rpcRequest, service);
objectOutputStream.writeObject(RpcResponse.success(result));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用或发送时有错误发生:", e);
}
}

}
31 changes: 19 additions & 12 deletions rpc-core/src/main/java/top/guoziyang/rpc/server/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.guoziyang.rpc.registry.ServiceRegistry;

import java.io.IOException;
import java.net.ServerSocket;
Expand All @@ -14,28 +15,34 @@
*/
public class RpcServer {

private final ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

public RpcServer() {
int corePoolSize = 5;
int maximumPoolSize = 50;
long keepAliveTime = 60;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
private static final int CORE_POOL_SIZE = 5;
private static final int MAXIMUM_POOL_SIZE = 50;
private static final int KEEP_ALIVE_TIME = 60;
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private final ExecutorService threadPool;
private RequestHandler requestHandler = new RequestHandler();
private final ServiceRegistry serviceRegistry;

public RpcServer(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workingQueue, threadFactory);
}

public void register(Object service, int port) {
public void start(int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("服务器正在启动...");
logger.info("服务器启动……");
Socket socket;
while((socket = serverSocket.accept()) != null) {
logger.info("客户端连接!Ip为:" + socket.getInetAddress() + ":" + socket.getPort());
threadPool.execute(new RequestHandler(socket, service));
logger.info("消费者连接: {}:{}", socket.getInetAddress(), socket.getPort());
threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry));
}
threadPool.shutdown();
} catch (IOException e) {
logger.error("连接时有错误发生:", e);
logger.error("服务器启动时有错误发生:", e);
}
}

Expand Down
8 changes: 6 additions & 2 deletions test-server/src/main/java/top/guoziyang/test/TestServer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package top.guoziyang.test;

import top.guoziyang.rpc.api.HelloService;
import top.guoziyang.rpc.registry.DefaultServiceRegistry;
import top.guoziyang.rpc.registry.ServiceRegistry;
import top.guoziyang.rpc.server.RpcServer;

/**
Expand All @@ -11,8 +13,10 @@ public class TestServer {

public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
RpcServer rpcServer = new RpcServer();
rpcServer.register(helloService, 9000);
ServiceRegistry serviceRegistry = new DefaultServiceRegistry();
serviceRegistry.register(helloService);
RpcServer rpcServer = new RpcServer(serviceRegistry);
rpcServer.start(9000);
}

}

0 comments on commit 8467b19

Please sign in to comment.