Skip to content

Commit

Permalink
rpc功能基本完成,补充netty部分,可以进行调用
Browse files Browse the repository at this point in the history
  • Loading branch information
lyzzzz committed Mar 19, 2022
1 parent cc290cf commit 38209bc
Show file tree
Hide file tree
Showing 26 changed files with 485 additions and 37 deletions.
11 changes: 11 additions & 0 deletions demo-consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>com.lyz</groupId>
<artifactId>rpc-framework</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>com.lyz</groupId>
<artifactId>demo-facade</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.lyz.demo.consumer;

import com.lyz.demo.rpc.service.HelloService;
import com.lyz.rpc.consumer.RpcReference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {
@RpcReference
private HelloService helloService;

@GetMapping({ "/", "" })
public Mono<?> demo(@RequestParam String hello) {
return Mono.fromCallable(() -> helloService.demo(hello));
}
}
2 changes: 2 additions & 0 deletions demo-consumer/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ rpc:
port: 8070
consumer:
enabled: true
server:
port: 8072
22 changes: 21 additions & 1 deletion demo-facade/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,27 @@
<name>rpc-demo-facade</name>

<properties>
<java.version>11</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>

</project>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.lyz.demo.rpc.service;

/**
* @author liyizhen
* @date 2022/3/17
*/
public interface HelloService {
String demo(String hello);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.lyz.demo.provider;

import com.lyz.demo.rpc.service.HelloService;
import com.lyz.rpc.provider.RpcService;

import java.util.Random;

/**
* @author liyizhen
* @date 2022/3/17
*/
@RpcService(interfaceClass = HelloService.class, version = "1.0.0")
public class HelloServiceImpl implements HelloService {
@Override
public String demo(String hello) {
return new Random().nextInt() + hello;
}
}

This file was deleted.

13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,17 @@
<dependencies>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
1 change: 0 additions & 1 deletion rpc-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
<name>rpc-framework</name>

<properties>
<java.version>11</java.version>
</properties>

<dependencies>
Expand Down
43 changes: 38 additions & 5 deletions rpc-framework/src/main/java/com/lyz/rpc/consumer/Consumer.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,63 @@
package com.lyz.rpc.consumer;

import com.lyz.rpc.core.InstanceInfo;
import com.lyz.rpc.protocol.Protocol;
import com.lyz.rpc.protocol.ProtocolDecoder;
import com.lyz.rpc.protocol.ProtocolEncoder;
import com.lyz.rpc.protocol.ProtocolResponseHandler;
import com.lyz.rpc.registry.RegistryService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
* 消费器,利用 netty 对目标进行连接
* @author liyizhen
* @date 2022/3/18
*/
@Slf4j
public class Consumer {
public Consumer() {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup)
private final RegistryService registryService;
private final Bootstrap bootstrap;
private final NioEventLoopGroup nioEventLoopGroup;

public Consumer(RegistryService registryService) {
this.registryService = registryService;

bootstrap = new Bootstrap();
nioEventLoopGroup = new NioEventLoopGroup();
bootstrap.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolDecoder());
ch.pipeline().addLast(new ProtocolEncoder());
ch.pipeline().addLast(new ProtocolResponseHandler());
}
});
}

public void request() {
public void request(Protocol<Protocol.Request> protocol) throws InterruptedException {
// 发现服务
InstanceInfo instanceInfo = registryService.discovery();

// 发起调用
ChannelFuture channelFuture = bootstrap.connect(instanceInfo.getHost(), instanceInfo.getPort()).sync();
channelFuture.addListener(arg0 -> {
if (channelFuture.isSuccess()) {
log.info("成功调用目标 host:{},port:{}", instanceInfo.getHost(), instanceInfo.getPort());
} else {
log.info("调用目标失败 host:{},port:{}", instanceInfo.getHost(), instanceInfo.getPort());
channelFuture.cause().printStackTrace();
nioEventLoopGroup.shutdownGracefully();
}
});
channelFuture.channel().writeAndFlush(protocol);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.lyz.rpc.consumer;

import com.lyz.rpc.protocol.Protocol;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class RequestHolder {
private static final AtomicLong ID_GENERATOR = new AtomicLong();
private static final ConcurrentHashMap<Long, Promise<Protocol.Response>> PROMISES = new ConcurrentHashMap<>();

public static long generateId() {
return ID_GENERATOR.incrementAndGet();
}

public static void putRequest(long requestId, Promise<Protocol.Response> promise) {
PROMISES.put(requestId, promise);
}

public static void successRequest(long requestId, Protocol.Response result) {
Promise<Protocol.Response> promise = PROMISES.get(requestId);
if (promise == null) {
throw new IllegalArgumentException("promise 为空 requestId:" + requestId);
}
promise.setSuccess(result);
PROMISES.remove(requestId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* @date 2022/3/18
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {
String version() default "1.0.0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

Expand All @@ -25,9 +26,10 @@
* @date 2022/3/18
*/
@Slf4j
public class RpcReferenceBeanFactoryPostProcessor implements BeanFactoryPostProcessor, BeanClassLoaderAware {
public class RpcReferenceBeanFactoryPostProcessor implements BeanFactoryPostProcessor, BeanClassLoaderAware, ApplicationContextAware {
private final RegistryService registryService;
private ClassLoader classLoader;
private ApplicationContext applicationContext;
private Map<String, BeanDefinition> rpcReferenceBeanDefinitions = new HashMap<>();

public RpcReferenceBeanFactoryPostProcessor(RegistryService registryService) {
Expand All @@ -48,6 +50,10 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableL

BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) configurableListableBeanFactory;
rpcReferenceBeanDefinitions.forEach((k, v) -> {
if (applicationContext.containsBean(k)) {
throw new RuntimeException("已存在 RpcReference Bean:" + k);
}

beanDefinitionRegistry.registerBeanDefinition(k, v);
log.info("成功注册 RpcReference 类 {}", k);
});
Expand All @@ -61,7 +67,6 @@ private void parseField(Field field) {
beanDefinitionBuilder.setInitMethodName("init");
beanDefinitionBuilder.addPropertyValue("interfaceClass", field.getType());
beanDefinitionBuilder.addPropertyValue("version", annotation.version());
beanDefinitionBuilder.addPropertyValue("registryService", registryService);

AbstractBeanDefinition beanDefinition = beanDefinitionBuilder.getBeanDefinition();
rpcReferenceBeanDefinitions.put(field.getName(), beanDefinition);
Expand All @@ -72,4 +77,9 @@ private void parseField(Field field) {
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.lyz.rpc.consumer;

import com.lyz.rpc.protocol.Protocol;
import com.lyz.rpc.registry.RegistryService;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
* rpc 引用代理
Expand All @@ -24,7 +29,29 @@ public RpcReferenceProxy(RegistryService registryService, Class<?> interfaceClas
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// TODO 构造协议 发送请求 连接 consumer
Protocol<Protocol.Request> protocol = new Protocol<>();
Protocol.Header header = new Protocol.Header();
Protocol.Request request = new Protocol.Request();
protocol.setHeader(header);
protocol.setBody(request);

return null;
header.setMagic((short) 0x10);
header.setVersion((byte) 0x1);
header.setType((byte) Protocol.Type.REQUEST.getType());
header.setStatus((byte)Protocol.Status.SUCCESS.getCode());
header.setRequestId(RequestHolder.generateId());

request.setVersion(version);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParams(args);
request.setParamTypes(method.getParameterTypes());

Consumer consumer = new Consumer(registryService);
consumer.request(protocol);
Promise<Protocol.Response> promise = new DefaultPromise<>(new DefaultEventLoop());
RequestHolder.putRequest(header.getRequestId(), promise);

return promise.get(1000, TimeUnit.MILLISECONDS).getData();
}
}
Loading

0 comments on commit 38209bc

Please sign in to comment.