Skip to content

Commit

Permalink
Implements full spec of gRPC endpoint.
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza committed May 9, 2024
1 parent ec29de0 commit c0af545
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 26 deletions.
2 changes: 1 addition & 1 deletion sdk-tests/src/test/java/io/dapr/it/DaprPorts.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void use() {
if (this.grpcPort != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.grpcPort));
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.grpcPort);
Properties.GRPC_ENDPOINT.getName(), "127.0.0.1:" + this.grpcPort);
}
}

Expand Down
143 changes: 123 additions & 20 deletions sdk/src/main/java/io/dapr/utils/NetworkUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.regex.Pattern;

/**
* Utility methods for network, internal to Dapr SDK.
Expand All @@ -32,13 +31,56 @@ public final class NetworkUtils {

private static final long RETRY_WAIT_MILLISECONDS = 1000;

// Thanks to https://ihateregex.io/expr/ipv6/
private static final String IPV6_REGEX = "(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|"
+ "([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|"
+ "([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|"
+ "([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|"
+ "([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|"
+ ":((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|"
+ "::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|(2[0-4]|"
+ "1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|"
+ "(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|"
+ "(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))";

private static final Pattern IPV6_PATTERN = Pattern.compile(IPV6_REGEX, Pattern.CASE_INSENSITIVE);

// Don't accept "?" to avoid ambiguity with ?tls=true
private static final String GRPC_ENDPOINT_FILENAME_REGEX_PART = "[^\0\\?]+";

private static final String GRPC_ENDPOINT_HOSTNAME_REGEX_PART = "(([A-Za-z0-9_\\-\\.]+)|(\\[" + IPV6_REGEX + "\\]))";

private static final String GRPC_ENDPOINT_DNS_AUTHORITY_REGEX_PART =
"(?<dnsWithAuthority>dns://)(?<authorityEndpoint>" + GRPC_ENDPOINT_HOSTNAME_REGEX_PART + ":[0-9]+)/";

private static final String GRPC_ENDPOINT_PARAM_REGEX_PART = "(\\?(?<param>tls\\=((true)|(false))))?";

private static final String GRPC_ENDPOINT_SOCKET_REGEX_PART =
"(?<socket>((unix:)|(unix://)|(unix-abstract:))" + GRPC_ENDPOINT_FILENAME_REGEX_PART + ")";

private static final String GRPC_ENDPOINT_VSOCKET_REGEX_PART =
"(?<vsocket>vsock:" + GRPC_ENDPOINT_HOSTNAME_REGEX_PART + ":[0-9]+)";
private static final String GRPC_ENDPOINT_HOST_REGEX_PART =
"((?<http>http://)|(?<https>https://)|(?<dns>dns:)|(" + GRPC_ENDPOINT_DNS_AUTHORITY_REGEX_PART + "))?"
+ "(?<hostname>" + GRPC_ENDPOINT_HOSTNAME_REGEX_PART + ")?+"
+ "(:(?<port>[0-9]+))?";

private static final String GRPC_ENDPOINT_REGEX = "^("
+ "(" + GRPC_ENDPOINT_HOST_REGEX_PART + ")|"
+ "(" + GRPC_ENDPOINT_SOCKET_REGEX_PART + ")|"
+ "(" + GRPC_ENDPOINT_VSOCKET_REGEX_PART + ")"
+ ")" + GRPC_ENDPOINT_PARAM_REGEX_PART + "$";

private static final Pattern GRPC_ENDPOINT_PATTERN = Pattern.compile(GRPC_ENDPOINT_REGEX, Pattern.CASE_INSENSITIVE);

private NetworkUtils() {
}

/**
* Tries to connect to a socket, retrying every 1 second.
* @param host Host to connect to.
* @param port Port to connect to.
*
* @param host Host to connect to.
* @param port Port to connect to.
* @param timeoutInMilliseconds Timeout in milliseconds to give up trying.
* @throws InterruptedException If retry is interrupted.
*/
Expand All @@ -60,26 +102,15 @@ public static void waitForSocket(String host, int port, int timeoutInMillisecond

/**
* Creates a GRPC managed channel.
*
* @param interceptors Optional interceptors to add to the channel.
* @return GRPC managed channel to communicate with the sidecar.
*/
public static ManagedChannel buildGrpcManagedChannel(ClientInterceptor... interceptors) {
String address = Properties.SIDECAR_IP.get();
int port = Properties.GRPC_PORT.get();
boolean insecure = true;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
URI uri = URI.create(grpcEndpoint);
insecure = uri.getScheme().equalsIgnoreCase("http");
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
address = uri.getHost();
if ((uri.getPath() != null) && !uri.getPath().isEmpty()) {
address += uri.getPath();
}
}
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(address, port)
var settings = GrpcEndpointSettings.parse();
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(settings.endpoint)
.userAgent(Version.getSdkVersion());
if (insecure) {
if (!settings.secure) {
builder = builder.usePlaintext();
}
if (interceptors != null && interceptors.length > 0) {
Expand All @@ -88,6 +119,73 @@ public static ManagedChannel buildGrpcManagedChannel(ClientInterceptor... interc
return builder.build();
}

// Not private to allow unit testing
static final class GrpcEndpointSettings {
final String endpoint;
final boolean secure;

private GrpcEndpointSettings(String endpoint, boolean secure) {
this.endpoint = endpoint;
this.secure = secure;
}

static GrpcEndpointSettings parse() {
String address = Properties.SIDECAR_IP.get();
int port = Properties.GRPC_PORT.get();
boolean secure = false;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
var matcher = GRPC_ENDPOINT_PATTERN.matcher(grpcEndpoint);
if (!matcher.matches()) {
throw new IllegalArgumentException("Illegal gRPC endpoint: " + grpcEndpoint);
}
var parsedHost = matcher.group("hostname");
if (parsedHost != null) {
address = parsedHost;
}

var https = matcher.group("https") != null;
var http = matcher.group("http") != null;
secure = https;

String parsedPort = matcher.group("port");
if (parsedPort != null) {
port = Integer.valueOf(parsedPort);
} else {
// This implements default port as 80 for http for backwards compatibility.
port = http ? 80 : 443;
}

String parsedParam = matcher.group("param");
if ((http || https) && (parsedParam != null)) {
throw new IllegalArgumentException("Query params is not supported in HTTP URI for gRPC endpoint.");
}

if (parsedParam != null) {
secure = parsedParam.equalsIgnoreCase("tls=true");
}

var authorityEndpoint = matcher.group("authorityEndpoint");
if (authorityEndpoint != null) {
return new GrpcEndpointSettings(String.format("dns://%s/%s:%d", authorityEndpoint, address, port), secure);
}

var socket = matcher.group("socket");
if (socket != null) {
return new GrpcEndpointSettings(socket, secure);
}

var vsocket = matcher.group("vsocket");
if (vsocket != null) {
return new GrpcEndpointSettings(vsocket, secure);
}
}

return new GrpcEndpointSettings(String.format("dns:///%s:%d", address, port), secure);
}

}

private static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException {
long started = System.currentTimeMillis();
while (true) {
Expand All @@ -104,7 +202,7 @@ private static void callWithRetry(Runnable function, long retryTimeoutMillisecon
long elapsed = System.currentTimeMillis() - started;
if (elapsed >= retryTimeoutMilliseconds) {
if (exception instanceof RuntimeException) {
throw (RuntimeException)exception;
throw (RuntimeException) exception;
}

throw new RuntimeException(exception);
Expand All @@ -117,9 +215,14 @@ private static void callWithRetry(Runnable function, long retryTimeoutMillisecon

/**
* Retrieve loopback address for the host.
*
* @return The loopback address String
*/
public static String getHostLoopbackAddress() {
return InetAddress.getLoopbackAddress().getHostAddress();
}

static boolean isIPv6(String ip) {
return IPV6_PATTERN.matcher(ip).matches();
}
}
84 changes: 81 additions & 3 deletions sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import io.dapr.config.Properties;
import io.grpc.ManagedChannel;
import io.grpc.testing.GrpcCleanupRule;
import org.junit.Rule;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.mock;

public class NetworkUtilsTest {
private final int defaultGrpcPort = 4000;
Expand Down Expand Up @@ -74,4 +72,84 @@ public void testBuildGrpcManagedChannel_httpsEndpointWithPort() {
String expectedAuthority = "example.com:3000";
Assertions.assertEquals(expectedAuthority, channel.authority());
}

@Test
public void testGrpcEndpointParsing() {
testGrpcEndpointParsingScenario(":5000", "dns:///127.0.0.1:5000", false);
testGrpcEndpointParsingScenario(":5000?tls=true", "dns:///127.0.0.1:5000", true);
testGrpcEndpointParsingScenario(":5000?tls=false", "dns:///127.0.0.1:5000", false);
testGrpcEndpointParsingScenario("myhost:5000", "dns:///myhost:5000", false);
testGrpcEndpointParsingScenario("myhost:5000?tls=true", "dns:///myhost:5000", true);
testGrpcEndpointParsingScenario("myhost:5000?tls=false", "dns:///myhost:5000", false);
testGrpcEndpointParsingScenario("myhost", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("myhost?tls=true", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("myhost?tls=false", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("dns:myhost", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("dns:myhost?tls=true", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("dns:myhost?tls=false", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("http://myhost", "dns:///myhost:80", false);
testGrpcEndpointParsingScenario("http://myhost:443", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("http://myhost:5000", "dns:///myhost:5000", false);
testGrpcEndpointParsingScenario("http://myhost:8080", "dns:///myhost:8080", false);
testGrpcEndpointParsingScenario("https://myhost", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("https://myhost:443", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("https://myhost:5000", "dns:///myhost:5000", true);
testGrpcEndpointParsingScenario("dns://myauthority:53/myhost", "dns://myauthority:53/myhost:443", false);
testGrpcEndpointParsingScenario("dns://myauthority:53/myhost?tls=false", "dns://myauthority:53/myhost:443", false);
testGrpcEndpointParsingScenario("dns://myauthority:53/myhost?tls=true", "dns://myauthority:53/myhost:443", true);
testGrpcEndpointParsingScenario("unix:my.sock", "unix:my.sock", false);
testGrpcEndpointParsingScenario("unix:my.sock?tls=true", "unix:my.sock", true);
testGrpcEndpointParsingScenario("unix://my.sock", "unix://my.sock", false);
testGrpcEndpointParsingScenario("unix://my.sock?tls=true", "unix://my.sock", true);
testGrpcEndpointParsingScenario("unix-abstract:my.sock", "unix-abstract:my.sock", false);
testGrpcEndpointParsingScenario("unix-abstract:my.sock?tls=true", "unix-abstract:my.sock", true);
testGrpcEndpointParsingScenario("vsock:mycid:5000", "vsock:mycid:5000", false);
testGrpcEndpointParsingScenario("vsock:mycid:5000?tls=true", "vsock:mycid:5000", true);
testGrpcEndpointParsingScenario("[2001:db8:1f70::999:de8:7648:6e8]", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:443", false);
testGrpcEndpointParsingScenario("dns:[2001:db8:1f70::999:de8:7648:6e8]:5000", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:5000", false);
testGrpcEndpointParsingScenario("dns://myauthority:53/[2001:db8:1f70::999:de8:7648:6e8]", "dns://myauthority:53/[2001:db8:1f70::999:de8:7648:6e8]:443", false);
testGrpcEndpointParsingScenario("https://[2001:db8:1f70::999:de8:7648:6e8]", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:443", true);
testGrpcEndpointParsingScenario("https://[2001:db8:1f70::999:de8:7648:6e8]:5000", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:5000", true);
}

@Test
public void testGrpcEndpointParsingError() {
testGrpcEndpointParsingErrorScenario("http://myhost?tls=true");
testGrpcEndpointParsingErrorScenario("http://myhost?tls=false");
testGrpcEndpointParsingErrorScenario("http://myhost:8080?tls=true");
testGrpcEndpointParsingErrorScenario("http://myhost:443?tls=false");
testGrpcEndpointParsingErrorScenario("https://myhost?tls=true");
testGrpcEndpointParsingErrorScenario("https://myhost?tls=false");
testGrpcEndpointParsingErrorScenario("https://myhost:8080?tls=true");
testGrpcEndpointParsingErrorScenario("https://myhost:443?tls=false");
testGrpcEndpointParsingErrorScenario("dns://myhost");
testGrpcEndpointParsingErrorScenario("dns:[2001:db8:1f70::999:de8:7648:6e8]:5000?abc=[]");
testGrpcEndpointParsingErrorScenario("dns:[2001:db8:1f70::999:de8:7648:6e8]:5000?abc=123");
testGrpcEndpointParsingErrorScenario("host:5000/v1/dapr");
testGrpcEndpointParsingErrorScenario("host:5000/?a=1");
testGrpcEndpointParsingErrorScenario("inv-scheme://myhost");
testGrpcEndpointParsingErrorScenario("inv-scheme:myhost:5000");
}

private static void testGrpcEndpointParsingScenario(
String grpcEndpointEnvValue,
String expectedEndpoint,
boolean expectSecure
) {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
var settings = NetworkUtils.GrpcEndpointSettings.parse();

Assertions.assertEquals(expectedEndpoint, settings.endpoint);
Assertions.assertEquals(expectSecure, settings.secure);
}

private static void testGrpcEndpointParsingErrorScenario(String grpcEndpointEnvValue) {
try {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
NetworkUtils.GrpcEndpointSettings.parse();
Assert.fail();
} catch (IllegalArgumentException e) {
// Expected
}
}
}
3 changes: 1 addition & 2 deletions sdk/src/test/java/io/dapr/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.dapr.exceptions.DaprException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable;
import org.apache.commons.validator.routines.InetAddressValidator;

import java.io.IOException;
import java.net.ServerSocket;
Expand Down Expand Up @@ -83,7 +82,7 @@ public static int findFreePort() throws IOException {

public static String formatIpAddress(final String ipAddress) {
String formattedIpAddress = ipAddress;
if(InetAddressValidator.getInstance().isValidInet6Address(ipAddress)) {
if(NetworkUtils.isIPv6(ipAddress)) {
formattedIpAddress = "[" + ipAddress + "]"; // per URL spec https://url.spec.whatwg.org/#host-writing
}
return formattedIpAddress;
Expand Down

0 comments on commit c0af545

Please sign in to comment.