diff --git a/examples/components/workflows/redis.yaml b/examples/components/workflows/redis.yaml new file mode 100644 index 000000000..2f676bff8 --- /dev/null +++ b/examples/components/workflows/redis.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" diff --git a/examples/pom.xml b/examples/pom.xml index 2d9e9c2c8..216b9845f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -118,6 +118,11 @@ dapr-sdk-actors ${project.version} + + io.dapr + dapr-sdk-workflows + ${dapr.sdk-workflows.version} + io.dapr dapr-sdk diff --git a/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java b/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java new file mode 100644 index 000000000..f85f9e013 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.unittesting; + +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskCanceledException; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowContext; +import io.dapr.workflows.WorkflowStub; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; + +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.mock; + +/** + * 1. Build and install jars: + * mvn clean install + * 2. cd [repo root]/examples + * 3. Run the test code: + * java -jar target/dapr-java-sdk-examples-exec.jar \ + * org.junit.platform.console.ConsoleLauncher --select-class=io.dapr.examples.unittesting.DaprWorkflowExampleTest + */ +public class DaprWorkflowExampleTest { + private static final String timeoutWorkflow = "DemoWorkflowTimeout"; + private static final String noTimeoutWorkflow = "DemoWorkflowNoTimeout"; + private static final String workflowDefaultId = "demo-workflow-123"; + + private class DemoWorkflow extends Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + String name = ctx.getName(); + String id = ctx.getInstanceId(); + try { + ctx.waitForExternalEvent(name, Duration.ofMillis(100)).await(); + } catch (TaskCanceledException e) { + ctx.getLogger().warn("Timed out"); + } + String output = name + ":" + id; + ctx.complete(output); + }; + } + } + + @Test + public void testWorkflow() { + String name = noTimeoutWorkflow; + String id = workflowDefaultId; + WorkflowContext mockContext = createMockContext(name, id); + + new DemoWorkflow().run(mockContext); + + String expectedOutput = name + ":" + id; + Mockito.verify(mockContext, Mockito.times(1)).complete(expectedOutput); + } + + @Test + public void testWorkflowWaitForEventTimeout() { + WorkflowContext mockContext = createMockContext(timeoutWorkflow, workflowDefaultId); + Logger mockLogger = mock(Logger.class); + Mockito.doReturn(mockLogger).when(mockContext).getLogger(); + + new DemoWorkflow().run(mockContext); + + Mockito.verify(mockLogger, Mockito.times(1)).warn("Timed out"); + } + + @Test + public void testWorkflowWaitForEventNoTimeout() { + WorkflowContext mockContext = createMockContext(noTimeoutWorkflow, workflowDefaultId); + Logger mockLogger = mock(Logger.class); + Mockito.doReturn(mockLogger).when(mockContext).getLogger(); + + new DemoWorkflow().run(mockContext); + + Mockito.verify(mockLogger, Mockito.times(0)).warn(anyString()); + } + + private WorkflowContext createMockContext(String name, String id) { + WorkflowContext mockContext = mock(WorkflowContext.class); + + Mockito.doReturn(name).when(mockContext).getName(); + Mockito.doReturn(id).when(mockContext).getInstanceId(); + Mockito.doReturn(mock(Task.class)) + .when(mockContext).waitForExternalEvent(startsWith(noTimeoutWorkflow), any(Duration.class)); + Mockito.doThrow(TaskCanceledException.class) + .when(mockContext).waitForExternalEvent(startsWith(timeoutWorkflow), any(Duration.class)); + + return mockContext; + } +} diff --git a/examples/src/main/java/io/dapr/examples/unittesting/README.md b/examples/src/main/java/io/dapr/examples/unittesting/README.md index cc1073a38..7fd3190e8 100644 --- a/examples/src/main/java/io/dapr/examples/unittesting/README.md +++ b/examples/src/main/java/io/dapr/examples/unittesting/README.md @@ -33,7 +33,9 @@ cd examples ``` ### Understanding the code -This example will simulate an application code via the App class: + +#### Example App Test +This example, found in `DaprExampleTest.java`, will simulate an application code via the App class: ```java private static final class MyApp { @@ -115,7 +117,7 @@ The second test uses a mock implementation of the factory method and checks the ``` -### Running the example +##### Running the example + +Run this example with the following command: +```bash +java -jar target/dapr-java-sdk-examples-exec.jar org.junit.platform.console.ConsoleLauncher --select-class=io.dapr.examples.unittesting.DaprWorkflowExampleTest +``` + + + +After running, Junit should print the output as follows: + +```txt +╷ +├─ JUnit Jupiter ✔ +│ └─ DaprWorkflowExampleTest ✔ +│ ├─ testWorkflowWaitForEventTimeout() ✔ +│ ├─ testWorkflowWaitForEventNoTimeout() ✔ +│ └─ testWorkflow() ✔ +└─ JUnit Vintage ✔ + +Test run finished after 815 ms +[ 3 containers found ] +[ 0 containers skipped ] +[ 3 containers started ] +[ 0 containers aborted ] +[ 3 containers successful ] +[ 0 containers failed ] +[ 3 tests found ] +[ 0 tests skipped ] +[ 3 tests started ] +[ 0 tests aborted ] +[ 3 tests successful ] +[ 0 tests failed ] + ``` \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java new file mode 100644 index 000000000..f09954c47 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows; + +import com.microsoft.durabletask.TaskCanceledException; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +import java.time.Duration; + +/** + * Implementation of the DemoWorkflow for the server side. + */ +public class DemoWorkflow extends Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + ctx.getLogger().info("Instance ID: " + ctx.getInstanceId()); + ctx.getLogger().info("Waiting for event: 'myEvent'..."); + try { + ctx.waitForExternalEvent("myEvent", Duration.ofSeconds(10)).await(); + ctx.getLogger().info("Received!"); + } catch (TaskCanceledException e) { + ctx.getLogger().warn("Timed out"); + ctx.getLogger().warn(e.getMessage()); + } + ctx.complete("finished"); + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java new file mode 100644 index 000000000..cf439c969 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java @@ -0,0 +1,55 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows; + +import io.dapr.workflows.client.DaprWorkflowClient; + +import java.util.concurrent.TimeUnit; + +/** + * For setup instructions, see the README. + */ +public class DemoWorkflowClient { + + /** + * The main method. + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) throws InterruptedException { + DaprWorkflowClient client = new DaprWorkflowClient(); + + try (client) { + System.out.println("*****"); + String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class); + System.out.printf("Started new workflow instance with random ID: %s%n", instanceId); + + System.out.println("Sleep and allow this workflow instance to timeout..."); + TimeUnit.SECONDS.sleep(10); + + System.out.println("*****"); + String instanceToTerminateId = "terminateMe"; + client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId); + System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId); + + TimeUnit.SECONDS.sleep(5); + System.out.println("Terminate this workflow instance manually before the timeout is reached"); + client.terminateWorkflow(instanceToTerminateId, null); + System.out.println("*****"); + } + + System.out.println("Exiting DemoWorkflowClient."); + System.exit(0); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java new file mode 100644 index 000000000..f41fc4fb2 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java @@ -0,0 +1,42 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +/** + * For setup instructions, see the README. + */ +public class DemoWorkflowWorker { + + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + + System.exit(0); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/README.md b/examples/src/main/java/io/dapr/examples/workflows/README.md new file mode 100644 index 000000000..c8b07593b --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/README.md @@ -0,0 +1,78 @@ +# Dapr Workflow Sample + +In this example, we'll use Dapr to test workflow features. + +Visit [the Workflow documentation landing page](https://docs.dapr.io/developing-applications/building-blocks/workflow) for more information. + +This example contains the follow classes: + +* DemoWorkflow: An example of a Dapr Workflow. +* DemoWorkflowClient: This application will start workflows using Dapr. +* DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance. + +## Pre-requisites + +* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/). + * Run `dapr init`. +* Java JDK 11 (or greater): + * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + * [OpenJDK 11](https://jdk.java.net/11/) +* [Apache Maven](https://maven.apache.org/install.html) version 3.x. + +### Checking out the code + +Clone this repository: + +```sh +git clone https://github.com/dapr/java-sdk.git +cd java-sdk +``` + +Then build the Maven project: + +```sh +# make sure you are in the `java-sdk` directory. +mvn install +``` + +Get into the `examples` directory. +```sh +cd examples +``` + +### Running the demo Workflow worker + +The first Java class to consider is `DemoWorkflowWorker`. Its job is to register an implementation of `DemoWorkflow` in the Dapr's workflow runtime engine. In `DemoWorkflowWorker.java` file, you will find the `DemoWorkflowWorker` class and the `main` method. See the code snippet below: + +```java +public class DemoWorkflowWorker { + + public static void main(String[] args) throws Exception { + // Register the Workflow with the runtime. + WorkflowRuntime.getInstance().registerWorkflow(DemoWorkflow.class); + System.out.println("Start workflow runtime"); + WorkflowRuntime.getInstance().startAndBlock(); + System.exit(0); + } +} +``` + +This application uses `WorkflowRuntime.getInstance().registerWorkflow()` in order to register `DemoWorkflow` as a Workflow in the Dapr Workflow runtime. + +`WorkflowRuntime.getInstance().start()` method will build and start the engine within the Dapr workflow runtime. + +Now, execute the following script in order to run DemoWorkflowWorker: +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowWorker +``` + +### Running the Workflow client + +The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr. + +With the DemoWorkflowWorker running, use the follow command to start the workflow with the DemoWorkflowClient: + +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowClient +``` diff --git a/pom.xml b/pom.xml index d93474450..947f63a5e 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,7 @@ 1.42.1 3.17.3 https://raw.githubusercontent.com/dapr/dapr/v1.11.0-rc.5/dapr/proto + 0.10.0-SNAPSHOT 1.6.2 3.1.1 1.8 @@ -301,6 +302,7 @@ sdk-autogen sdk sdk-actors + sdk-workflows sdk-springboot examples diff --git a/scripts/update_sdk_version.sh b/scripts/update_sdk_version.sh index ac8663e97..fce2a5486 100755 --- a/scripts/update_sdk_version.sh +++ b/scripts/update_sdk_version.sh @@ -3,13 +3,11 @@ set -uex DAPR_JAVA_SDK_VERSION=$1 +# replace the major version of DAPR_JAVA_SDK_VERSION with 0 while in alpha +DAPR_WORKFLOW_SDK_VERSION=$(echo $DAPR_JAVA_SDK_VERSION | sed -E "s/[0-9]+\.(.*?)/0.\1/") mvn versions:set -DnewVersion=$DAPR_JAVA_SDK_VERSION -mvn versions:commit - -if [[ "$OSTYPE" == "darwin"* ]]; then - sed -i bak "s/.*<\/dapr.sdk.version>/${DAPR_JAVA_SDK_VERSION}<\/dapr.sdk.version>/g" sdk-tests/pom.xml - rm sdk-tests/pom.xmlbak -else - sed -i "s/.*<\/dapr.sdk.version>/${DAPR_JAVA_SDK_VERSION}<\/dapr.sdk.version>/g" sdk-tests/pom.xml -fi +mvn versions:set-property -Dproperty=dapr.sdk.version -DnewVersion=$DAPR_JAVA_SDK_VERSION -f sdk-tests/pom.xml + +mvn versions:set -DnewVersion=$DAPR_WORKFLOW_SDK_VERSION -f sdk-workflows/pom.xml +mvn versions:set-property -Dproperty=dapr.sdk-workflows.version -DnewVersion=$DAPR_WORKFLOW_SDK_VERSION diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml new file mode 100644 index 000000000..51051e35d --- /dev/null +++ b/sdk-workflows/pom.xml @@ -0,0 +1,153 @@ + + 4.0.0 + + + io.dapr + dapr-sdk-parent + 1.10.0-SNAPSHOT + + + dapr-sdk-workflows + jar + 0.10.0-SNAPSHOT + dapr-sdk-workflows + SDK for Workflows on Dapr + + + false + 1.42.1 + 2.12.3 + + + + + io.dapr + dapr-sdk + ${project.parent.version} + + + junit + junit + test + + + org.mockito + mockito-core + test + + + org.junit.jupiter + junit-jupiter + test + + + com.microsoft + durabletask-client + 1.1.1 + + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar-no-fork + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + attach-javadocs + + jar + + + + + + org.jacoco + jacoco-maven-plugin + 0.8.8 + + + default-prepare-agent + + prepare-agent + + + + report + test + + report + + + target/jacoco-report/ + + + + check + + check + + + + + BUNDLE + + + LINE + COVEREDRATIO + 80% + + + + + + + + + + + + diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/DaprWorkflowContextImpl.java b/sdk-workflows/src/main/java/io/dapr/workflows/DaprWorkflowContextImpl.java new file mode 100644 index 000000000..121026563 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/DaprWorkflowContextImpl.java @@ -0,0 +1,94 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows; + +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOrchestrationContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.helpers.NOPLogger; + +import java.time.Duration; + +public class DaprWorkflowContextImpl implements WorkflowContext { + private final TaskOrchestrationContext innerContext; + private final Logger logger; + + /** + * Constructor for DaprWorkflowContextImpl. + * + * @param context TaskOrchestrationContext + * @throws IllegalArgumentException if context is null + */ + public DaprWorkflowContextImpl(TaskOrchestrationContext context) throws IllegalArgumentException { + this(context, LoggerFactory.getLogger(WorkflowContext.class)); + } + + /** + * Constructor for DaprWorkflowContextImpl. + * + * @param context TaskOrchestrationContext + * @param logger Logger + * @throws IllegalArgumentException if context or logger is null + */ + public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException { + if (context == null) { + throw new IllegalArgumentException("Context cannot be null"); + } + if (logger == null) { + throw new IllegalArgumentException("Logger cannot be null"); + } + + this.innerContext = context; + this.logger = logger; + } + + /** + * {@inheritDoc} + */ + public Logger getLogger() { + if (this.innerContext.getIsReplaying()) { + return NOPLogger.NOP_LOGGER; + } + return this.logger; + } + + /** + * {@inheritDoc} + */ + public String getName() { + return this.innerContext.getName(); + } + + /** + * {@inheritDoc} + */ + public String getInstanceId() { + return this.innerContext.getInstanceId(); + } + + /** + * {@inheritDoc} + */ + public void complete(Object output) { + this.innerContext.complete(output); + } + + /** + * {@inheritDoc} + */ + public Task waitForExternalEvent(String eventName, Duration timeout) { + return this.innerContext.waitForExternalEvent(eventName, timeout); + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java b/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java new file mode 100644 index 000000000..66b5c02d7 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows; + +/** + * Common interface for workflow implementations. + */ +public abstract class Workflow { + public Workflow(){ + } + + /** + * Executes the workflow logic. + * + * @return A WorkflowStub. + */ + public abstract WorkflowStub create(); + + /** + * Executes the workflow logic. + * + * @param ctx provides access to methods for scheduling durable tasks and getting information about the current + * workflow instance. + */ + public void run(WorkflowContext ctx) { + this.create().run(ctx); + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java new file mode 100644 index 000000000..61b983de2 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -0,0 +1,68 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows; + +import com.microsoft.durabletask.Task; +import org.slf4j.Logger; + +import java.time.Duration; + +/** + * Context object used by workflow implementations to perform actions such as scheduling activities, + * durable timers, waiting for external events, and for getting basic information about the current + * workflow instance. + */ +public interface WorkflowContext { + + /** + * Get a logger only when {@code isReplaying} is false. + * Otherwise, return a NOP (no operation) logger. + * + * @return Logger + */ + Logger getLogger(); + + + /** + * Gets the name of the current workflow. + * + * @return the name of the current workflow + */ + String getName(); + + /** + * Gets the instance ID of the current workflow. + * + * @return the instance ID of the current workflow + */ + String getInstanceId(); + + /** + * Completes the current workflow. + * + * @param output the serializable output of the completed Workflow. + */ + void complete(Object output); + + /** + * Waits for an event to be raised with name and returns the event data. + * + * @param eventName The name of the event to wait for. Event names are case-insensitive. + * External event names can be reused any number of times; they are not + * required to be unique. + * @param timeout The amount of time to wait before cancelling the external event task. + * @return An asynchronous durabletask.Task to await. + */ + Task waitForExternalEvent(String eventName, Duration timeout); +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowStub.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowStub.java new file mode 100644 index 000000000..561a6e1a7 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowStub.java @@ -0,0 +1,21 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows; + +import io.dapr.workflows.WorkflowContext; + +@FunctionalInterface +public interface WorkflowStub { + void run(WorkflowContext ctx); +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java new file mode 100644 index 000000000..9f4ab03ad --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -0,0 +1,133 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.client; + +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; +import io.dapr.utils.NetworkUtils; +import io.dapr.workflows.Workflow; +import io.grpc.ManagedChannel; + +import javax.annotation.Nullable; +import java.util.concurrent.TimeUnit; + +public class DaprWorkflowClient implements AutoCloseable { + + private DurableTaskClient innerClient; + private ManagedChannel grpcChannel; + + /** + * Public constructor for DaprWorkflowClient. This layer constructs the GRPC Channel. + */ + public DaprWorkflowClient() { + this(NetworkUtils.buildGrpcManagedChannel()); + } + + /** + * Private Constructor that passes a created DurableTaskClient and the new GRPC channel. + * + * @param grpcChannel ManagedChannel for GRPC channel. + */ + private DaprWorkflowClient(ManagedChannel grpcChannel) { + this(createDurableTaskClient(grpcChannel), grpcChannel); + } + + /** + * Private Constructor for DaprWorkflowClient. + * + * @param innerClient DurableTaskGrpcClient with GRPC Channel set up. + * @param grpcChannel ManagedChannel for instance variable setting. + * + */ + private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcChannel) { + this.innerClient = innerClient; + this.grpcChannel = grpcChannel; + } + + /** + * Static method to create the DurableTaskClient. + * + * @param grpcChannel ManagedChannel for GRPC. + * @return a new instance of a DurableTaskClient with a GRPC channel. + */ + private static DurableTaskClient createDurableTaskClient(ManagedChannel grpcChannel) { + return new DurableTaskGrpcClientBuilder() + .grpcChannel(grpcChannel) + .build(); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param clazz Class extending Workflow to start an instance of. + * @return A String with the randomly-generated instance ID for new Workflow instance. + */ + public String scheduleNewWorkflow(Class clazz) { + return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName()); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param clazz Class extending Workflow to start an instance of. + * @param input the input to pass to the scheduled orchestration instance. Must be serializable. + * @return A String with the randomly-generated instance ID for new Workflow instance. + */ + public String scheduleNewWorkflow(Class clazz, Object input) { + return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param clazz Class extending Workflow to start an instance of. + * @param input the input to pass to the scheduled orchestration instance. Must be serializable. + * @param instanceId the unique ID of the orchestration instance to schedule + * @return A String with the instanceId parameter value. + */ + public String scheduleNewWorkflow(Class clazz, Object input, String instanceId) { + return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId); + } + + /** + * Terminates the workflow associated with the provided instance id. + * + * @param workflowInstanceId Workflow instance id to terminate. + * @param output the optional output to set for the terminated orchestration instance. + */ + public void terminateWorkflow(String workflowInstanceId, @Nullable Object output) { + this.innerClient.terminate(workflowInstanceId, output); + } + + /** + * Closes the inner DurableTask client and shutdown the GRPC channel. + * + */ + public void close() throws InterruptedException { + try { + if (this.innerClient != null) { + this.innerClient.close(); + this.innerClient = null; + } + } finally { + if (this.grpcChannel != null && !this.grpcChannel.isShutdown()) { + this.grpcChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + this.grpcChannel = null; + } + } + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/OrchestratorWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/OrchestratorWrapper.java new file mode 100644 index 000000000..f28eed0de --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/OrchestratorWrapper.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskOrchestration; +import com.microsoft.durabletask.TaskOrchestrationFactory; +import io.dapr.workflows.DaprWorkflowContextImpl; +import io.dapr.workflows.Workflow; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Wrapper for Durable Task Framework orchestration factory. + */ +class OrchestratorWrapper implements TaskOrchestrationFactory { + private final Constructor workflowConstructor; + private final String name; + + public OrchestratorWrapper(Class clazz) { + this.name = clazz.getCanonicalName(); + try { + this.workflowConstructor = clazz.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + String.format("No constructor found for workflow class '%s'.", this.name), e + ); + } + } + + @Override + public String getName() { + return name; + } + + @Override + public TaskOrchestration create() { + return ctx -> { + T workflow; + try { + workflow = this.workflowConstructor.newInstance(); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException( + String.format("Unable to instantiate instance of workflow class '%s'", this.name), e + ); + } + workflow.run(new DaprWorkflowContextImpl(ctx)); + }; + + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java new file mode 100644 index 000000000..6754f675b --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java @@ -0,0 +1,60 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.DurableTaskGrpcWorker; + +/** + * Contains methods to register workflows and activities. + */ +public class WorkflowRuntime implements AutoCloseable { + + private DurableTaskGrpcWorker worker; + + public WorkflowRuntime(DurableTaskGrpcWorker worker) { + this.worker = worker; + } + + /** + * Start the Workflow runtime processing items and block. + * + */ + public void start() { + this.start(true); + } + + /** + * Start the Workflow runtime processing items. + * + * @param block block the thread if true + */ + public void start(boolean block) { + if (block) { + this.worker.startAndBlock(); + } else { + this.worker.start(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (this.worker != null) { + this.worker.close(); + this.worker = null; + } + } +} \ No newline at end of file diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java new file mode 100644 index 000000000..11fe624ba --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; +import io.dapr.utils.NetworkUtils; +import io.dapr.workflows.Workflow; + +public class WorkflowRuntimeBuilder { + private static volatile WorkflowRuntime instance; + private DurableTaskGrpcWorkerBuilder builder; + + public WorkflowRuntimeBuilder() { + this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel()); + } + + /** + * Returns a WorkflowRuntime object. + * + * @return A WorkflowRuntime object. + */ + public WorkflowRuntime build() { + if (instance == null) { + synchronized (WorkflowRuntime.class) { + if (instance == null) { + instance = new WorkflowRuntime(this.builder.build()); + } + } + } + return instance; + } + + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param clazz the class being registered + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(Class clazz) { + this.builder = this.builder.addOrchestration( + new OrchestratorWrapper<>(clazz) + ); + + return this; + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DaprWorkflowContextImplTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DaprWorkflowContextImplTest.java new file mode 100644 index 000000000..2cdd1bada --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DaprWorkflowContextImplTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows; + +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOrchestrationContext; +import io.dapr.workflows.DaprWorkflowContextImpl; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; + +import java.time.Duration; + +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.*; + +public class DaprWorkflowContextImplTest { + private DaprWorkflowContextImpl context; + private TaskOrchestrationContext mockInnerContext; + + @Before + public void setUp() { + mockInnerContext = mock(TaskOrchestrationContext.class); + context = new DaprWorkflowContextImpl(mockInnerContext); + } + + @Test + public void nullConstructorTest() { + assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(mockInnerContext, null); }); + assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(null, mock(Logger.class)); }); + assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(null, null); }); + } + + @Test + public void getNameTest() { + context.getName(); + verify(mockInnerContext, times(1)).getName(); + } + + @Test + public void getInstanceIdTest() { + context.getInstanceId(); + verify(mockInnerContext, times(1)).getInstanceId(); + } + + @Test + public void waitForExternalEventTest() { + doReturn(mock(Task.class)) + .when(mockInnerContext).waitForExternalEvent(any(String.class), any(Duration.class)); + DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext); + String expectedEvent = "TestEvent"; + Duration expectedDuration = Duration.ofSeconds(1); + + testContext.waitForExternalEvent(expectedEvent, expectedDuration).await(); + verify(mockInnerContext, times(1)).waitForExternalEvent(expectedEvent, expectedDuration); + } + + @Test(expected = IllegalArgumentException.class) + public void DaprWorkflowContextWithEmptyInnerContext() { + context = new DaprWorkflowContextImpl(null, null); + } + + @Test + public void completeTest() { + context.complete(null); + verify(mockInnerContext, times(1)).complete(null); + } + + @Test + public void getLoggerReplayingTest() { + Logger mockLogger = mock(Logger.class); + when(mockInnerContext.getIsReplaying()).thenReturn(true); + DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext, mockLogger); + + String expectedArg = "test print"; + testContext.getLogger().info(expectedArg); + + verify(mockLogger, times(0)).info(any(String.class)); + } + + @Test + public void getLoggerFirstTimeTest() { + Logger mockLogger = mock(Logger.class); + when(mockInnerContext.getIsReplaying()).thenReturn(false); + DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext, mockLogger); + + String expectedArg = "test print"; + testContext.getLogger().info(expectedArg); + + verify(mockLogger, times(1)).info(expectedArg); + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java new file mode 100644 index 000000000..a3f826123 --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.client; + +import com.microsoft.durabletask.DurableTaskClient; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.grpc.ManagedChannel; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.lang.reflect.Constructor; +import java.util.Arrays; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.mockito.Mockito.*; + +public class DaprWorkflowClientTest { + private static Constructor constructor; + private DaprWorkflowClient client; + private DurableTaskClient mockInnerClient; + private ManagedChannel mockGrpcChannel; + + public class TestWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { }; + } + } + + @BeforeClass + public static void beforeAll() { + constructor = + Constructor.class.cast(Arrays.stream(DaprWorkflowClient.class.getDeclaredConstructors()) + .filter(c -> c.getParameters().length == 2).peek(c -> c.setAccessible(true)).findFirst().get()); + } + + @Before + public void setUp() throws Exception { + mockInnerClient = mock(DurableTaskClient.class); + mockGrpcChannel = mock(ManagedChannel.class); + when(mockGrpcChannel.shutdown()).thenReturn(mockGrpcChannel); + + client = constructor.newInstance(mockInnerClient, mockGrpcChannel); + } + + @Test + public void EmptyConstructor() { + assertDoesNotThrow(DaprWorkflowClient::new); + } + + @Test + public void scheduleNewWorkflowWithArgName() { + String expectedName = TestWorkflow.class.getCanonicalName(); + + client.scheduleNewWorkflow(TestWorkflow.class); + + verify(mockInnerClient, times(1)).scheduleNewOrchestrationInstance(expectedName); + } + + @Test + public void scheduleNewWorkflowWithArgsNameInput() { + String expectedName = TestWorkflow.class.getCanonicalName(); + Object expectedInput = new Object(); + + client.scheduleNewWorkflow(TestWorkflow.class, expectedInput); + + verify(mockInnerClient, times(1)) + .scheduleNewOrchestrationInstance(expectedName, expectedInput); + } + + @Test + public void scheduleNewWorkflowWithArgsNameInputInstance() { + String expectedName = TestWorkflow.class.getCanonicalName(); + Object expectedInput = new Object(); + String expectedInstanceId = "myTestInstance123"; + + client.scheduleNewWorkflow(TestWorkflow.class, expectedInput, expectedInstanceId); + + verify(mockInnerClient, times(1)) + .scheduleNewOrchestrationInstance(expectedName, expectedInput, expectedInstanceId); + } + + @Test + public void terminateWorkflow() { + String expectedArgument = "TestWorkflowInstanceId"; + + client.terminateWorkflow(expectedArgument, null); + verify(mockInnerClient, times(1)).terminate(expectedArgument, null); + } + + @Test + public void close() throws InterruptedException { + client.close(); + verify(mockInnerClient, times(1)).close(); + verify(mockGrpcChannel, times(1)).shutdown(); + } + + @Test + public void closeWithInnerClientRuntimeException() throws InterruptedException { + doThrow(RuntimeException.class).when(mockInnerClient).close(); + + assertThrows(RuntimeException.class, () -> { client.close(); }); + verify(mockInnerClient, times(1)).close(); + verify(mockGrpcChannel, times(1)).shutdown(); + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/OrchestratorWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/OrchestratorWrapperTest.java new file mode 100644 index 000000000..eff6ccd98 --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/OrchestratorWrapperTest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + + +import com.microsoft.durabletask.TaskOrchestrationContext; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowContext; +import io.dapr.workflows.WorkflowStub; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class OrchestratorWrapperTest { + public static class TestWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return WorkflowContext::getInstanceId; + } + } + + @Test + public void getName() { + OrchestratorWrapper wrapper = new OrchestratorWrapper<>(TestWorkflow.class); + Assert.assertEquals( + "io.dapr.workflows.runtime.OrchestratorWrapperTest.TestWorkflow", + wrapper.getName() + ); + } + + @Test + public void createWithClass() { + TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); + OrchestratorWrapper wrapper = new OrchestratorWrapper<>(TestWorkflow.class); + when( mockContext.getInstanceId() ).thenReturn("uuid"); + wrapper.create().run(mockContext); + verify(mockContext, times(1)).getInstanceId(); + } + +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java new file mode 100644 index 000000000..905b89e9d --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java @@ -0,0 +1,24 @@ +package io.dapr.workflows.runtime; + + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import org.junit.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +public class WorkflowRuntimeBuilderTest { + public static class TestWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { }; + } + } + + @Test + public void registerValidWorkflowClass() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class)); + } + + +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java new file mode 100644 index 000000000..63fdf42e4 --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + + +import com.microsoft.durabletask.DurableTaskGrpcWorker; +import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import org.junit.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +public class WorkflowRuntimeTest { + public static class TestWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { }; + } + } + + @Test + public void startTest() { + DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build(); + try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) { + assertDoesNotThrow(() -> { + runtime.start(false); + }); + } + } + + @Test + public void closeWithoutStarting() { + DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build(); + try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) { + assertDoesNotThrow(runtime::close); + } + } +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index 202a6c67e..9b73e5b90 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -16,16 +16,12 @@ import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; -import io.dapr.utils.Version; +import io.dapr.utils.NetworkUtils; import io.dapr.v1.DaprGrpc; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.net.URI; - /** * A builder for the DaprClient, * Currently only gRPC and HTTP Client will be supported. @@ -163,34 +159,12 @@ private DaprClient buildDaprClient(DaprApiProtocol protocol) { * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number. */ private DaprClient buildDaprClientGrpc() { - final ManagedChannel channel = buildGrpcManagedChanel(); + final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer); } - private ManagedChannel buildGrpcManagedChanel() { - String address = Properties.SIDECAR_IP.get(); - int port = Properties.GRPC_PORT.get(); - boolean insecure = true; - String grpcEndpoint = Properties.GRPC_ENDPOINT.get(); - if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) { - URI uri = URI.create(grpcEndpoint); - insecure = uri.getScheme().equalsIgnoreCase("http"); - port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443); - address = uri.getHost(); - if ((uri.getPath() != null) && !uri.getPath().isEmpty()) { - address += uri.getPath(); - } - } - ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(address, port) - .userAgent(Version.getSdkVersion()); - if (insecure) { - builder = builder.usePlaintext(); - } - return builder.build(); - } - /** * Creates and instance of DaprClient over HTTP. * diff --git a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java index 840c67cef..8f755f313 100644 --- a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java +++ b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java @@ -13,9 +13,14 @@ package io.dapr.utils; +import io.dapr.config.Properties; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.URI; /** * Utility methods for network, internal to Dapr SDK. @@ -47,4 +52,30 @@ public static void waitForSocket(String host, int port, int timeoutInMillisecond } }, timeoutInMilliseconds); } + + /** + * Creates a GRPC managed channel. + * @return GRPC managed channel to communicate with the sidecar. + */ + public static ManagedChannel buildGrpcManagedChannel() { + String address = Properties.SIDECAR_IP.get(); + int port = Properties.GRPC_PORT.get(); + boolean insecure = true; + String grpcEndpoint = Properties.GRPC_ENDPOINT.get(); + if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) { + URI uri = URI.create(grpcEndpoint); + insecure = uri.getScheme().equalsIgnoreCase("http"); + port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443); + address = uri.getHost(); + if ((uri.getPath() != null) && !uri.getPath().isEmpty()) { + address += uri.getPath(); + } + } + ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(address, port) + .userAgent(Version.getSdkVersion()); + if (insecure) { + builder = builder.usePlaintext(); + } + return builder.build(); + } } diff --git a/sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java b/sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java new file mode 100644 index 000000000..0c1417cea --- /dev/null +++ b/sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java @@ -0,0 +1,62 @@ +package io.dapr.utils; + +import io.dapr.config.Properties; +import io.grpc.ManagedChannel; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class NetworkUtilsTest { + private final int defaultGrpcPort = 4000; + private final String defaultSidecarIP = "127.0.0.1"; + @Before + public void setUp() { + System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(defaultGrpcPort)); + System.setProperty(Properties.SIDECAR_IP.getName(), defaultSidecarIP); + System.setProperty(Properties.GRPC_ENDPOINT.getName(), ""); + } + + @Test + public void testBuildGrpcManagedChannel() { + ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); + + String expectedAuthority = String.format("%s:%s", defaultSidecarIP, defaultGrpcPort); + Assert.assertEquals(expectedAuthority, channel.authority()); + } + + @Test + public void testBuildGrpcManagedChannel_httpEndpointNoPort() { + System.setProperty(Properties.GRPC_ENDPOINT.getName(), "http://example.com"); + ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); + + String expectedAuthority = "example.com:80"; + Assert.assertEquals(expectedAuthority, channel.authority()); + } + + @Test + public void testBuildGrpcManagedChannel_httpEndpointWithPort() { + System.setProperty(Properties.GRPC_ENDPOINT.getName(), "http://example.com:3000"); + ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); + + String expectedAuthority = "example.com:3000"; + Assert.assertEquals(expectedAuthority, channel.authority()); + } + + @Test + public void testBuildGrpcManagedChannel_httpsEndpointNoPort() { + System.setProperty(Properties.GRPC_ENDPOINT.getName(), "https://example.com"); + ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); + + String expectedAuthority = "example.com:443"; + Assert.assertEquals(expectedAuthority, channel.authority()); + } + + @Test + public void testBuildGrpcManagedChannel_httpsEndpointWithPort() { + System.setProperty(Properties.GRPC_ENDPOINT.getName(), "https://example.com:3000"); + ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); + + String expectedAuthority = "example.com:3000"; + Assert.assertEquals(expectedAuthority, channel.authority()); + } +}