Skip to content

Commit

Permalink
Add configuration overrides for DaprWorkflowClient and WorkflowRuntim…
Browse files Browse the repository at this point in the history
…eBuilder (#1113)

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>
Co-authored-by: Artur Ciocanu <ciocanu@adobe.com>
  • Loading branch information
artur-ciocanu and Artur Ciocanu authored Sep 4, 2024
1 parent 935f3be commit 702aa05
Show file tree
Hide file tree
Showing 18 changed files with 246 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ public class DaprKeyValueRepositoryIT {
static void daprProperties(DynamicPropertyRegistry registry) {
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
registry.add("dapr.grpc.port", DAPR_CONTAINER::getGrpcPort);
registry.add("dapr.http.port", DAPR_CONTAINER::getHttpPort);
}

private static Map<String, String> createStateStoreProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ public class MySQLDaprKeyValueTemplateIT {
static void daprProperties(DynamicPropertyRegistry registry) {
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
registry.add("dapr.grpc.port", DAPR_CONTAINER::getGrpcPort);
registry.add("dapr.http.port", DAPR_CONTAINER::getHttpPort);
}

private static Map<String, String> createStateStoreProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public class PostgreSQLDaprKeyValueTemplateIT {
static void daprProperties(DynamicPropertyRegistry registry) {
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
registry.add("dapr.grpc.port", DAPR_CONTAINER::getGrpcPort);
registry.add("dapr.http.port", DAPR_CONTAINER::getHttpPort);
}

private static Map<String, String> createStateStoreProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ public ObjectMapper mapper() {
@Bean
public DaprClientCustomizer daprClientCustomizer(
@Value("${dapr.http.endpoint}") String daprHttpEndpoint,
@Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint,
@Value("${dapr.http.port}") String daprHttpPort,
@Value("${dapr.grpc.port}") String daprGrpcPort
@Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint
){
return new TestcontainersDaprClientCustomizer(daprHttpEndpoint, daprGrpcEndpoint, daprHttpPort, daprGrpcPort);
return new TestcontainersDaprClientCustomizer(daprHttpEndpoint, daprGrpcEndpoint);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ static class DaprSpringMessagingConfiguration {
@Bean
public DaprClientCustomizer daprClientCustomizer(
@Value("${dapr.http.endpoint}") String daprHttpEndpoint,
@Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint,
@Value("${dapr.http.port}") String daprHttpPort,
@Value("${dapr.grpc.port}") String daprGrpcPort
@Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint
){
return new TestcontainersDaprClientCustomizer(daprHttpEndpoint, daprGrpcEndpoint, daprHttpPort, daprGrpcPort);
return new TestcontainersDaprClientCustomizer(daprHttpEndpoint, daprGrpcEndpoint);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Testcontainers
@WireMockTest(httpPort = 8081)
@Tag("testcontainers")
public class DaprContainerTest {
public class DaprContainerIT {

// Time-to-live for messages published.
private static final String MESSAGE_TTL_IN_SECONDS = "1000";
Expand Down Expand Up @@ -123,14 +125,18 @@ public void testStateStore() throws Exception {

@Test
public void testPlacement() throws Exception {
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
String url = "http://" + DAPR_CONTAINER.getHost() + ":" + DAPR_CONTAINER.getHttpPort();
Request request = new Request.Builder().url(url + "/v1.0/metadata").build();
// Dapr and Placement need some time to connect
Thread.sleep(1000);

OkHttpClient okHttpClient = new OkHttpClient.Builder()
.build();
Request request = new Request.Builder()
.url(DAPR_CONTAINER.getHttpEndpoint() + "/v1.0/metadata")
.build();

try (Response response = okHttpClient.newCall(request).execute()) {
if (response.isSuccessful() && response.body() != null) {
assertTrue(response.body().string().contains("placement: connected"));

} else {
throw new IOException("Unexpected response: " + response.code());
}
Expand All @@ -151,8 +157,8 @@ public void testPubSub() throws Exception {
}

private DaprClientBuilder createDaprClientBuilder() {
return new DaprClientBuilder()
.withPropertyOverride(Properties.HTTP_PORT, String.valueOf(DAPR_CONTAINER.getHttpPort()))
.withPropertyOverride(Properties.GRPC_PORT, String.valueOf(DAPR_CONTAINER.getGrpcPort()));
return new DaprClientBuilder()
.withPropertyOverride(Properties.HTTP_ENDPOINT, DAPR_CONTAINER.getHttpEndpoint())
.withPropertyOverride(Properties.GRPC_ENDPOINT, DAPR_CONTAINER.getGrpcEndpoint());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

@Testcontainers
@Tag("testcontainers")
public class DaprPlacementContainerTest {
public class DaprPlacementContainerIT {

@Container
private static final DaprPlacementContainer PLACEMENT_CONTAINER = new DaprPlacementContainer("daprio/placement");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

package io.dapr.it.testcontainers;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.testcontainers.Component;
import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import io.dapr.workflows.runtime.WorkflowRuntime;
Expand All @@ -24,66 +28,96 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@SpringBootTest(
webEnvironment = WebEnvironment.RANDOM_PORT,
classes = TestWorkflowsApplication.class
classes = {
TestDaprWorkflowsConfiguration.class,
TestWorkflowsApplication.class
}
)
@Testcontainers
@Tag("testcontainers")
public class DaprWorkflowsTests {
public class DaprWorkflowsIT {

private static final Network DAPR_NETWORK = Network.newNetwork();

@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
.withAppName("workflow-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component("kvstore", "state.in-memory", "v1",
Collections.singletonMap("actorStateStore", "true")))
.withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap()))
.withDaprLogLevel(DaprLogLevel.DEBUG)
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withAppChannelAddress("host.testcontainers.internal");

/**
* Expose the Dapr ports to the host.
*
* @param registry the dynamic property registry
*/
@DynamicPropertySource
static void daprProperties(DynamicPropertyRegistry registry) {
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
}

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Autowired
private DaprWorkflowClient workflowClient;

@Autowired
private WorkflowRuntimeBuilder workflowRuntimeBuilder;

/**
* Initializes the test.
*/
@BeforeEach
public void init() {
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class);
builder.registerActivity(FirstActivity.class);
builder.registerActivity(SecondActivity.class);

try (WorkflowRuntime runtime = builder.build()) {
try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
System.out.println("Start workflow runtime");
runtime.start(false);
}
}

@Test
public void myWorkflowTest() throws Exception {
workflowClient = new DaprWorkflowClient();

public void testWorkflows() throws Exception {
TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>());
String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload);

workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);

workflowClient.raiseEvent(instanceId, "MoveForward", payload);

WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId,
Duration.ofSeconds(10),
true);
Duration timeout = Duration.ofSeconds(10);
WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);

// The workflow completed before 10 seconds
assertNotNull(workflowStatus);

String workflowPlayloadJson = workflowStatus.getSerializedOutput();

ObjectMapper mapper = new ObjectMapper();
TestWorkflowPayload workflowOutput = mapper.readValue(workflowPlayloadJson, TestWorkflowPayload.class);
TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput());

assertEquals(2, workflowOutput.getPayloads().size());
assertEquals("First Activity", workflowOutput.getPayloads().get(0));
assertEquals("Second Activity", workflowOutput.getPayloads().get(1));
assertEquals(instanceId, workflowOutput.getWorkflowId());
}

private TestWorkflowPayload deserialize(String value) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.dapr.it.testcontainers;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.config.Properties;
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
public class TestDaprWorkflowsConfiguration {
@Bean
public ObjectMapper mapper() {
return new ObjectMapper();
}

@Bean
public DaprWorkflowClient daprWorkflowClient(
@Value("${dapr.http.endpoint}") String daprHttpEndpoint,
@Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint
){
Map<String, String> overrides = Map.of(
"dapr.http.endpoint", daprHttpEndpoint,
"dapr.grpc.endpoint", daprGrpcEndpoint
);

return new DaprWorkflowClient(new Properties(overrides));
}

@Bean
public WorkflowRuntimeBuilder workflowRuntimeBuilder(
@Value("${dapr.http.endpoint}") String daprHttpEndpoint,
@Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint
){
Map<String, String> overrides = Map.of(
"dapr.http.endpoint", daprHttpEndpoint,
"dapr.grpc.endpoint", daprGrpcEndpoint
);

WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides));

builder.registerWorkflow(TestWorkflow.class);
builder.registerActivity(FirstActivity.class);
builder.registerActivity(SecondActivity.class);

return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.testcontainers.context.ImportTestcontainers;

@SpringBootApplication
public class TestWorkflowsApplication {
Expand All @@ -24,9 +23,4 @@ public static void main(String[] args) {
SpringApplication.run(TestWorkflowsApplication.class, args);
}

@ImportTestcontainers(DaprTestcontainersModule.class)
static class DaprTestConfiguration {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ public class DaprWorkflowClient implements AutoCloseable {
* Public constructor for DaprWorkflowClient. This layer constructs the GRPC Channel.
*/
public DaprWorkflowClient() {
this(NetworkUtils.buildGrpcManagedChannel(new Properties(), WORKFLOW_INTERCEPTOR));
this(new Properties());
}

/**
* Public constructor for DaprWorkflowClient. This layer constructs the GRPC Channel.
*
* @param properties Properties for the GRPC Channel.
*/
public DaprWorkflowClient(Properties properties) {
this(NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR));
}

/**
Expand Down
Loading

0 comments on commit 702aa05

Please sign in to comment.