Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk-actors/src/test/java/io/dapr/client/DaprHttpProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

public class DaprHttpProxy extends io.dapr.client.DaprHttp {

public DaprHttpProxy(String hostname, int port, OkHttpClient httpClient) {
super(hostname, port, httpClient);
public DaprHttpProxy(String hostname, int port, String daprApiToken, OkHttpClient httpClient) {
super(hostname, port, daprApiToken, httpClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void testInvokeTimeout() throws Exception {
String message = assertThrows(StatusRuntimeException.class, () -> stub.sleep(req)).getMessage();
long delay = System.currentTimeMillis() - started;
assertTrue(delay >= TIMEOUT_MS, "Delay: " + delay + " is not greater than timeout: " + TIMEOUT_MS);
assertTrue(message.contains("DEADLINE_EXCEEDED"));
assertTrue(message.contains("CallOptions deadline exceeded after"));
assertTrue(message.contains("DEADLINE_EXCEEDED"), "The message contains DEADLINE_EXCEEDED: " + message);
assertTrue(message.contains("CallOptions deadline exceeded after"), "The message contains DEADLINE_EXCEEDED: " + message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class DaprWorkflowClient implements AutoCloseable {
* Public constructor for DaprWorkflowClient. This layer constructs the GRPC Channel.
*/
public DaprWorkflowClient() {
this(NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR));
this(NetworkUtils.buildGrpcManagedChannel(new Properties(), WORKFLOW_INTERCEPTOR));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.workflows.runtime;

import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.config.Properties;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
Expand All @@ -38,15 +39,20 @@ public class WorkflowRuntimeBuilder {
* Constructs the WorkflowRuntimeBuilder.
*/
public WorkflowRuntimeBuilder() {
this(LoggerFactory.getLogger(WorkflowRuntimeBuilder.class));
this(new Properties(), LoggerFactory.getLogger(WorkflowRuntimeBuilder.class));
}

WorkflowRuntimeBuilder(Logger logger) {
public WorkflowRuntimeBuilder(Logger logger) {
this(new Properties(), logger);
}

WorkflowRuntimeBuilder(Properties properties, Logger logger) {
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(
NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR));
NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR));
this.logger = logger;
}


/**
* Returns a WorkflowRuntime object.
*
Expand Down
23 changes: 21 additions & 2 deletions sdk/src/main/java/io/dapr/client/DaprClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@
package io.dapr.client;

import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.config.Property;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.NetworkUtils;
import io.dapr.v1.DaprGrpc;
import io.grpc.ManagedChannel;

import java.util.HashMap;
import java.util.Map;

/**
* A builder for the DaprClient,
* Currently only gRPC and HTTP Client will be supported.
*/
public class DaprClientBuilder {

private final Map<String, String> propertyOverrides = new HashMap<>();

/**
* Builder for Dapr's HTTP Client.
*/
Expand Down Expand Up @@ -105,6 +112,17 @@ public DaprClientBuilder withResiliencyOptions(ResiliencyOptions options) {
return this;
}

/**
* Allow to set up properties override for static properties.
* @param property that we want to override
* @param value the value of such property
* @return an instance of the setup Client
*/
public DaprClientBuilder withPropertyOverride(Property<?> property, String value) {
this.propertyOverrides.put(property.getName(), value);
return this;
}

/**
* Build an instance of the Client based on the provided setup.
*
Expand Down Expand Up @@ -132,8 +150,9 @@ public DaprPreviewClient buildPreviewClient() {
* @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
*/
private DaprClientImpl buildDaprClient() {
final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel();
final DaprHttp daprHttp = this.daprHttpBuilder.build();
final Properties properties = new Properties(this.propertyOverrides);
final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(properties);
final DaprHttp daprHttp = this.daprHttpBuilder.build(properties);
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientImpl(
Expand Down
12 changes: 9 additions & 3 deletions sdk/src/main/java/io/dapr/client/DaprHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,22 @@ public int getStatusCode() {
*/
private final OkHttpClient httpClient;

/**
* Dapr API Token required to interact with DAPR APIs.
*/
private final String daprApiToken;

/**
* Creates a new instance of {@link DaprHttp}.
*
* @param hostname Hostname for calling Dapr. (e.g. "127.0.0.1")
* @param port Port for calling Dapr. (e.g. 3500)
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String hostname, int port, OkHttpClient httpClient) {
DaprHttp(String hostname, int port, String daprApiToken, OkHttpClient httpClient) {
this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port);
this.httpClient = httpClient;
this.daprApiToken = daprApiToken;
}

/**
Expand All @@ -169,9 +175,10 @@ public int getStatusCode() {
* @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com")
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String uri, OkHttpClient httpClient) {
DaprHttp(String uri, String daprApiToken, OkHttpClient httpClient) {
this.uri = URI.create(uri);
this.httpClient = httpClient;
this.daprApiToken = daprApiToken;
}

/**
Expand Down Expand Up @@ -314,7 +321,6 @@ private CompletableFuture<Response> doInvokeApi(String method,
requestBuilder.method(method, body);
}

String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) {
requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
}
Expand Down
36 changes: 24 additions & 12 deletions sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.dapr.config.Properties.API_TOKEN;
import static io.dapr.config.Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS;
import static io.dapr.config.Properties.HTTP_CLIENT_MAX_REQUESTS;
import static io.dapr.config.Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS;
import static io.dapr.config.Properties.HTTP_ENDPOINT;
import static io.dapr.config.Properties.HTTP_PORT;
import static io.dapr.config.Properties.SIDECAR_IP;

/**
* A builder for the DaprHttp.
*/
Expand All @@ -43,38 +51,39 @@ public class DaprHttpBuilder {
*/
private static final int KEEP_ALIVE_DURATION = 30;


/**
* Build an instance of the Http client based on the provided setup.
*
* @param properties to configure the DaprHttp client
* @return an instance of {@link DaprHttp}
* @throws IllegalStateException if any required field is missing
*/
public DaprHttp build() {
return buildDaprHttp();
public DaprHttp build(Properties properties) {
return buildDaprHttp(properties);
}

/**
* Creates an instance of the HTTP Client.
*
* @param properties to configure the DaprHttp client
* @return Instance of {@link DaprHttp}
*/
private DaprHttp buildDaprHttp() {
private DaprHttp buildDaprHttp(Properties properties) {
if (OK_HTTP_CLIENT == null) {
synchronized (LOCK) {
if (OK_HTTP_CLIENT == null) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS.get());
Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS));
builder.readTimeout(readTimeout);

Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
dispatcher.setMaxRequests(properties.getValue(HTTP_CLIENT_MAX_REQUESTS));
// The maximum number of requests for each host to execute concurrently.
// Default value is 5 in okhttp which is totally UNACCEPTABLE!
// For sidecar case, set it the same as maxRequests.
dispatcher.setMaxRequestsPerHost(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
dispatcher.setMaxRequestsPerHost(HTTP_CLIENT_MAX_REQUESTS.get());
builder.dispatcher(dispatcher);

ConnectionPool pool = new ConnectionPool(Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS.get(),
ConnectionPool pool = new ConnectionPool(properties.getValue(HTTP_CLIENT_MAX_IDLE_CONNECTIONS),
KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
builder.connectionPool(pool);

Expand All @@ -83,11 +92,14 @@ private DaprHttp buildDaprHttp() {
}
}

String endpoint = Properties.HTTP_ENDPOINT.get();
String endpoint = properties.getValue(HTTP_ENDPOINT);
if ((endpoint != null) && !endpoint.isEmpty()) {
return new DaprHttp(endpoint, OK_HTTP_CLIENT);
return new DaprHttp(endpoint, properties.getValue(API_TOKEN), OK_HTTP_CLIENT);
}

return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
return new DaprHttp(properties.getValue(SIDECAR_IP), properties.getValue(HTTP_PORT), properties.getValue(API_TOKEN),
OK_HTTP_CLIENT);


}
}
37 changes: 37 additions & 0 deletions sdk/src/main/java/io/dapr/config/Properties.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

/**
* Global properties for Dapr's SDK, using Supplier so they are dynamically resolved.
Expand Down Expand Up @@ -172,4 +174,39 @@ public class Properties {
"dapr.http.client.maxIdleConnections",
"DAPR_HTTP_CLIENT_MAX_IDLE_CONNECTIONS",
DEFAULT_HTTP_CLIENT_MAX_IDLE_CONNECTIONS);

/**
* Mechanism to override properties set in a static context.
*/
private final Map<String, String> overrides;

/**
* Creates a new instance to handle Properties per instance.
*/
public Properties() {
this.overrides = null;
}

/**
* Creates a new instance to handle Properties per instance.
* @param overrides to override static properties
*/
public Properties(Map<String, String> overrides) {
this.overrides = Collections.unmodifiableMap(overrides);
}

/**
* Gets a property value taking in consideration the override values.
* @param <T> type of the property that we want to get the value from
* @param property to override static property value from overrides
* @return the property's value
*/
public <T> T getValue(Property<T> property) {
if (overrides != null) {
String override = overrides.get(property.getName());
return property.get(override);
} else {
return property.get();
}
}
}
18 changes: 18 additions & 0 deletions sdk/src/main/java/io/dapr/config/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ public String getEnvName() {
* @return Value from system property (1st) or env variable (2nd) or default (last).
*/
public T get() {
return this.get(null);
}

/**
* Gets the value defined by system property first, then env variable or sticks to default.
* @param override overrides the property value
* @return Value from system property (1st) or env variable (2nd) or default (last).
*/
public T get(String override) {
if ((override != null) && !override.isEmpty()) {
try {
return this.parse(override);
} catch (IllegalArgumentException e) {
LOGGER.warning(String.format("Invalid override value in property: %s", this.name));
// OK, we tried. Falling back to system environment variable.
}
}

String propValue = System.getProperty(this.name);
if (propValue != null && !propValue.trim().isEmpty()) {
try {
Expand Down
19 changes: 12 additions & 7 deletions sdk/src/main/java/io/dapr/utils/NetworkUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
import java.net.Socket;
import java.util.regex.Pattern;

import static io.dapr.config.Properties.GRPC_ENDPOINT;
import static io.dapr.config.Properties.GRPC_PORT;
import static io.dapr.config.Properties.SIDECAR_IP;


/**
* Utility methods for network, internal to Dapr SDK.
*/
Expand Down Expand Up @@ -102,12 +107,12 @@ public static void waitForSocket(String host, int port, int timeoutInMillisecond

/**
* Creates a GRPC managed channel.
*
* @param properties instance to set up the GrpcEndpoint
* @param interceptors Optional interceptors to add to the channel.
* @return GRPC managed channel to communicate with the sidecar.
*/
public static ManagedChannel buildGrpcManagedChannel(ClientInterceptor... interceptors) {
var settings = GrpcEndpointSettings.parse();
public static ManagedChannel buildGrpcManagedChannel(Properties properties, ClientInterceptor... interceptors) {
var settings = GrpcEndpointSettings.parse(properties);
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(settings.endpoint)
.userAgent(Version.getSdkVersion());
if (!settings.secure) {
Expand All @@ -129,11 +134,11 @@ private GrpcEndpointSettings(String endpoint, boolean secure) {
this.secure = secure;
}

static GrpcEndpointSettings parse() {
String address = Properties.SIDECAR_IP.get();
int port = Properties.GRPC_PORT.get();
static GrpcEndpointSettings parse(Properties properties) {
String address = properties.getValue(SIDECAR_IP);
int port = properties.getValue(GRPC_PORT);
boolean secure = false;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
String grpcEndpoint = properties.getValue(GRPC_ENDPOINT);
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
var matcher = GRPC_ENDPOINT_PATTERN.matcher(grpcEndpoint);
if (!matcher.matches()) {
Expand Down
16 changes: 16 additions & 0 deletions sdk/src/test/java/io/dapr/client/DaprClientBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

package io.dapr.client;

import io.dapr.config.Properties;
import io.dapr.exceptions.DaprErrorDetails;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -35,6 +39,18 @@ public void build() {
assertNotNull(daprClient);
}

@Test
public void buildWithOverrideSidecarIP() {

DaprClientBuilder daprClientBuilder = new DaprClientBuilder();
daprClientBuilder.withPropertyOverride(Properties.SIDECAR_IP, "unknown-host");
DaprClient daprClient = daprClientBuilder.build();
assertNotNull(daprClient);
DaprException thrown = assertThrows(DaprException.class, () -> { daprClient.getMetadata().block(); });
assertTrue(thrown.toString().contains("UNAVAILABLE"));

}

@Test
public void noObjectSerializer() {
assertThrows(IllegalArgumentException.class, () -> { new DaprClientBuilder().withObjectSerializer(null);});
Expand Down
Loading