Skip to content

Commit

Permalink
add uds support for java sdk (layotto#51)
Browse files Browse the repository at this point in the history
Co-authored-by: 梓淞 <yuziyuan.yzy@antgroup.com>
  • Loading branch information
ChloroplastYu and 梓淞 authored Sep 14, 2023
1 parent f7ef90c commit 0d0be8e
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class Configuration {

private static final Logger logger = LoggerFactory.getLogger(Configuration.class.getName());
private String dataClass;
private String dataClass;
static String storeName = "config_demo";
static String appId = "testApplication_yang";
static String group = "application";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

public class Secret {

static String storeName = "secret_demo";
static String key = "db-user-pass:password";
static String storeName = "secret_demo";
static String key = "db-user-pass:password";

public static void main(String args[]){
RuntimeClient client = new RuntimeClientBuilder()
Expand Down
75 changes: 75 additions & 0 deletions examples-uds/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>runtime-sdk-parent</artifactId>
<groupId>io.mosn.layotto</groupId>
<version>1.3.0-SNAPSHOT</version>
</parent>

<groupId>io.mosn.layotto</groupId>
<artifactId>examples-uds</artifactId>
<version>1.3.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>io.mosn.layotto</groupId>
<artifactId>runtime-sdk</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.9.1</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<finalName>examples-uds</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<manifest>
<mainClass>io.mosn.layotto.examples.uds.Demo</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
35 changes: 35 additions & 0 deletions examples-uds/src/main/java/io/mosn/layotto/examples/uds/Demo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021 Layotto Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mosn.layotto.examples.uds;

import io.grpc.netty.shaded.io.netty.channel.unix.DomainSocketAddress;
import io.mosn.layotto.v1.RuntimeClientBuilder;
import spec.sdk.runtime.v1.client.RuntimeClient;

public class Demo {

public static void main(String[] args) {
DomainSocketAddress udsAddress = new DomainSocketAddress("/tmp/client-proxy.sock");
RuntimeClient client = new RuntimeClientBuilder()
.withUdsSocket(udsAddress)
.build();

String resp = client.sayHello("helloworld");
if (!"greeting, helloworld".equals(resp)) {
throw new RuntimeException("Unexpected result:" + resp);
}
System.out.println(resp);
}
}
18 changes: 18 additions & 0 deletions examples-uds/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" monitorInterval="30">
<Properties>
<Property name="LOG_PATTERN">%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n</Property>
</Properties>

<Appenders>
<Console name="console" target="SYSTEM_OUT" follow="true">
<PatternLayout pattern="${LOG_PATTERN}"/>
</Console>
</Appenders>

<Loggers>
<Root level="info">
<AppenderRef ref="console"/>
</Root>
</Loggers>
</Configuration>
5 changes: 4 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
<module>examples-lock</module>
<module>examples-secret</module>
<module>examples-configuration</module>
<module>examples-uds</module>
</modules>

<properties>
<protobuf.version>3.11.2</protobuf.version>
<grpc.version>1.34.1</grpc.version>
<grpc.version>1.53.0</grpc.version>
<slf4j.version>1.7.30</slf4j.version>
<fastjson.version>1.2.69</fastjson.version>
<junit.version>4.13.1</junit.version>
Expand All @@ -53,6 +54,7 @@
<artifactId>runtime-spec-pb</artifactId>
<version>${project.version}</version>
</dependency>

<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
Expand Down Expand Up @@ -88,6 +90,7 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>

<!-- unit test -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
5 changes: 5 additions & 0 deletions sdk-springboot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
<artifactId>spring-boot-autoconfigure</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>compile</scope>
</dependency>

</dependencies>

Expand Down
109 changes: 94 additions & 15 deletions sdk/src/main/java/io/mosn/layotto/v1/RuntimeClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import com.google.errorprone.annotations.DoNotCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollDomainSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.unix.DomainSocketAddress;
import io.mosn.layotto.v1.config.RuntimeProperties;
import io.mosn.layotto.v1.domain.ApiProtocol;
import io.mosn.layotto.v1.grpc.GrpcRuntimeClient;
Expand All @@ -37,22 +41,24 @@
*/
public class RuntimeClientBuilder {

private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(RuntimeClient.class.getName());
private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(RuntimeClient.class.getName());

private int timeoutMs = RuntimeProperties.DEFAULT_TIMEOUT_MS;
private int timeoutMs = RuntimeProperties.DEFAULT_TIMEOUT_MS;

private String ip = RuntimeProperties.DEFAULT_IP;
private String ip = RuntimeProperties.DEFAULT_IP;

private int port = RuntimeProperties.DEFAULT_PORT;
private int port = RuntimeProperties.DEFAULT_PORT;

private ApiProtocol protocol = RuntimeProperties.DEFAULT_API_PROTOCOL;
private ApiProtocol protocol = RuntimeProperties.DEFAULT_API_PROTOCOL;

private Logger logger = DEFAULT_LOGGER;
private Logger logger = DEFAULT_LOGGER;

private ObjectSerializer stateSerializer = new JSONSerializer();
private ObjectSerializer stateSerializer = new JSONSerializer();

private int poolSize;

private DomainSocketAddress domainSocketAddress = null;

// TODO add rpc serializer

/**
Expand Down Expand Up @@ -125,6 +131,66 @@ public RuntimeClientBuilder withStateSerializer(ObjectSerializer stateSerializer
return this;
}

/**
* Sets the unix domain socket channel for objects to be persisted.
*
* @param udsAddress unix domain socket address
* @return builder
*/
public RuntimeClientBuilder withUdsSocket(DomainSocketAddress udsAddress) {
if (udsAddress == null) {
throw new IllegalArgumentException("Invalid unix domain socket address");
}

this.domainSocketAddress = udsAddress;
return this;
}

private ManagedChannel buildTcpChannel(String ip, int port) {
ManagedChannel tcpChannel = ManagedChannelBuilder
.forAddress(ip, port)
.usePlaintext()
.build();

return tcpChannel;
}

private ManagedChannel[] buildTcpChannels(String ip, int port, int poolSize) {
ManagedChannel[] channels = new ManagedChannel[poolSize];
for (int i = 0; i < poolSize; i++) {
channels[i] = buildTcpChannel(ip, port);
}

return channels;
}

private ManagedChannel buildUdsChannel(DomainSocketAddress udsAddress) {
ManagedChannel udsChannel;
try {
udsChannel = NettyChannelBuilder
.forAddress(udsAddress)
.eventLoopGroup(new EpollEventLoopGroup())
.channelType(EpollDomainSocketChannel.class)
.usePlaintext()
.build();
} catch (UnsatisfiedLinkError error) {
throw new IllegalArgumentException("Unix domain socket only supports the Linux platform");
} catch (Throwable e) {
throw new IllegalArgumentException("Invalid unix domain socket address");
}

return udsChannel;
}

private ManagedChannel[] buildUdsChannels(DomainSocketAddress udsAddress, int poolSize) {
ManagedChannel[] channels = new ManagedChannel[poolSize];
for (int i = 0; i < poolSize; i++) {
channels[i] = buildUdsChannel(udsAddress);
}

return channels;
}

/**
* Build an instance of the Client based on the provided setup.
*
Expand Down Expand Up @@ -153,22 +219,34 @@ public GrpcRuntimeClient buildGrpc() {
StubManager<RuntimeGrpc.RuntimeStub, RuntimeGrpc.RuntimeBlockingStub> runtimeStubManager;
StubManager<ObjectStorageServiceGrpc.ObjectStorageServiceStub, ObjectStorageServiceGrpc.ObjectStorageServiceBlockingStub> ossStubManager;
if (poolSize > 1) {
runtimeStubManager = new PooledStubManager<>(ip, port, poolSize, new RuntimeStubCreatorImpl());
ManagedChannel[] channels;
if (this.domainSocketAddress != null) {
channels = buildUdsChannels(this.domainSocketAddress, this.poolSize);
} else {
channels = buildTcpChannels(this.ip, this.port, this.poolSize);
}

runtimeStubManager = new PooledStubManager<>(channels, new RuntimeStubCreatorImpl());
ossStubManager = new PooledStubManager<>(runtimeStubManager.getChannels(), new OssStubCreatorImpl());
} else {
ManagedChannel channel = ManagedChannelBuilder.forAddress(ip, port)
.usePlaintext()
.build();
runtimeStubManager = new SingleStubManager(channel, new RuntimeStubCreatorImpl());
ossStubManager = new SingleStubManager(channel, new OssStubCreatorImpl());
ManagedChannel channel;
if (this.domainSocketAddress != null) {
channel = buildUdsChannel(this.domainSocketAddress);
} else {
channel = buildTcpChannel(this.ip, this.port);
}

runtimeStubManager = new SingleStubManager<>(channel, new RuntimeStubCreatorImpl());
ossStubManager = new SingleStubManager<>(channel, new OssStubCreatorImpl());
}
// 3. construct client
return new RuntimeClientGrpc(
logger,
timeoutMs,
stateSerializer,
runtimeStubManager,
ossStubManager);
ossStubManager
);
}

public GrpcRuntimeClient buildGrpcWithExistingChannel(ManagedChannel channel) {
Expand All @@ -186,7 +264,8 @@ public GrpcRuntimeClient buildGrpcWithExistingChannel(ManagedChannel channel) {
logger,
timeoutMs,
stateSerializer,
stubManager, ossStubManager);
stubManager,
ossStubManager);
}

public static class RuntimeStubCreatorImpl implements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,6 @@ private ConstructResult<A, B> constructPools(ManagedChannel[] channels, StubCrea
return new ConstructResult<>(asyncStubs, blockingStubs, asyncPool, blockingPool);
}

public PooledStubManager(String host, int port, int size,
StubCreator<A, B> sc) {
channels = new ManagedChannel[size];
List<A> asyncStubs = new ArrayList<>();
List<B> blockingStubs = new ArrayList<>();
// construct channels and stubs
for (int i = 0; i < size; i++) {
channels[i] = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
asyncStubs.add(sc.createAsyncStub(channels[i]));
blockingStubs.add(sc.createBlockingStub(channels[i]));
}
// construct pools
asyncRuntimePool = new RRPool<>(new CopyOnWriteArrayList<>(asyncStubs));
runtimePool = new RRPool<>(new CopyOnWriteArrayList<>(blockingStubs));

// init connections
init(asyncStubs, blockingStubs);
}

protected void init(List<A> asyncStubs, List<B> blockingStubs) {
// TODO establish connection
}
Expand Down
Loading

0 comments on commit 0d0be8e

Please sign in to comment.