diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoActivityInput.java b/examples/src/main/java/io/dapr/examples/workflows/DemoActivityInput.java new file mode 100644 index 000000000..5b0675b9f --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoActivityInput.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows; + +public class DemoActivityInput { + + private String message; + + public DemoActivityInput() { + } + + public DemoActivityInput(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoActivityOutput.java b/examples/src/main/java/io/dapr/examples/workflows/DemoActivityOutput.java new file mode 100644 index 000000000..8d060419c --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoActivityOutput.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows; + +public class DemoActivityOutput { + + private String originalMessage; + private String newMessage; + + public DemoActivityOutput() { + } + + public DemoActivityOutput(String originalMessage, String newMessage) { + this.originalMessage = originalMessage; + this.newMessage = newMessage; + } + + public String getOriginalMessage() { + return originalMessage; + } + + public void setOriginalMessage(String originalMessage) { + this.originalMessage = originalMessage; + } + + public String getNewMessage() { + return newMessage; + } + + public void setNewMessage(String newMessage) { + this.newMessage = newMessage; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoSubWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/DemoSubWorkflow.java new file mode 100644 index 000000000..096744382 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoSubWorkflow.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +/** + * Implementation of the DemoWorkflow for the server side. + */ +public class DemoSubWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + + var logger = ctx.getLogger(); + logger.info("Child-Workflow> Started: " + ctx.getName()); + logger.info("Child-Workflow> Instance ID: " + ctx.getInstanceId()); + logger.info("Child-Workflow> Current Time: " + ctx.getCurrentInstant()); + + var input = ctx.getInput(String.class); + logger.info("Child-Workflow> Input: " + input); + + logger.info("Child-Workflow> Completed"); + ctx.complete("result: " + input); + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java index f09954c47..3552feb5f 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java @@ -13,31 +13,114 @@ package io.dapr.examples.workflows; +import com.microsoft.durabletask.CompositeTaskFailedException; +import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowStub; import java.time.Duration; +import java.util.Arrays; +import java.util.List; /** * Implementation of the DemoWorkflow for the server side. */ public class DemoWorkflow extends Workflow { - @Override public WorkflowStub create() { return ctx -> { ctx.getLogger().info("Starting Workflow: " + ctx.getName()); ctx.getLogger().info("Instance ID: " + ctx.getInstanceId()); - ctx.getLogger().info("Waiting for event: 'myEvent'..."); + ctx.getLogger().info("Current Orchestration Time: " + ctx.getCurrentInstant()); + ctx.getLogger().info("Waiting for event: 'TimedOutEvent'..."); + try { + ctx.waitForExternalEvent("TimedOutEvent", Duration.ofSeconds(10)).await(); + } catch (TaskCanceledException e) { + ctx.getLogger().warn("Timed out"); + ctx.getLogger().warn(e.getMessage()); + } + + ctx.getLogger().info("Waiting for event: 'TestEvent'..."); + try { + ctx.waitForExternalEvent("TestEvent", Duration.ofSeconds(10)).await(); + ctx.getLogger().info("Received TestEvent"); + } catch (TaskCanceledException e) { + ctx.getLogger().warn("Timed out"); + ctx.getLogger().warn(e.getMessage()); + } + + ctx.getLogger().info("Parallel Execution - Waiting for all tasks to finish..."); + try { + Task t1 = ctx.waitForExternalEvent("event1", Duration.ofSeconds(5), String.class); + Task t2 = ctx.waitForExternalEvent("event2", Duration.ofSeconds(5), String.class); + Task t3 = ctx.waitForExternalEvent("event3", Duration.ofSeconds(5), String.class); + + List results = ctx.allOf(Arrays.asList(t1, t2, t3)).await(); + results.forEach(t -> ctx.getLogger().info("finished task: " + t)); + ctx.getLogger().info("All tasks finished!"); + + } catch (CompositeTaskFailedException e) { + ctx.getLogger().warn(e.getMessage()); + List exceptions = e.getExceptions(); + exceptions.forEach(ex -> ctx.getLogger().warn(ex.getMessage())); + } + + ctx.getLogger().info("Parallel Execution - Waiting for any task to finish..."); try { - ctx.waitForExternalEvent("myEvent", Duration.ofSeconds(10)).await(); - ctx.getLogger().info("Received!"); + Task e1 = ctx.waitForExternalEvent("e1", Duration.ofSeconds(5), String.class); + Task e2 = ctx.waitForExternalEvent("e2", Duration.ofSeconds(5), String.class); + Task e3 = ctx.waitForExternalEvent("e3", Duration.ofSeconds(5), String.class); + Task timeoutTask = ctx.createTimer(Duration.ofSeconds(1)); + + Task winner = ctx.anyOf(Arrays.asList(e1, e2, e3, timeoutTask)).await(); + if (winner == timeoutTask) { + ctx.getLogger().info("All tasks timed out!"); + } else { + ctx.getLogger().info("One of the tasks finished!"); + } } catch (TaskCanceledException e) { ctx.getLogger().warn("Timed out"); ctx.getLogger().warn(e.getMessage()); } - ctx.complete("finished"); + + ctx.getLogger().info("Calling Activity..."); + var input = new DemoActivityInput("Hello Activity!"); + var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await(); + + ctx.getLogger().info("Activity returned: " + output); + ctx.getLogger().info("Activity returned: " + output.getNewMessage()); + ctx.getLogger().info("Activity returned: " + output.getOriginalMessage()); + + + boolean shouldComplete = true; + ctx.getLogger().info("Waiting for event: 'RestartEvent'..."); + try { + ctx.waitForExternalEvent("RestartEvent", Duration.ofSeconds(10)).await(); + ctx.getLogger().info("Received RestartEvent"); + ctx.getLogger().info("Restarting Workflow by calling continueAsNew..."); + ctx.continueAsNew("TestInputRestart", false); + shouldComplete = false; + } catch (TaskCanceledException e) { + ctx.getLogger().warn("Restart Timed out"); + ctx.getLogger().warn(e.getMessage()); + } + + if (shouldComplete) { + ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow..."); + var childWorkflowInput = "Hello ChildWorkflow!"; + var childWorkflowOutput = + ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput, String.class).await(); + + ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput); + + ctx.getLogger().info("Workflow finished"); + ctx.complete("finished"); + + return; + } + + ctx.getLogger().info("Workflow restarted"); }; } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java new file mode 100644 index 000000000..9e82f2a4d --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) +public class DemoWorkflowActivity implements WorkflowActivity { + + @Override + public DemoActivityOutput run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + var message = ctx.getInput(DemoActivityInput.class).getMessage(); + var newMessage = message + " World!, from Activity"; + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + logger.info("Sleeping for 5 seconds to simulate long running operation..."); + + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + + logger.info("Activity finished"); + + var output = new DemoActivityOutput(message, newMessage); + logger.info("Activity returned: " + output); + + return output; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java index cf439c969..3aae02669 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java @@ -14,8 +14,11 @@ package io.dapr.examples.workflows; import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; +import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * For setup instructions, see the README. @@ -24,6 +27,7 @@ public class DemoWorkflowClient { /** * The main method. + * * @param args Input arguments (unused). * @throws InterruptedException If program has been interrupted. */ @@ -31,14 +35,67 @@ public static void main(String[] args) throws InterruptedException { DaprWorkflowClient client = new DaprWorkflowClient(); try (client) { - System.out.println("*****"); - String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class); + String separatorStr = "*******"; + System.out.println(separatorStr); + String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data"); System.out.printf("Started new workflow instance with random ID: %s%n", instanceId); - System.out.println("Sleep and allow this workflow instance to timeout..."); - TimeUnit.SECONDS.sleep(10); + System.out.println(separatorStr); + System.out.println("**GetInstanceMetadata:Running Workflow**"); + WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true); + System.out.printf("Result: %s%n", workflowMetadata); - System.out.println("*****"); + System.out.println(separatorStr); + System.out.println("**WaitForInstanceStart**"); + try { + WorkflowInstanceStatus waitForInstanceStartResult = + client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true); + System.out.printf("Result: %s%n", waitForInstanceStartResult); + } catch (TimeoutException ex) { + System.out.printf("waitForInstanceStart has an exception:%s%n", ex); + } + + System.out.println(separatorStr); + System.out.println("**SendExternalMessage**"); + client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); + + System.out.println(separatorStr); + System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **"); + client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); + client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); + client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); + System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId); + + System.out.println(separatorStr); + System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **"); + client.raiseEvent(instanceId, "e2", "event 2 Payload"); + System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId); + + + System.out.println(separatorStr); + System.out.println("**WaitForInstanceCompletion**"); + try { + WorkflowInstanceStatus waitForInstanceCompletionResult = + client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); + System.out.printf("Result: %s%n", waitForInstanceCompletionResult); + } catch (TimeoutException ex) { + System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex); + } + + System.out.println(separatorStr); + System.out.println("**purgeInstance**"); + boolean purgeResult = client.purgeInstance(instanceId); + System.out.printf("purgeResult: %s%n", purgeResult); + + System.out.println(separatorStr); + System.out.println("**raiseEvent**"); + + String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class); + System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId); + client.raiseEvent(eventInstanceId, "TestException", null); + System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId); + + System.out.println(separatorStr); String instanceToTerminateId = "terminateMe"; client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId); System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId); @@ -46,7 +103,20 @@ public static void main(String[] args) throws InterruptedException { TimeUnit.SECONDS.sleep(5); System.out.println("Terminate this workflow instance manually before the timeout is reached"); client.terminateWorkflow(instanceToTerminateId, null); - System.out.println("*****"); + System.out.println(separatorStr); + + String restartingInstanceId = "restarting"; + client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId); + System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId); + System.out.println("Sleeping 30 seconds to restart the workflow"); + TimeUnit.SECONDS.sleep(30); + + System.out.println("**SendExternalMessage: RestartEvent**"); + client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); + + System.out.println("Sleeping 30 seconds to terminate the eternal workflow"); + TimeUnit.SECONDS.sleep(30); + client.terminateWorkflow(restartingInstanceId, null); } System.out.println("Exiting DemoWorkflowClient."); diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java index f41fc4fb2..21bd01052 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java +++ b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java @@ -30,6 +30,7 @@ public class DemoWorkflowWorker { public static void main(String[] args) throws Exception { // Register the Workflow with the builder. WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class); + builder.registerActivity(DemoWorkflowActivity.class); // Build and then start the workflow runtime pulling and executing tasks try (WorkflowRuntime runtime = builder.build()) { diff --git a/pom.xml b/pom.xml index 947f63a5e..9aea3460b 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,12 @@ 8 8 true - + + 2.12.3 true 4.0.0-RC1 true @@ -270,7 +275,8 @@ false true site - io.dapr.examples:io.dapr.springboot:io.dapr.examples.*:io.dapr.springboot.* + io.dapr.examples:io.dapr.springboot:io.dapr.examples.*:io.dapr.springboot.* + @@ -293,9 +299,9 @@ - https://github.com/dapr/java-sdk - scm:git:https://github.com/dapr/java-sdk.git - HEAD + https://github.com/dapr/java-sdk + scm:git:https://github.com/dapr/java-sdk.git + HEAD diff --git a/sdk-actors/pom.xml b/sdk-actors/pom.xml index 2d747fa55..143312c92 100644 --- a/sdk-actors/pom.xml +++ b/sdk-actors/pom.xml @@ -1,7 +1,7 @@ + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 @@ -37,12 +37,12 @@ mockito-core test - - com.github.gmazzo - okhttp-mock - 1.4.1 - test - + + com.github.gmazzo + okhttp-mock + 1.4.1 + test + org.junit.jupiter junit-jupiter-api @@ -129,7 +129,7 @@ BUNDLE - + LINE COVEREDRATIO 80% diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml index 51051e35d..d27439f36 100644 --- a/sdk-workflows/pom.xml +++ b/sdk-workflows/pom.xml @@ -1,7 +1,7 @@ + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 @@ -18,8 +18,6 @@ false - 1.42.1 - 2.12.3 @@ -38,11 +36,23 @@ mockito-core test + + org.mockito + mockito-inline + 4.2.0 + test + org.junit.jupiter junit-jupiter test + + org.junit.vintage + junit-vintage-engine + 5.7.0 + test + com.microsoft durabletask-client @@ -135,7 +145,7 @@ BUNDLE - + LINE COVEREDRATIO 80% diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/DaprWorkflowContextImpl.java b/sdk-workflows/src/main/java/io/dapr/workflows/DaprWorkflowContextImpl.java index 121026563..38594126c 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/DaprWorkflowContextImpl.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/DaprWorkflowContextImpl.java @@ -13,13 +13,20 @@ package io.dapr.workflows; +import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskCanceledException; +import com.microsoft.durabletask.TaskOptions; import com.microsoft.durabletask.TaskOrchestrationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.helpers.NOPLogger; +import javax.annotation.Nullable; + import java.time.Duration; +import java.time.Instant; +import java.util.List; public class DaprWorkflowContextImpl implements WorkflowContext { private final TaskOrchestrationContext innerContext; @@ -39,7 +46,7 @@ public DaprWorkflowContextImpl(TaskOrchestrationContext context) throws IllegalA * Constructor for DaprWorkflowContextImpl. * * @param context TaskOrchestrationContext - * @param logger Logger + * @param logger Logger * @throws IllegalArgumentException if context or logger is null */ public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException { @@ -78,6 +85,13 @@ public String getInstanceId() { return this.innerContext.getInstanceId(); } + /** + * {@inheritDoc} + */ + public Instant getCurrentInstant() { + return this.innerContext.getCurrentInstant(); + } + /** * {@inheritDoc} */ @@ -88,7 +102,106 @@ public void complete(Object output) { /** * {@inheritDoc} */ - public Task waitForExternalEvent(String eventName, Duration timeout) { - return this.innerContext.waitForExternalEvent(eventName, timeout); + @Override + public Task waitForExternalEvent(String name, Duration timeout, Class dataType) + throws TaskCanceledException { + return this.innerContext.waitForExternalEvent(name, timeout, dataType); + } + + /** + * Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is + * received or is canceled when {@code timeout} expires. + * + *

See {@link #waitForExternalEvent(String, Duration, Class)} for a full description. + * + * @param name the case-insensitive name of the event to wait for + * @param timeout the amount of time to wait before canceling the returned {@code Task} + * @return a new {@link Task} that completes when the external event is received or when {@code timeout} expires + * @throws TaskCanceledException if the specified {@code timeout} value expires before the event is received + */ + @Override + public Task waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { + return this.innerContext.waitForExternalEvent(name, timeout, Void.class); + } + + /** + * Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is + * received. + * + *

See {@link #waitForExternalEvent(String, Duration, Class)} for a full description. + * + * @param name the case-insensitive name of the event to wait for + * @return a new {@link Task} that completes when the external event is received + */ + @Override + public Task waitForExternalEvent(String name) throws TaskCanceledException { + return this.innerContext.waitForExternalEvent(name, null, Void.class); + } + + @Override + public boolean isReplaying() { + return this.innerContext.getIsReplaying(); + } + + /** + * {@inheritDoc} + */ + public Task callActivity(String name, Object input, TaskOptions options, Class returnType) { + return this.innerContext.callActivity(name, input, options, returnType); + } + + /** + * {@inheritDoc} + */ + public Task> allOf(List> tasks) throws CompositeTaskFailedException { + return this.innerContext.allOf(tasks); + } + + /** + * {@inheritDoc} + */ + public Task> anyOf(List> tasks) { + return this.innerContext.anyOf(tasks); + } + + /** + * {@inheritDoc} + */ + public Task createTimer(Duration duration) { + return this.innerContext.createTimer(duration); + } + + + /** + * {@inheritDoc} + */ + public T getInput(Class targetType) { + return this.innerContext.getInput(targetType); + } + + /** + * {@inheritDoc} + */ + @Override + public Task callSubWorkflow(String name, @Nullable Object input, @Nullable String instanceID, + @Nullable TaskOptions options, Class returnType) { + + return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType); + } + + /** + * {@inheritDoc} + */ + @Override + public void continueAsNew(Object input) { + this.innerContext.continueAsNew(input); + } + + /** + * {@inheritDoc} + */ + @Override + public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { + this.innerContext.continueAsNew(input, preserveUnprocessedEvents); } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 61b983de2..5d33ed45a 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -13,10 +13,20 @@ package io.dapr.workflows; +import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskCanceledException; +import com.microsoft.durabletask.TaskFailedException; +import com.microsoft.durabletask.TaskOptions; import org.slf4j.Logger; +import javax.annotation.Nullable; + import java.time.Duration; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.List; /** * Context object used by workflow implementations to perform actions such as scheduling activities, @@ -48,6 +58,13 @@ public interface WorkflowContext { */ String getInstanceId(); + /** + * Gets the current orchestration time in UTC. + * + * @return the current orchestration time in UTC + */ + Instant getCurrentInstant(); + /** * Completes the current workflow. * @@ -56,13 +73,445 @@ public interface WorkflowContext { void complete(Object output); /** - * Waits for an event to be raised with name and returns the event data. + * Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is + * received or is canceled when {@code timeout} expires. + * + *

If the current orchestration is not yet waiting for an event named {@code name}, then the event will be saved in + * the orchestration instance state and dispatched immediately when this method is called. This event saving occurs + * even if the current orchestrator cancels the wait operation before the event is received. + * + *

Orchestrators can wait for the same event name multiple times, so waiting for multiple events with the same name + * is allowed. Each external event received by an orchestrator will complete just one task returned by this method. + * + * @param name the case-insensitive name of the event to wait for + * @param timeout the amount of time to wait before canceling the returned {@code Task} + * @param dataType the expected class type of the event data payload + * @param the expected type of the event data payload + * @return a new {@link Task} that completes when the external event is received or when {@code timeout} expires + * @throws TaskCanceledException if the specified {@code timeout} value expires before the event is received + */ + Task waitForExternalEvent(String name, Duration timeout, Class dataType) throws TaskCanceledException; + + /** + * Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is + * received or is canceled when {@code timeout} expires. + * + *

See {@link #waitForExternalEvent(String, Duration, Class)} for a full description. + * + * @param name the case-insensitive name of the event to wait for + * @param timeout the amount of time to wait before canceling the returned {@code Task} + * @param the expected type of the event data payload + * @return a new {@link Task} that completes when the external event is received or when {@code timeout} expires + * @throws TaskCanceledException if the specified {@code timeout} value expires before the event is received + */ + Task waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException; + + /** + * Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is + * received. + * + *

See {@link #waitForExternalEvent(String, Duration, Class)} for a full description. + * + * @param name the case-insensitive name of the event to wait for + * @param the expected type of the event data payload + * @return a new {@link Task} that completes when the external event is received + */ + Task waitForExternalEvent(String name) throws TaskCanceledException; + + /** + * Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is + * received. + * + *

See {@link #waitForExternalEvent(String, Duration, Class)} for a full description. + * + * @param name the case-insensitive name of the event to wait for + * @param dataType the expected class type of the event data payload + * @param the expected type of the event data payload + * @return a new {@link Task} that completes when the external event is received + */ + default Task waitForExternalEvent(String name, Class dataType) { + try { + return this.waitForExternalEvent(name, null, dataType); + } catch (TaskCanceledException e) { + // This should never happen because of the max duration + throw new RuntimeException("An unexpected exception was throw while waiting for an external event.", e); + } + } + + /** + * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} + * that completes when the activity completes. If the activity completes successfully, the returned {@code Task}'s + * value will be the activity's output. If the activity fails, the returned {@code Task} will complete exceptionally + * with a {@link TaskFailedException}. + * + * @param name the name of the activity to call + * @param input the serializable input to pass to the activity + * @param options additional options that control the execution and processing of the activity + * @param returnType the expected class type of the activity output + * @param the expected type of the activity output + * @return a new {@link Task} that completes when the activity completes or fails + */ + Task callActivity(String name, Object input, TaskOptions options, Class returnType); + + /** + * Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity + * completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description. + * + * @param name the name of the activity to call + * @return a new {@link Task} that completes when the activity completes or fails + * @see #callActivity(String, Object, TaskOptions, Class) + */ + default Task callActivity(String name) { + return this.callActivity(name, null, null, Void.class); + } + + /** + * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} + * that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a + * complete description. + * + * @param name the name of the activity to call + * @param input the serializable input to pass to the activity + * @return a new {@link Task} that completes when the activity completes or fails + */ + default Task callActivity(String name, Object input) { + return this.callActivity(name, input, null, Void.class); + } + + /** + * Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity + * completes. If the activity completes successfully, the returned {@code Task}'s value will be the activity's + * output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description. + * + * @param name the name of the activity to call + * @param returnType the expected class type of the activity output + * @param the expected type of the activity output + * @return a new {@link Task} that completes when the activity completes or fails + */ + default Task callActivity(String name, Class returnType) { + return this.callActivity(name, null, null, returnType); + } + + /** + * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} + * that completes when the activity completes.If the activity completes successfully, the returned {@code Task}'s + * value will be the activity's output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a + * complete description. + * + * @param name the name of the activity to call + * @param input the serializable input to pass to the activity + * @param returnType the expected class type of the activity output + * @param the expected type of the activity output + * @return a new {@link Task} that completes when the activity completes or fails + */ + default Task callActivity(String name, Object input, Class returnType) { + return this.callActivity(name, input, null, returnType); + } + + /** + * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} + * that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a + * complete description. + * + * @param name the name of the activity to call + * @param input the serializable input to pass to the activity + * @param options additional options that control the execution and processing of the activity + * @return a new {@link Task} that completes when the activity completes or fails + */ + default Task callActivity(String name, Object input, TaskOptions options) { + return this.callActivity(name, input, options, Void.class); + } + + /** + * Gets a value indicating whether the workflow is currently replaying a previous execution. + * + *

Workflow functions are "replayed" after being unloaded from memory to reconstruct local variable state. + * During a replay, previously executed tasks will be completed automatically with previously seen values + * that are stored in the workflow history. Once the workflow reaches the point where it's no longer + * replaying existing history, this method will return {@code false}. + * + *

You can use this method if you have logic that needs to run only when not replaying. For example, + * certain types of application logging may become too noisy when duplicated as part of replay. The + * application code could check to see whether the function is being replayed and then issue the log statements + * when this value is {@code false}. + * + * @return {@code true} if the workflow is replaying, otherwise {@code false} + */ + boolean isReplaying(); + + /** + * Returns a new {@code Task} that is completed when all the given {@code Task}s complete. If any of the given + * {@code Task}s complete with an exception, the returned {@code Task} will also complete with an + * {@link CompositeTaskFailedException} containing details of the first encountered failure. + * The value of the returned {@code Task} is an ordered list of the return values of the given tasks. + * If no tasks are provided, returns a {@code Task} completed with value + * {@code null}. + * + *

This method is useful for awaiting the completion of a set of independent tasks before continuing to the next + * step in the orchestration, as in the following example: + *

{@code
+   * Task t1 = ctx.callActivity("MyActivity", String.class);
+   * Task t2 = ctx.callActivity("MyActivity", String.class);
+   * Task t3 = ctx.callActivity("MyActivity", String.class);
+   *
+   * List orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
+   * }
+ * + *

Exceptions in any of the given tasks results in an unchecked {@link CompositeTaskFailedException}. + * This exception can be inspected to obtain failure details of individual {@link Task}s. + *

{@code
+   * try {
+   *     List orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
+   * } catch (CompositeTaskFailedException e) {
+   *     List exceptions = e.getExceptions()
+   * }
+   * }
+ * + * @param tasks the list of {@code Task} objects + * @param the return type of the {@code Task} objects + * @return the values of the completed {@code Task} objects in the same order as the source list + * @throws CompositeTaskFailedException if the specified {@code timeout} value expires before the event is received + */ + Task> allOf(List> tasks) throws CompositeTaskFailedException; + + /** + * Returns a new {@code Task} that is completed when any of the tasks in {@code tasks} completes. + * See {@link #anyOf(Task[])} for more detailed information. + * + * @param tasks the list of {@code Task} objects + * @return a new {@code Task} that is completed when any of the given {@code Task}s complete + * @see #anyOf(Task[]) + */ + Task> anyOf(List> tasks); + + /** + * Returns a new {@code Task} that is completed when any of the given {@code Task}s complete. The value of the + * new {@code Task} is a reference to the completed {@code Task} object. If no tasks are provided, returns a + * {@code Task} that never completes. + * + *

This method is useful for waiting on multiple concurrent tasks and performing a task-specific operation when the + * first task completes, as in the following example: + *

{@code
+   * Task event1 = ctx.waitForExternalEvent("Event1");
+   * Task event2 = ctx.waitForExternalEvent("Event2");
+   * Task event3 = ctx.waitForExternalEvent("Event3");
+   *
+   * Task winner = ctx.anyOf(event1, event2, event3).await();
+   * if (winner == event1) {
+   *     // ...
+   * } else if (winner == event2) {
+   *     // ...
+   * } else if (winner == event3) {
+   *     // ...
+   * }
+   * }
+ * The {@code anyOf} method can also be used for implementing long-running timeouts, as in the following example: + *
{@code
+   * Task activityTask = ctx.callActivity("SlowActivity");
+   * Task timeoutTask = ctx.createTimer(Duration.ofMinutes(30));
+   *
+   * Task winner = ctx.anyOf(activityTask, timeoutTask).await();
+   * if (winner == activityTask) {
+   *     // completion case
+   * } else {
+   *     // timeout case
+   * }
+   * }
+ * + * @param tasks the list of {@code Task} objects + * @return a new {@code Task} that is completed when any of the given {@code Task}s complete + */ + default Task> anyOf(Task... tasks) { + return this.anyOf(Arrays.asList(tasks)); + } + + /** + * Creates a durable timer that expires after the specified delay. + * + *

Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple, + * internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However, + * it may be visible in framework logs and the stored history state. + * + * @param duration the amount of time before the timer should expire + * @return a new {@code Task} that completes after the specified delay + */ + Task createTimer(Duration duration); + + /** + * Creates a durable timer that expires after the specified timestamp with specific zone. + * + *

Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple, + * internally-managed timers. The workflow code doesn't need to be aware of this behavior. However, + * it may be visible in framework logs and the stored history state. + * + * @param zonedDateTime timestamp with specific zone when the timer should expire + * @return a new {@code Task} that completes after the specified delay + */ + default Task createTimer(ZonedDateTime zonedDateTime) { + throw new UnsupportedOperationException("This method is not implemented."); + } + + + /** + * Gets the deserialized input of the current task orchestration. + * + * @param targetType the {@link Class} object associated with {@code V} + * @param the expected type of the workflow input + * @return the deserialized input as an object of type {@code V} or {@code null} if no input was provided. + */ + V getInput(Class targetType); + + /** + * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes + * when the sub-workflow completes. + * + *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + * + * @param name the name of the workflow to invoke + * @return a new {@link Task} that completes when the sub-workflow completes or fails + * @see #callSubWorkflow(String, Object, String, TaskOptions, Class) + */ + default Task callSubWorkflow(String name) { + return this.callSubWorkflow(name, null); + } + + /** + * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes + * when the sub-workflow completes. + * + *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + * + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow + * @return a new {@link Task} that completes when the sub-workflow completes or fails + */ + default Task callSubWorkflow(String name, Object input) { + return this.callSubWorkflow(name, input, null); + } + + /** + * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes + * when the sub-workflow completes. + * + *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + * + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow + * @param returnType the expected class type of the sub-workflow output + * @param the expected type of the sub-workflow output + * @return a new {@link Task} that completes when the sub-workflow completes or fails + */ + default Task callSubWorkflow(String name, Object input, Class returnType) { + return this.callSubWorkflow(name, input, null, returnType); + } + + /** + * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes + * when the sub-workflow completes. + * + *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + * + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow + * @param instanceID the unique ID of the sub-workflow + * @param returnType the expected class type of the sub-workflow output + * @param the expected type of the sub-workflow output + * @return a new {@link Task} that completes when the sub-workflow completes or fails + */ + default Task callSubWorkflow(String name, Object input, String instanceID, Class returnType) { + return this.callSubWorkflow(name, input, instanceID, null, returnType); + } + + /** + * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes + * when the sub-workflow completes. + * + *

See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + * + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow + * @param instanceID the unique ID of the sub-workflow + * @param options additional options that control the execution and processing of the activity + * @return a new {@link Task} that completes when the sub-workflow completes or fails + */ + default Task callSubWorkflow(String name, Object input, String instanceID, TaskOptions options) { + return this.callSubWorkflow(name, input, instanceID, options, Void.class); + } + + /** + * Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes + * when the sub-workflow completes. If the sub-workflow completes successfully, the returned + * {@code Task}'s value will be the activity's output. If the sub-workflow fails, the returned {@code Task} + * will complete exceptionally with a {@link TaskFailedException}. + * + *

A sub-workflow has its own instance ID, history, and status that is independent of the parent workflow + * that started it. There are many advantages to breaking down large orchestrations into sub-workflows: + *

    + *
  • + * Splitting large orchestrations into a series of smaller sub-workflows can make code more maintainable. + *
  • + *
  • + * Distributing orchestration logic across multiple compute nodes concurrently is useful if + * orchestration logic otherwise needs to coordinate a lot of tasks. + *
  • + *
  • + * Memory usage and CPU overhead can be reduced by keeping the history of parent orchestrations smaller. + *
  • + *
+ * The disadvantage is that there is overhead associated with starting a sub-workflow and processing its + * output. This is typically only an issue for very small orchestrations. + * + *

Because sub-workflows are independent of their parents, terminating a parent orchestration does not affect + * any sub-workflows. sub-workflows must be terminated independently using their unique instance ID, + * which is specified using the {@code instanceID} parameter + * + * @param name the name of the workflow to invoke + * @param input the serializable input to send to the sub-workflow + * @param instanceID the unique ID of the sub-workflow + * @param options additional options that control the execution and processing of the activity + * @param returnType the expected class type of the sub-workflow output + * @param the expected type of the sub-workflow output + * @return a new {@link Task} that completes when the sub-workflow completes or fails + */ + Task callSubWorkflow(String name, + @Nullable Object input, + @Nullable String instanceID, + @Nullable TaskOptions options, + Class returnType); + + /** + * Restarts the orchestration with a new input and clears its history. See {@link #continueAsNew(Object, boolean)} + * for a full description. + * + * @param input the serializable input data to re-initialize the instance with + */ + default void continueAsNew(Object input) { + this.continueAsNew(input, true); + } + + /** + * Restarts the orchestration with a new input and clears its history. + * + *

This method is primarily designed for eternal orchestrations, which are orchestrations that + * may not ever complete. It works by restarting the orchestration, providing it with a new input, + * and truncating the existing orchestration history. It allows an orchestration to continue + * running indefinitely without having its history grow unbounded. The benefits of periodically + * truncating history include decreased memory usage, decreased storage volumes, and shorter orchestrator + * replays when rebuilding state. + * + *

The results of any incomplete tasks will be discarded when an orchestrator calls {@code continueAsNew}. + * For example, if a timer is scheduled and then {@code continueAsNew} is called before the timer fires, the timer + * event will be discarded. The only exception to this is external events. By default, if an external event is + * received by an orchestration but not yet processed, the event is saved in the orchestration state unit it is + * received by a call to {@link #waitForExternalEvent}. These events will remain in memory + * even after an orchestrator restarts using {@code continueAsNew}. This behavior can be disabled by specifying + * {@code false} for the {@code preserveUnprocessedEvents} parameter value. + * + *

Orchestrator implementations should complete immediately after calling the{@code continueAsNew} method. * - * @param eventName The name of the event to wait for. Event names are case-insensitive. - * External event names can be reused any number of times; they are not - * required to be unique. - * @param timeout The amount of time to wait before cancelling the external event task. - * @return An asynchronous durabletask.Task to await. + * @param input the serializable input data to re-initialize the instance with + * @param preserveUnprocessedEvents {@code true} to push unprocessed external events into the new orchestration + * history, otherwise {@code false} */ - Task waitForExternalEvent(String eventName, Duration timeout); + void continueAsNew(Object input, boolean preserveUnprocessedEvents); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index 9f4ab03ad..6e05a7f06 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -15,13 +15,21 @@ import com.microsoft.durabletask.DurableTaskClient; import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; +import com.microsoft.durabletask.OrchestrationMetadata; +import com.microsoft.durabletask.PurgeResult; import io.dapr.utils.NetworkUtils; import io.dapr.workflows.Workflow; import io.grpc.ManagedChannel; import javax.annotation.Nullable; + +import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +/** + * Defines client operations for managing Dapr Workflow instances. + */ public class DaprWorkflowClient implements AutoCloseable { private DurableTaskClient innerClient; @@ -48,7 +56,6 @@ private DaprWorkflowClient(ManagedChannel grpcChannel) { * * @param innerClient DurableTaskGrpcClient with GRPC Channel set up. * @param grpcChannel ManagedChannel for instance variable setting. - * */ private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcChannel) { this.innerClient = innerClient; @@ -70,9 +77,9 @@ private static DurableTaskClient createDurableTaskClient(ManagedChannel grpcChan /** * Schedules a new workflow using DurableTask client. * - * @param any Workflow type + * @param any Workflow type * @param clazz Class extending Workflow to start an instance of. - * @return A String with the randomly-generated instance ID for new Workflow instance. + * @return the randomly-generated instance ID for new Workflow instance. */ public String scheduleNewWorkflow(Class clazz) { return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName()); @@ -81,10 +88,10 @@ public String scheduleNewWorkflow(Class clazz) { /** * Schedules a new workflow using DurableTask client. * - * @param any Workflow type + * @param any Workflow type * @param clazz Class extending Workflow to start an instance of. * @param input the input to pass to the scheduled orchestration instance. Must be serializable. - * @return A String with the randomly-generated instance ID for new Workflow instance. + * @return the randomly-generated instance ID for new Workflow instance. */ public String scheduleNewWorkflow(Class clazz, Object input) { return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input); @@ -93,11 +100,11 @@ public String scheduleNewWorkflow(Class clazz, Object in /** * Schedules a new workflow using DurableTask client. * - * @param any Workflow type - * @param clazz Class extending Workflow to start an instance of. - * @param input the input to pass to the scheduled orchestration instance. Must be serializable. + * @param any Workflow type + * @param clazz Class extending Workflow to start an instance of. + * @param input the input to pass to the scheduled orchestration instance. Must be serializable. * @param instanceId the unique ID of the orchestration instance to schedule - * @return A String with the instanceId parameter value. + * @return the instanceId parameter value. */ public String scheduleNewWorkflow(Class clazz, Object input, String instanceId) { return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId); @@ -107,15 +114,117 @@ public String scheduleNewWorkflow(Class clazz, Object in * Terminates the workflow associated with the provided instance id. * * @param workflowInstanceId Workflow instance id to terminate. - * @param output the optional output to set for the terminated orchestration instance. + * @param output the optional output to set for the terminated orchestration instance. */ public void terminateWorkflow(String workflowInstanceId, @Nullable Object output) { this.innerClient.terminate(workflowInstanceId, output); } /** - * Closes the inner DurableTask client and shutdown the GRPC channel. + * Fetches workflow instance metadata from the configured durable store. + * + * @param instanceId the unique ID of the workflow instance to fetch + * @param getInputsAndOutputs true to fetch the workflow instance's + * inputs, outputs, and custom status, or false to omit them + * @return a metadata record that describes the workflow instance and it execution status, or a default instance + */ + @Nullable + public WorkflowInstanceStatus getInstanceState(String instanceId, boolean getInputsAndOutputs) { + OrchestrationMetadata metadata = this.innerClient.getInstanceMetadata(instanceId, getInputsAndOutputs); + if (metadata == null) { + return null; + } + return new WorkflowInstanceStatus(metadata); + } + + /** + * Waits for an workflow to start running and returns an + * {@link WorkflowInstanceStatus} object that contains metadata about the started + * instance and optionally its input, output, and custom status payloads. + * + *

A "started" workflow instance is any instance not in the Pending state. + * + *

If an workflow instance is already running when this method is called, + * the method will return immediately. + * + * @param instanceId the unique ID of the workflow instance to wait for + * @param timeout the amount of time to wait for the workflow instance to start + * @param getInputsAndOutputs true to fetch the workflow instance's + * inputs, outputs, and custom status, or false to omit them + * @return the workflow instance metadata or null if no such instance is found + * @throws TimeoutException when the workflow instance is not started within the specified amount of time + */ + @Nullable + public WorkflowInstanceStatus waitForInstanceStart(String instanceId, Duration timeout, boolean getInputsAndOutputs) + throws TimeoutException { + + OrchestrationMetadata metadata = this.innerClient.waitForInstanceStart(instanceId, timeout, getInputsAndOutputs); + return metadata == null ? null : new WorkflowInstanceStatus(metadata); + } + + /** + * Waits for an workflow to complete and returns an {@link WorkflowInstanceStatus} object that contains + * metadata about the completed instance. + * + *

A "completed" workflow instance is any instance in one of the terminal states. For example, the + * Completed, Failed, or Terminated states. + * + *

Workflows are long-running and could take hours, days, or months before completing. + * Workflows can also be eternal, in which case they'll never complete unless terminated. + * In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are used. + * If an workflow instance is already complete when this method is called, the method will return immediately. * + * @param instanceId the unique ID of the workflow instance to wait for + * @param timeout the amount of time to wait for the workflow instance to complete + * @param getInputsAndOutputs true to fetch the workflow instance's inputs, outputs, and custom + * status, or false to omit them + * @return the workflow instance metadata or null if no such instance is found + * @throws TimeoutException when the workflow instance is not completed within the specified amount of time + */ + @Nullable + public WorkflowInstanceStatus waitForInstanceCompletion(String instanceId, Duration timeout, + boolean getInputsAndOutputs) throws TimeoutException { + + OrchestrationMetadata metadata = + this.innerClient.waitForInstanceCompletion(instanceId, timeout, getInputsAndOutputs); + return metadata == null ? null : new WorkflowInstanceStatus(metadata); + } + + /** + * Sends an event notification message to awaiting workflow instance. + * + * @param workflowInstanceId The ID of the workflow instance that will handle the event. + * @param eventName The name of the event. Event names are case-insensitive. + * @param eventPayload The serializable data payload to include with the event. + */ + public void raiseEvent(String workflowInstanceId, String eventName, Object eventPayload) { + this.innerClient.raiseEvent(workflowInstanceId, eventName, eventPayload); + } + + /** + * Purges workflow instance state from the workflow state store. + * + * @param workflowInstanceId The unique ID of the workflow instance to purge. + * @return Return true if the workflow state was found and purged successfully otherwise false. + */ + public boolean purgeInstance(String workflowInstanceId) { + PurgeResult result = this.innerClient.purgeInstance(workflowInstanceId); + if (result != null) { + return result.getDeletedInstanceCount() > 0; + } + return false; + } + + public void createTaskHub(boolean recreateIfExists) { + this.innerClient.createTaskHub(recreateIfExists); + } + + public void deleteTaskHub() { + this.innerClient.deleteTaskHub(); + } + + /** + * Closes the inner DurableTask client and shutdown the GRPC channel. */ public void close() throws InterruptedException { try { diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowFailureDetails.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowFailureDetails.java new file mode 100644 index 000000000..2f9cc641d --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowFailureDetails.java @@ -0,0 +1,65 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.client; + +import com.microsoft.durabletask.FailureDetails; + +/** + * Represents a workflow failure details. + */ +public class WorkflowFailureDetails { + + private final FailureDetails workflowFailureDetails; + + /** + * Class constructor. + * + * @param failureDetails failure Details + */ + public WorkflowFailureDetails(FailureDetails failureDetails) { + this.workflowFailureDetails = failureDetails; + } + + /** + * Gets the error type, which is the namespace-qualified exception type name. + * + * @return the error type, which is the namespace-qualified exception type name + */ + public String getErrorType() { + return workflowFailureDetails.getErrorType(); + } + + /** + * Gets the error message. + * + * @return the error message + */ + public String getErrorMessage() { + return workflowFailureDetails.getErrorMessage(); + } + + /** + * Gets the stack trace. + * + * @return the stack trace + */ + public String getStackTrace() { + return workflowFailureDetails.getStackTrace(); + } + + @Override + public String toString() { + return workflowFailureDetails.toString(); + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowInstanceStatus.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowInstanceStatus.java new file mode 100644 index 000000000..dc61312ce --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowInstanceStatus.java @@ -0,0 +1,204 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.client; + +import com.microsoft.durabletask.FailureDetails; +import com.microsoft.durabletask.OrchestrationMetadata; +import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import io.dapr.workflows.runtime.WorkflowRuntimeStatus; + +import javax.annotation.Nullable; + +import java.time.Instant; + +/** + * Represents a snapshot of a workflow instance's current state, including + * metadata. + */ +public class WorkflowInstanceStatus { + + private final OrchestrationMetadata orchestrationMetadata; + + @Nullable + private final WorkflowFailureDetails failureDetails; + + /** + * Class constructor. + * + * @param orchestrationMetadata Durable task orchestration metadata + */ + public WorkflowInstanceStatus(OrchestrationMetadata orchestrationMetadata) { + if (orchestrationMetadata == null) { + throw new IllegalArgumentException("OrchestrationMetadata cannot be null"); + } + this.orchestrationMetadata = orchestrationMetadata; + FailureDetails details = orchestrationMetadata.getFailureDetails(); + if (details != null) { + this.failureDetails = new WorkflowFailureDetails(details); + } else { + this.failureDetails = null; + } + } + + /** + * Gets the name of the workflow. + * + * @return the name of the workflow + */ + public String getName() { + return orchestrationMetadata.getName(); + } + + /** + * Gets the unique ID of the workflow instance. + * + * @return the unique ID of the workflow instance + */ + public String getInstanceId() { + return orchestrationMetadata.getInstanceId(); + } + + /** + * Gets the current runtime status of the workflow instance at the time this + * object was fetched. + * + * @return the current runtime status of the workflow instance at the time this object was fetched + */ + public WorkflowRuntimeStatus getRuntimeStatus() { + return WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(orchestrationMetadata.getRuntimeStatus()); + } + + /** + * Gets the workflow instance's creation time in UTC. + * + * @return the workflow instance's creation time in UTC + */ + public Instant getCreatedAt() { + return orchestrationMetadata.getCreatedAt(); + } + + /** + * Gets the workflow instance's last updated time in UTC. + * + * @return the workflow instance's last updated time in UTC + */ + public Instant getLastUpdatedAt() { + return orchestrationMetadata.getLastUpdatedAt(); + } + + /** + * Gets the workflow instance's serialized input, if any, as a string value. + * + * @return the workflow instance's serialized input or {@code null} + */ + public String getSerializedInput() { + return orchestrationMetadata.getSerializedInput(); + } + + /** + * Gets the workflow instance's serialized output, if any, as a string value. + * + * @return the workflow instance's serialized output or {@code null} + */ + public String getSerializedOutput() { + return orchestrationMetadata.getSerializedOutput(); + } + + /** + * Gets the failure details, if any, for the failed workflow instance. + * + *

This method returns data only if the workflow is in the + * {@link OrchestrationRuntimeStatus#FAILED} state, + * and only if this instance metadata was fetched with the option to include + * output data. + * + * @return the failure details of the failed workflow instance or {@code null} + */ + @Nullable + public WorkflowFailureDetails getFailureDetails() { + return this.failureDetails; + } + + /** + * Gets a value indicating whether the workflow instance was running at the time + * this object was fetched. + * + * @return {@code true} if the workflow existed and was in a running state otherwise {@code false} + */ + public boolean isRunning() { + return orchestrationMetadata.isRunning(); + } + + /** + * Gets a value indicating whether the workflow instance was completed at the + * time this object was fetched. + * + *

A workflow instance is considered completed when its runtime status value is + * {@link WorkflowRuntimeStatus#COMPLETED}, + * {@link WorkflowRuntimeStatus#FAILED}, or + * {@link WorkflowRuntimeStatus#TERMINATED}. + * + * @return {@code true} if the workflow was in a terminal state; otherwise + * {@code false} + */ + public boolean isCompleted() { + return orchestrationMetadata.isCompleted(); + } + + /** + * Deserializes the workflow's input into an object of the specified type. + * + *

Deserialization is performed using the DataConverter that was + * configured on the DurableTaskClient object that created this workflow + * metadata object. + * + * @param type the class associated with the type to deserialize the input data + * into + * @param the type to deserialize the input data into + * @return the deserialized input value + * @throws IllegalStateException if the metadata was fetched without the option + * to read inputs and outputs + */ + public T readInputAs(Class type) { + return orchestrationMetadata.readInputAs(type); + } + + /** + * Deserializes the workflow's output into an object of the specified type. + * + *

Deserialization is performed using the DataConverter that was + * configured on the DurableTaskClient + * object that created this workflow metadata object. + * + * @param type the class associated with the type to deserialize the output data + * into + * @param the type to deserialize the output data into + * @return the deserialized input value + * @throws IllegalStateException if the metadata was fetched without the option + * to read inputs and outputs + */ + public T readOutputAs(Class type) { + return orchestrationMetadata.readOutputAs(type); + } + + /** + * Generates a user-friendly string representation of the current metadata + * object. + * + * @return a user-friendly string representation of the current metadata object + */ + public String toString() { + return orchestrationMetadata.toString(); + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/ActivityWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/ActivityWrapper.java new file mode 100644 index 000000000..8b02abf69 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/ActivityWrapper.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskActivity; +import com.microsoft.durabletask.TaskActivityFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Wrapper for Durable Task Framework task activity factory. + */ +public class ActivityWrapper implements TaskActivityFactory { + private final Constructor activityConstructor; + private final String name; + + /** + * Constructor for ActivityWrapper. + * + * @param clazz Class of the activity to wrap. + */ + public ActivityWrapper(Class clazz) { + this.name = clazz.getCanonicalName(); + try { + this.activityConstructor = clazz.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + String.format("No constructor found for activity class '%s'.", this.name), e + ); + } + } + + @Override + public String getName() { + return name; + } + + + @Override + public TaskActivity create() { + return ctx -> { + Object result; + T activity; + + try { + activity = this.activityConstructor.newInstance(); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException( + String.format("Unable to instantiate instance of activity class '%s'", this.name), e + ); + } + + result = activity.run(new WorkflowActivityContext(ctx)); + return result; + }; + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivity.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivity.java new file mode 100644 index 000000000..bcaab62d8 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivity.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +/** + * Common interface for task activity implementations. + * + *

Activities are the basic unit of work in a durable task orchestration. Activities are the tasks that are + * orchestrated in the business process. For example, you might create an orchestrator to process an order. The tasks + * ay involve checking the inventory, charging the customer, and creating a shipment. Each task would be a separate + * activity. These activities may be executed serially, in parallel, or some combination of both. + * + *

Unlike task orchestrators, activities aren't restricted in the type of work you can do in them. Activity functions + * are frequently used to make network calls or run CPU intensive operations. An activity can also return data back to + * the orchestrator function. The Durable Task runtime guarantees that each called activity function will be executed + * at least once during an orchestration's execution. + * + *

Because activities only guarantee at least once execution, it's recommended that activity logic be implemented as + * idempotent whenever possible. + * + *

Activities are scheduled by orchestrators using one of the {@link io.dapr.workflows.WorkflowContext#callActivity} + * method overloads. + */ +public interface WorkflowActivity { + /** + * Executes the activity logic and returns a value which will be serialized and + * returned to the calling orchestrator. + * + * @param ctx provides information about the current activity execution, like the activity's name and the input + * data provided to it by the orchestrator. + * @return any serializable value to be returned to the calling orchestrator. + */ + Object run(WorkflowActivityContext ctx); +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityContext.java new file mode 100644 index 000000000..1277501ee --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityContext.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskActivityContext; + +/** + * Wrapper for Durable Task Framework {@link TaskActivityContext}. + */ +public class WorkflowActivityContext implements TaskActivityContext { + private final TaskActivityContext innerContext; + + /** + * Constructor for WorkflowActivityContext. + * + * @param context TaskActivityContext + * @throws IllegalArgumentException if context is null + */ + public WorkflowActivityContext(TaskActivityContext context) throws IllegalArgumentException { + if (context == null) { + throw new IllegalArgumentException("Context cannot be null"); + } + this.innerContext = context; + } + + /** + * Gets the name of the current activity. + * + * @return the name of the current activity + */ + public String getName() { + return this.innerContext.getName(); + } + + /** + * Gets the input of the current activity. + * + * @param the type of the input + * @param targetType targetType of the input + * @return the input of the current activity + */ + public T getInput(Class targetType) { + return this.innerContext.getInput(targetType); + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java index 11fe624ba..b8465cdda 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java @@ -44,7 +44,7 @@ public WorkflowRuntime build() { /** * Registers a Workflow object. * - * @param any Workflow type + * @param any Workflow type * @param clazz the class being registered * @return the WorkflowRuntimeBuilder */ @@ -55,4 +55,16 @@ public WorkflowRuntimeBuilder registerWorkflow(Class cla return this; } + + /** + * Registers an Activity object. + * + * @param clazz the class being registered + * @param any WorkflowActivity type + */ + public void registerActivity(Class clazz) { + this.builder = this.builder.addActivity( + new ActivityWrapper<>(clazz) + ); + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatus.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatus.java new file mode 100644 index 000000000..79ef9dbc2 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatus.java @@ -0,0 +1,139 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.OrchestrationRuntimeStatus; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Enum describing the runtime status of a workflow. + */ +public enum WorkflowRuntimeStatus { + /** + * The workflow started running. + */ + RUNNING, + + /** + * The workflow completed normally. + */ + COMPLETED, + + /** + * The workflow is continued as new. + */ + CONTINUED_AS_NEW, + + /** + * The workflow completed with an unhandled exception. + */ + FAILED, + + /** + * The workflow was abruptly cancelled via a management API call. + */ + CANCELED, + + /** + * The workflow was abruptly terminated via a management API call. + */ + TERMINATED, + + /** + * The workflow was scheduled but hasn't started running. + */ + PENDING, + + /** + * The workflow was suspended. + */ + SUSPENDED; + + /** + * Convert runtime OrchestrationRuntimeStatus to WorkflowRuntimeStatus. + * + * @param status The OrchestrationRuntimeStatus to convert to WorkflowRuntimeStatus. + * @return The runtime status of the workflow. + */ + public static WorkflowRuntimeStatus fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus status) { + if (status == null) { + throw new IllegalArgumentException("status cannot be null"); + } + + switch (status) { + case RUNNING: + return WorkflowRuntimeStatus.RUNNING; + case COMPLETED: + return WorkflowRuntimeStatus.COMPLETED; + case CONTINUED_AS_NEW: + return WorkflowRuntimeStatus.CONTINUED_AS_NEW; + case FAILED: + return WorkflowRuntimeStatus.FAILED; + case CANCELED: + return WorkflowRuntimeStatus.CANCELED; + case TERMINATED: + return WorkflowRuntimeStatus.TERMINATED; + case PENDING: + return WorkflowRuntimeStatus.PENDING; + case SUSPENDED: + return WorkflowRuntimeStatus.SUSPENDED; + default: + throw new RuntimeException(String.format("Unknown status value: %s", status)); + } + } + + /** + * Convert runtime WorkflowRuntimeStatus to OrchestrationRuntimeStatus. + * + * @param status The OrchestrationRuntimeStatus to convert to WorkflowRuntimeStatus. + * @return The runtime status of the Orchestration. + */ + public static OrchestrationRuntimeStatus toOrchestrationRuntimeStatus(WorkflowRuntimeStatus status) { + + switch (status) { + case RUNNING: + return OrchestrationRuntimeStatus.RUNNING; + case COMPLETED: + return OrchestrationRuntimeStatus.COMPLETED; + case CONTINUED_AS_NEW: + return OrchestrationRuntimeStatus.CONTINUED_AS_NEW; + case FAILED: + return OrchestrationRuntimeStatus.FAILED; + case CANCELED: + return OrchestrationRuntimeStatus.CANCELED; + case TERMINATED: + return OrchestrationRuntimeStatus.TERMINATED; + case PENDING: + return OrchestrationRuntimeStatus.PENDING; + case SUSPENDED: + return OrchestrationRuntimeStatus.SUSPENDED; + default: + throw new RuntimeException(String.format("Unknown status value: %s", status)); + } + } + + /** + * Convert runtime WorkflowRuntimeStatus to OrchestrationRuntimeStatus. + * + * @param statuses The list of orchestrationRuntimeStatus to convert to a list of WorkflowRuntimeStatuses. + * @return The list runtime status of the Orchestration. + */ + public static List toOrchestrationRuntimeStatus(List statuses) { + return statuses.stream() + .map(x -> toOrchestrationRuntimeStatus(x)) + .collect(Collectors.toList()); + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DaprWorkflowContextImplTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DaprWorkflowContextImplTest.java index 2cdd1bada..e584acead 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DaprWorkflowContextImplTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DaprWorkflowContextImplTest.java @@ -13,17 +13,24 @@ package io.dapr.workflows; +import com.microsoft.durabletask.RetryPolicy; import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOptions; import com.microsoft.durabletask.TaskOrchestrationContext; -import io.dapr.workflows.DaprWorkflowContextImpl; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.List; -import static org.junit.Assert.assertThrows; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class DaprWorkflowContextImplTest { private DaprWorkflowContextImpl context; @@ -35,13 +42,6 @@ public void setUp() { context = new DaprWorkflowContextImpl(mockInnerContext); } - @Test - public void nullConstructorTest() { - assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(mockInnerContext, null); }); - assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(null, mock(Logger.class)); }); - assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(null, null); }); - } - @Test public void getNameTest() { context.getName(); @@ -54,20 +54,47 @@ public void getInstanceIdTest() { verify(mockInnerContext, times(1)).getInstanceId(); } + @Test + public void getCurrentInstantTest() { + context.getCurrentInstant(); + verify(mockInnerContext, times(1)).getCurrentInstant(); + } + + @Test + public void waitForExternalEventWithEventAndDurationTest() { + String expectedEvent = "TestEvent"; + Duration expectedDuration = Duration.ofSeconds(1); + + context.waitForExternalEvent(expectedEvent, expectedDuration); + verify(mockInnerContext, times(1)).waitForExternalEvent(expectedEvent, expectedDuration, Void.class); + } + @Test public void waitForExternalEventTest() { - doReturn(mock(Task.class)) - .when(mockInnerContext).waitForExternalEvent(any(String.class), any(Duration.class)); - DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext); String expectedEvent = "TestEvent"; Duration expectedDuration = Duration.ofSeconds(1); - testContext.waitForExternalEvent(expectedEvent, expectedDuration).await(); - verify(mockInnerContext, times(1)).waitForExternalEvent(expectedEvent, expectedDuration); + context.waitForExternalEvent(expectedEvent, expectedDuration, String.class); + verify(mockInnerContext, times(1)).waitForExternalEvent(expectedEvent, expectedDuration, String.class); } + @Test + public void callActivityTest() { + String expectedName = "TestActivity"; + String expectedInput = "TestInput"; + + context.callActivity(expectedName, expectedInput, String.class); + verify(mockInnerContext, times(1)).callActivity(expectedName, expectedInput, null, String.class); + } + + @Test(expected = IllegalArgumentException.class) public void DaprWorkflowContextWithEmptyInnerContext() { + context = new DaprWorkflowContextImpl(mockInnerContext, null); + } + + @Test(expected = IllegalArgumentException.class) + public void DaprWorkflowContextWithEmptyLogger() { context = new DaprWorkflowContextImpl(null, null); } @@ -77,10 +104,16 @@ public void completeTest() { verify(mockInnerContext, times(1)).complete(null); } + @Test + public void getIsReplayingTest() { + context.isReplaying(); + verify(mockInnerContext, times(1)).getIsReplaying(); + } + @Test public void getLoggerReplayingTest() { Logger mockLogger = mock(Logger.class); - when(mockInnerContext.getIsReplaying()).thenReturn(true); + when(context.isReplaying()).thenReturn(true); DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext, mockLogger); String expectedArg = "test print"; @@ -92,7 +125,7 @@ public void getLoggerReplayingTest() { @Test public void getLoggerFirstTimeTest() { Logger mockLogger = mock(Logger.class); - when(mockInnerContext.getIsReplaying()).thenReturn(false); + when(context.isReplaying()).thenReturn(false); DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext, mockLogger); String expectedArg = "test print"; @@ -100,4 +133,74 @@ public void getLoggerFirstTimeTest() { verify(mockLogger, times(1)).info(expectedArg); } + + @Test + public void continueAsNewTest() { + String expectedInput = "TestInput"; + context.continueAsNew(expectedInput); + verify(mockInnerContext, times(1)).continueAsNew(expectedInput); + } + + @Test + public void allOfTest() { + Task t1 = mockInnerContext.callActivity("task1"); + Task t2 = mockInnerContext.callActivity("task2"); + List> taskList = Arrays.asList(t1, t2); + context.allOf(taskList); + verify(mockInnerContext, times(1)).allOf(taskList); + } + + @Test + public void anyOfTest() { + Task t1 = mockInnerContext.callActivity("task1"); + Task t2 = mockInnerContext.callActivity("task2"); + Task t3 = mockInnerContext.callActivity("task3"); + List> taskList = Arrays.asList(t1, t2); + + context.anyOf(taskList); + verify(mockInnerContext, times(1)).anyOf(taskList); + + context.anyOf(t1, t2, t3); + verify(mockInnerContext, times(1)).anyOf(Arrays.asList(t1, t2, t3)); + } + + @Test + public void createTimerTest() { + context.createTimer(Duration.ofSeconds(10)); + verify(mockInnerContext, times(1)).createTimer(Duration.ofSeconds(10)); + } + + @Test(expected = UnsupportedOperationException.class) + public void createTimerWithZonedDateTimeThrowsTest() { + context.createTimer(ZonedDateTime.now()); + } + + @Test + public void callSubWorkflowWithName() { + String expectedName = "TestActivity"; + + context.callSubWorkflow(expectedName); + verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, null, null, null, null); + } + + @Test + public void callSubWorkflowWithOptions() { + String expectedName = "TestActivity"; + String expectedInput = "TestInput"; + String expectedInstanceId = "TestInstanceId"; + TaskOptions expectedOptions = new TaskOptions(new RetryPolicy(1, Duration.ofSeconds(10))); + + context.callSubWorkflow(expectedName, expectedInput, expectedInstanceId, expectedOptions, String.class); + verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, expectedInput, expectedInstanceId, + expectedOptions, String.class); + } + + @Test + public void callSubWorkflow() { + String expectedName = "TestActivity"; + String expectedInput = "TestInput"; + + context.callSubWorkflow(expectedName, expectedInput, String.class); + verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, expectedInput, null, null, String.class); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java index a3f826123..c5fb789ee 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java @@ -14,7 +14,10 @@ package io.dapr.workflows.client; import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.OrchestrationMetadata; +import com.microsoft.durabletask.OrchestrationRuntimeStatus; import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; import io.grpc.ManagedChannel; import org.junit.Before; @@ -22,11 +25,17 @@ import org.junit.Test; import java.lang.reflect.Constructor; +import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.TimeoutException; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class DaprWorkflowClientTest { private static Constructor constructor; @@ -34,18 +43,21 @@ public class DaprWorkflowClientTest { private DurableTaskClient mockInnerClient; private ManagedChannel mockGrpcChannel; - public class TestWorkflow extends Workflow { + public static class TestWorkflow extends Workflow { @Override public WorkflowStub create() { - return ctx -> { }; + return WorkflowContext::getInstanceId; } } @BeforeClass public static void beforeAll() { - constructor = + constructor = Constructor.class.cast(Arrays.stream(DaprWorkflowClient.class.getDeclaredConstructors()) - .filter(c -> c.getParameters().length == 2).peek(c -> c.setAccessible(true)).findFirst().get()); + .filter(c -> c.getParameters().length == 2).map(c -> { + c.setAccessible(true); + return c; + }).findFirst().get()); } @Before @@ -103,17 +115,102 @@ public void terminateWorkflow() { } @Test - public void close() throws InterruptedException { - client.close(); - verify(mockInnerClient, times(1)).close(); - verify(mockGrpcChannel, times(1)).shutdown(); + public void getInstanceMetadata() { + + // Arrange + String instanceId = "TestWorkflowInstanceId"; + + OrchestrationMetadata expectedMetadata = mock(OrchestrationMetadata.class); + when(expectedMetadata.getInstanceId()).thenReturn(instanceId); + when(expectedMetadata.getName()).thenReturn("WorkflowName"); + when(expectedMetadata.getRuntimeStatus()).thenReturn(OrchestrationRuntimeStatus.RUNNING); + when(mockInnerClient.getInstanceMetadata(instanceId, true)).thenReturn(expectedMetadata); + + // Act + WorkflowInstanceStatus metadata = client.getInstanceState(instanceId, true); + + // Assert + verify(mockInnerClient, times(1)).getInstanceMetadata(instanceId, true); + assertNotEquals(metadata, null); + assertEquals(metadata.getInstanceId(), expectedMetadata.getInstanceId()); + assertEquals(metadata.getName(), expectedMetadata.getName()); + assertEquals(metadata.isRunning(), expectedMetadata.isRunning()); + assertEquals(metadata.isCompleted(), expectedMetadata.isCompleted()); + } + + @Test + public void waitForInstanceStart() throws TimeoutException { + + // Arrange + String instanceId = "TestWorkflowInstanceId"; + Duration timeout = Duration.ofSeconds(10); + + OrchestrationMetadata expectedMetadata = mock(OrchestrationMetadata.class); + when(expectedMetadata.getInstanceId()).thenReturn(instanceId); + when(mockInnerClient.waitForInstanceStart(instanceId, timeout, true)).thenReturn(expectedMetadata); + + // Act + WorkflowInstanceStatus result = client.waitForInstanceStart(instanceId, timeout, true); + + // Assert + verify(mockInnerClient, times(1)).waitForInstanceStart(instanceId, timeout, true); + assertNotEquals(result, null); + assertEquals(result.getInstanceId(), expectedMetadata.getInstanceId()); + } + + @Test + public void waitForInstanceCompletion() throws TimeoutException { + + // Arrange + String instanceId = "TestWorkflowInstanceId"; + Duration timeout = Duration.ofSeconds(10); + + OrchestrationMetadata expectedMetadata = mock(OrchestrationMetadata.class); + when(expectedMetadata.getInstanceId()).thenReturn(instanceId); + when(mockInnerClient.waitForInstanceCompletion(instanceId, timeout, true)).thenReturn(expectedMetadata); + + // Act + WorkflowInstanceStatus result = client.waitForInstanceCompletion(instanceId, timeout, true); + + // Assert + verify(mockInnerClient, times(1)).waitForInstanceCompletion(instanceId, timeout, true); + assertNotEquals(result, null); + assertEquals(result.getInstanceId(), expectedMetadata.getInstanceId()); + } + + @Test + public void raiseEvent() { + String expectedInstanceId = "TestWorkflowInstanceId"; + String expectedEventName = "TestEventName"; + Object expectedEventPayload = new Object(); + client.raiseEvent(expectedInstanceId, expectedEventName, expectedEventPayload); + verify(mockInnerClient, times(1)).raiseEvent(expectedInstanceId, + expectedEventName, expectedEventPayload); + } + + @Test + public void purgeInstance() { + String expectedArgument = "TestWorkflowInstanceId"; + client.purgeInstance(expectedArgument); + verify(mockInnerClient, times(1)).purgeInstance(expectedArgument); + } + + @Test + public void createTaskHub() { + boolean expectedArgument = true; + client.createTaskHub(expectedArgument); + verify(mockInnerClient, times(1)).createTaskHub(expectedArgument); } @Test - public void closeWithInnerClientRuntimeException() throws InterruptedException { - doThrow(RuntimeException.class).when(mockInnerClient).close(); + public void deleteTaskHub() { + client.deleteTaskHub(); + verify(mockInnerClient, times(1)).deleteTaskHub(); + } - assertThrows(RuntimeException.class, () -> { client.close(); }); + @Test + public void close() throws InterruptedException { + client.close(); verify(mockInnerClient, times(1)).close(); verify(mockGrpcChannel, times(1)).shutdown(); } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java new file mode 100644 index 000000000..ba05c7f3e --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java @@ -0,0 +1,232 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.client; + +import com.microsoft.durabletask.FailureDetails; +import com.microsoft.durabletask.OrchestrationMetadata; +import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import io.dapr.workflows.runtime.WorkflowRuntimeStatus; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.time.Instant; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class WorkflowInstanceStatusTest { + + private OrchestrationMetadata mockOrchestrationMetadata; + private WorkflowInstanceStatus workflowMetadata; + + @Before + public void setUp() throws Exception { + mockOrchestrationMetadata = mock(OrchestrationMetadata.class); + workflowMetadata = new WorkflowInstanceStatus(mockOrchestrationMetadata); + } + + @Test + public void getInstanceId() { + // Arrange + String expected = "instanceId"; + when(mockOrchestrationMetadata.getInstanceId()).thenReturn(expected); + + // Act + String result = workflowMetadata.getInstanceId(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).getInstanceId(); + Assert.assertEquals(result, expected); + } + + @Test + public void getName() { + // Arrange + String expected = "WorkflowName"; + when(mockOrchestrationMetadata.getName()).thenReturn(expected); + + // Act + String result = workflowMetadata.getName(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).getName(); + Assert.assertEquals(result, expected); + } + + @Test + public void getCreatedAt() { + // Arrange + Instant expected = Instant.now(); + when(mockOrchestrationMetadata.getCreatedAt()).thenReturn(expected); + + // Act + Instant result = workflowMetadata.getCreatedAt(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).getCreatedAt(); + Assert.assertEquals(result, expected); + } + + @Test + public void getLastUpdatedAt() { + // Arrange + Instant expected = Instant.now(); + when(mockOrchestrationMetadata.getLastUpdatedAt()).thenReturn(expected); + + // Act + Instant result = workflowMetadata.getLastUpdatedAt(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).getLastUpdatedAt(); + Assert.assertEquals(result, expected); + } + + @Test + public void getFailureDetails() { + // Arrange + FailureDetails mockFailureDetails = mock(FailureDetails.class); + when(mockFailureDetails.getErrorType()).thenReturn("errorType"); + when(mockFailureDetails.getErrorMessage()).thenReturn("errorMessage"); + when(mockFailureDetails.getStackTrace()).thenReturn("stackTrace"); + + OrchestrationMetadata orchestrationMetadata = mock(OrchestrationMetadata.class); + when(orchestrationMetadata.getFailureDetails()).thenReturn(mockFailureDetails); + + // Act + WorkflowInstanceStatus metadata = new WorkflowInstanceStatus(orchestrationMetadata); + WorkflowFailureDetails result = metadata.getFailureDetails(); + + // Assert + verify(orchestrationMetadata, times(1)).getFailureDetails(); + Assert.assertEquals(result.getErrorType(), mockFailureDetails.getErrorType()); + Assert.assertEquals(result.getErrorMessage(), mockFailureDetails.getErrorMessage()); + Assert.assertEquals(result.getStackTrace(), mockFailureDetails.getStackTrace()); + } + + @Test + public void getRuntimeStatus() { + // Arrange + WorkflowRuntimeStatus expected = WorkflowRuntimeStatus.RUNNING; + when(mockOrchestrationMetadata.getRuntimeStatus()).thenReturn(OrchestrationRuntimeStatus.RUNNING); + + // Act + WorkflowRuntimeStatus result = workflowMetadata.getRuntimeStatus(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).getRuntimeStatus(); + Assert.assertEquals(result, expected); + } + + @Test + public void isRunning() { + // Arrange + boolean expected = true; + when(mockOrchestrationMetadata.isRunning()).thenReturn(expected); + + // Act + boolean result = workflowMetadata.isRunning(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).isRunning(); + Assert.assertEquals(result, expected); + } + + @Test + public void isCompleted() { + // Arrange + boolean expected = true; + when(mockOrchestrationMetadata.isCompleted()).thenReturn(expected); + + // Act + boolean result = workflowMetadata.isCompleted(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).isCompleted(); + Assert.assertEquals(result, expected); + } + + @Test + public void getSerializedInput() { + // Arrange + String expected = "{input: \"test\"}"; + when(mockOrchestrationMetadata.getSerializedInput()).thenReturn(expected); + + // Act + String result = workflowMetadata.getSerializedInput(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).getSerializedInput(); + Assert.assertEquals(result, expected); + } + + @Test + public void getSerializedOutput() { + // Arrange + String expected = "{output: \"test\"}"; + when(mockOrchestrationMetadata.getSerializedOutput()).thenReturn(expected); + + // Act + String result = workflowMetadata.getSerializedOutput(); + + // Assert + verify(mockOrchestrationMetadata, times(1)).getSerializedOutput(); + Assert.assertEquals(result, expected); + } + + @Test + public void readInputAs() { + // Arrange + String expected = "[{property: \"test input\"}}]"; + when(mockOrchestrationMetadata.readInputAs(String.class)).thenReturn(expected); + + // Act + String result = workflowMetadata.readInputAs(String.class); + + // Assert + verify(mockOrchestrationMetadata, times(1)).readInputAs(String.class); + Assert.assertEquals(result, expected); + } + + @Test + public void readOutputAs() { + // Arrange + String expected = "[{property: \"test output\"}}]"; + when(mockOrchestrationMetadata.readOutputAs(String.class)).thenReturn(expected); + + // Act + String result = workflowMetadata.readOutputAs(String.class); + + // Assert + verify(mockOrchestrationMetadata, times(1)).readOutputAs(String.class); + Assert.assertEquals(result, expected); + } + + @Test + public void testToString() { + // Arrange + String expected = "string value"; + when(mockOrchestrationMetadata.toString()).thenReturn(expected); + + // Act + String result = workflowMetadata.toString(); + + // Assert + //verify(mockOrchestrationMetadata, times(1)).toString(); + Assert.assertEquals(result, expected); + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/ActivityWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/ActivityWrapperTest.java new file mode 100644 index 000000000..1b3a134b4 --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/ActivityWrapperTest.java @@ -0,0 +1,43 @@ +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskActivityContext; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class ActivityWrapperTest { + public static class TestActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String activityContextName = ctx.getName(); + return ctx.getInput(String.class) + " world! from " + activityContextName; + } + } + + @Test + public void getName() throws NoSuchMethodException { + ActivityWrapper wrapper = new ActivityWrapper<>( + ActivityWrapperTest.TestActivity.class); + Assert.assertEquals( + "io.dapr.workflows.runtime.ActivityWrapperTest.TestActivity", + wrapper.getName() + ); + } + + @Test + public void createWithClass() throws NoSuchMethodException { + TaskActivityContext mockContext = mock(TaskActivityContext.class); + ActivityWrapper wrapper = new ActivityWrapper<>( + ActivityWrapperTest.TestActivity.class); + when(mockContext.getInput(String.class)).thenReturn("Hello"); + when(mockContext.getName()).thenReturn("TestActivityContext"); + Object result = wrapper.create().run(mockContext); + verify(mockContext, times(1)).getInput(String.class); + Assert.assertEquals("Hello world! from TestActivityContext", result); + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/OrchestratorWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/OrchestratorWrapperTest.java index eff6ccd98..6d6efa15a 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/OrchestratorWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/OrchestratorWrapperTest.java @@ -21,15 +21,18 @@ import org.junit.Assert; import org.junit.Test; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class OrchestratorWrapperTest { - public static class TestWorkflow extends Workflow { - @Override - public WorkflowStub create() { - return WorkflowContext::getInstanceId; - } + public static class TestWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return WorkflowContext::getInstanceId; } + } @Test public void getName() { @@ -44,9 +47,9 @@ public void getName() { public void createWithClass() { TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); OrchestratorWrapper wrapper = new OrchestratorWrapper<>(TestWorkflow.class); - when( mockContext.getInstanceId() ).thenReturn("uuid"); + when(mockContext.getInstanceId()).thenReturn("uuid"); wrapper.create().run(mockContext); verify(mockContext, times(1)).getInstanceId(); } -} +} \ No newline at end of file diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java index 905b89e9d..23fb5254a 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java @@ -11,7 +11,15 @@ public class WorkflowRuntimeBuilderTest { public static class TestWorkflow extends Workflow { @Override public WorkflowStub create() { - return ctx -> { }; + return ctx -> { + }; + } + } + + public static class TestActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + return null; } } @@ -20,5 +28,13 @@ public void registerValidWorkflowClass() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class)); } + @Test + public void registerValidWorkflowActivityClass() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(TestActivity.class)); + } + @Test + public void buildTest() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().build()); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusTest.java new file mode 100644 index 000000000..45423e2f1 --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusTest.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + + +import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class WorkflowRuntimeStatusTest { + + @Before + public void setUp() throws Exception { + + } + + @Test + public void fromOrchestrationRuntimeStatus() { + + Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.RUNNING), + WorkflowRuntimeStatus.RUNNING); + + Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.COMPLETED), + WorkflowRuntimeStatus.COMPLETED); + + Assert.assertEquals( + WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.CONTINUED_AS_NEW), + WorkflowRuntimeStatus.CONTINUED_AS_NEW); + + Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.FAILED), + WorkflowRuntimeStatus.FAILED); + + Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.CANCELED), + WorkflowRuntimeStatus.CANCELED); + + Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.TERMINATED), + WorkflowRuntimeStatus.TERMINATED); + + Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.PENDING), + WorkflowRuntimeStatus.PENDING); + + Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.SUSPENDED), + WorkflowRuntimeStatus.SUSPENDED); + } + + @Test + public void toOrchestrationRuntimeStatus() { + Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.RUNNING), + OrchestrationRuntimeStatus.RUNNING); + + Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.COMPLETED), + OrchestrationRuntimeStatus.COMPLETED); + + Assert.assertEquals( + WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.CONTINUED_AS_NEW), + OrchestrationRuntimeStatus.CONTINUED_AS_NEW); + + Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.FAILED), + OrchestrationRuntimeStatus.FAILED); + + Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.CANCELED), + OrchestrationRuntimeStatus.CANCELED); + + Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.TERMINATED), + OrchestrationRuntimeStatus.TERMINATED); + + Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.PENDING), + OrchestrationRuntimeStatus.PENDING); + + Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.SUSPENDED), + OrchestrationRuntimeStatus.SUSPENDED); + } + + @Test + public void fromOrchestrationRuntimeStatusThrowsIllegalArgumentException() { + try { + WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(null); + Assert.fail("Expected exception not thrown"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("status cannot be null", e.getMessage()); + } + } +} diff --git a/sdk/src/test/java/io/dapr/client/domain/query/QueryTest.java b/sdk/src/test/java/io/dapr/client/domain/query/QueryTest.java index 74602cf3f..220a7f209 100644 --- a/sdk/src/test/java/io/dapr/client/domain/query/QueryTest.java +++ b/sdk/src/test/java/io/dapr/client/domain/query/QueryTest.java @@ -28,7 +28,7 @@ public void testQuerySerialize() throws JsonProcessingException { orFilter.addClause(new EqFilter<>("v2", true)); orFilter.addClause(new InFilter<>("v3", 1.3, 1.5)); - filter.addClause(orFilter); + filter.addClause((Filter) orFilter); // Add Filter q.setFilter(filter); @@ -110,7 +110,7 @@ public void testQueryInValidFilter() throws JsonProcessingException { orFilter.addClause(new EqFilter<>("v2", true)); // invalid OR filter - filter.addClause(orFilter); + filter.addClause((Filter) orFilter); // Add Filter q.setFilter(filter);