Skip to content

Commit

Permalink
Add feature: Signal child workflow (#311)
Browse files Browse the repository at this point in the history
Implement features/child_workflow/signal for the 4 languages:

- A child workflow is defined that waits for a signal and then returns the signal data
- A parent workflow is defined that starts the child workflow, sends the signal, and returns the result of the child workflow.
- We assert that the result of the parent workflow is equal to the signal data that it sent.
  • Loading branch information
dandavison authored Jul 14, 2023
1 parent a809899 commit 6385bb5
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 8 deletions.
4 changes: 2 additions & 2 deletions features/child_workflow/signal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ This feature executes a Workflow that:

- starts a Child Workflow that is blocked on a flag
- sends an unblock Signal to the Child
- returns when when the Child completes
- returns when the Child completes

# Detailed spec

TODO
TODO
52 changes: 52 additions & 0 deletions features/child_workflow/signal/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package signal

import (
"context"
"time"

"go.temporal.io/features/harness/go/harness"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
)

const (
UnblockSignal = "unblock-signal"
UnblockMessage = "unblock"
)

// A workflow that starts a child workflow, unblocks it, and returns the result
// of the child workflow.
func Workflow(ctx workflow.Context) (string, error) {
ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowExecutionTimeout: 10 * time.Minute,
WorkflowTaskTimeout: time.Minute,
})
childWorkflowFut := workflow.ExecuteChildWorkflow(ctx, ChildWorkflow)
childWorkflowFut.SignalChildWorkflow(ctx, UnblockSignal, UnblockMessage)
result := ""
err := childWorkflowFut.Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}

// A workflow that waits for a signal and returns the data received.
func ChildWorkflow(ctx workflow.Context) (string, error) {
unblockMessage := ""
signalCh := workflow.GetSignalChannel(ctx, UnblockSignal)
signalCh.Receive(ctx, &unblockMessage)
return unblockMessage, nil
}

var Feature = harness.Feature{
Workflows: []interface{}{Workflow, ChildWorkflow},
ExpectRunResult: UnblockMessage,
Execute: func(ctx context.Context, runner *harness.Runner) (client.WorkflowRun, error) {
run, err := runner.ExecuteDefault(ctx)
if err != nil {
return nil, err
}
return run, nil
},
}
98 changes: 98 additions & 0 deletions features/child_workflow/signal/feature.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package child_workflow.signal;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.sdkfeatures.Assertions;
import io.temporal.sdkfeatures.Feature;
import io.temporal.sdkfeatures.Run;
import io.temporal.sdkfeatures.Runner;
import io.temporal.worker.Worker;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

@WorkflowInterface
public interface feature extends Feature {

@WorkflowInterface
public interface ChildWorkflow {

@WorkflowMethod
String workflow();

@SignalMethod
void unblock(String message);
}

class ChildWorkflowImpl implements ChildWorkflow {
/*
* A workflow that waits for a signal and returns the data received.
*/

private String childWorkflowUnblockMessage;

@Override
public String workflow() {
Workflow.await(() -> childWorkflowUnblockMessage != null);
return childWorkflowUnblockMessage;
}

@Override
public void unblock(String message) {
childWorkflowUnblockMessage = message;
}
}

@WorkflowMethod
String workflow();

class Impl implements feature {

@Override
public void prepareWorker(Worker worker) {
worker.registerWorkflowImplementationTypes(ChildWorkflowImpl.class);
}

private static final String UNBLOCK_MESSAGE = "unblock";

/*
* Parent workflow
*
* A workflow that starts a child workflow, unblocks it, and returns the
* result of the child workflow.
*/

@Override
public String workflow() {
ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class);
Promise<String> childResult = Async.function(child::workflow);
child.unblock(UNBLOCK_MESSAGE);
return childResult.get();
}

/* Test */

@Override
public Run execute(Runner runner) throws Exception {
var options =
WorkflowOptions.newBuilder()
.setTaskQueue(runner.config.taskQueue)
.setWorkflowExecutionTimeout(Duration.ofMinutes(1))
.build();
var stub = runner.client.newWorkflowStub(feature.class, options);
var execution = WorkflowClient.start(stub::workflow);
var method = runner.featureInfo.metadata.getWorkflowMethods().get(0);
return new Run(method, execution);
}

@Override
public void checkResult(Runner runner, Run run) {
var resultStr = runner.waitForRunResult(run, String.class);
Assertions.assertEquals(UNBLOCK_MESSAGE, resultStr);
}
}
}
59 changes: 59 additions & 0 deletions features/child_workflow/signal/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from datetime import timedelta
from typing import Optional
from uuid import uuid4

from temporalio import workflow
from temporalio.client import WorkflowHandle

from harness.python.feature import Runner, register_feature

UNBLOCK_MESSAGE = "unblock"


@workflow.defn
class Workflow:
"""
A workflow that starts a child workflow, unblocks it, and returns the result of the child workflow.
"""

@workflow.run
async def run(self) -> str:
child_handle = await workflow.start_child_workflow(ChildWorkflow.run)
await child_handle.signal(ChildWorkflow.unblock, UNBLOCK_MESSAGE)
return await child_handle


@workflow.defn
class ChildWorkflow:
"""
A workflow that waits for a signal and returns the data received.
"""

def __init__(self) -> None:
self._unblock_message: Optional[str] = None

@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self._unblock_message is not None)
assert self._unblock_message is not None
return self._unblock_message

@workflow.signal
def unblock(self, message: Optional[str]) -> None:
self._unblock_message = message


async def start(runner: Runner) -> WorkflowHandle:
return await runner.client.start_workflow(
Workflow,
id=f"{runner.feature.rel_dir}-{uuid4()}",
task_queue=runner.task_queue,
execution_timeout=timedelta(minutes=1),
)


register_feature(
workflows=[Workflow, ChildWorkflow],
expect_run_result=UNBLOCK_MESSAGE,
start=start,
)
42 changes: 42 additions & 0 deletions features/child_workflow/signal/feature.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { randomUUID } from 'crypto';
import * as assert from 'assert';
import { Feature } from '@temporalio/harness';
import * as wf from '@temporalio/workflow';

const unblockMessage = 'unblock';

// A workflow that starts a child workflow, unblocks it, and returns the result
// of the child workflow.
export async function workflow(): Promise<string> {
const childHandle = await wf.startChild(childWorkflow);
await childHandle.signal(unblock, unblockMessage);
return await childHandle.result();
}

const unblock = wf.defineSignal<[string]>('unblock');

// A workflow that waits for a signal and returns the data received.
export async function childWorkflow(): Promise<string> {
let unblockMessage = '';
wf.setHandler(unblock, (message: string) => {
unblockMessage = message;
});
await wf.condition(() => unblockMessage !== '');
return unblockMessage;
}

export const feature = new Feature({
workflow,
async execute(runner) {
return await runner.client.start(workflow, {
taskQueue: runner.options.taskQueue,
workflowId: `${runner.source.relDir}-${randomUUID()}`,
workflowExecutionTimeout: 60000,
...(runner.feature.options.workflowStartOptions ?? {}),
});
},
async checkResult(runner, handle) {
const result = await handle.result();
assert.equal(result, unblockMessage);
},
});
2 changes: 2 additions & 0 deletions features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.temporal.io/features/features/build_id_versioning/unversioned_worker_no_task"
"go.temporal.io/features/features/build_id_versioning/versions_added_while_worker_polling"
"go.temporal.io/features/features/child_workflow/result"
"go.temporal.io/features/features/child_workflow/signal"
"go.temporal.io/features/features/continue_as_new/continue_as_same"
"go.temporal.io/features/features/data_converter/binary"
"go.temporal.io/features/features/data_converter/binary_protobuf"
Expand Down Expand Up @@ -68,6 +69,7 @@ func init() {
result.Feature,
retry_on_error.Feature,
failure.Feature,
signal.Feature,
successful_query.Feature,
timeout_due_to_no_active_workers.Feature,
trigger.Feature,
Expand Down
15 changes: 9 additions & 6 deletions harness/java/io/temporal/sdkfeatures/Feature.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.Workflow;

import java.util.function.Consumer;

public interface Feature {
Expand All @@ -19,15 +19,15 @@ default <T> T activities(Class<T> activityIface, Consumer<ActivityOptions.Builde
return (T) Workflow.newActivityStub(activityIface, builder.build());
}

default void workflowServiceOptions(WorkflowServiceStubsOptions.Builder builder) { }
default void workflowServiceOptions(WorkflowServiceStubsOptions.Builder builder) {}

default void workflowClientOptions(WorkflowClientOptions.Builder builder) { }
default void workflowClientOptions(WorkflowClientOptions.Builder builder) {}

default void workerFactoryOptions(WorkerFactoryOptions.Builder builder) { }
default void workerFactoryOptions(WorkerFactoryOptions.Builder builder) {}

default void workerOptions(WorkerOptions.Builder builder) { }
default void workerOptions(WorkerOptions.Builder builder) {}

default void workflowOptions(WorkflowOptions.Builder builder) { }
default void workflowOptions(WorkflowOptions.Builder builder) {}

default Run execute(Runner runner) throws Exception {
return runner.executeSingleParameterlessWorkflow();
Expand All @@ -41,4 +41,7 @@ default void checkResult(Runner runner, Run run) throws Exception {
default void checkHistory(Runner runner, Run run) throws Exception {
runner.checkCurrentAndPastHistories(run);
}

// This may be used to e.g. register additional workflow classes
default void prepareWorker(Worker worker) {}
}
1 change: 1 addition & 0 deletions harness/java/io/temporal/sdkfeatures/PreparedFeature.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class PreparedFeature {
activity.retry_on_error.feature.Impl.class,
activity.cancel_try_cancel.feature.Impl.class,
child_workflow.result.feature.Impl.class,
child_workflow.signal.feature.Impl.class,
continue_as_new.continue_as_same.feature.Impl.class,
data_converter.binary.feature.Impl.class,
data_converter.binary_protobuf.feature.Impl.class,
Expand Down
1 change: 1 addition & 0 deletions harness/java/io/temporal/sdkfeatures/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public void restartWorker() {
var workerBuild = WorkerOptions.newBuilder();
feature.workerOptions(workerBuild);
this.worker = workerFactory.newWorker(config.taskQueue, workerBuild.build());
feature.prepareWorker(this.worker);

// Register workflow class
worker.registerWorkflowImplementationTypes(featureInfo.factoryClass);
Expand Down

0 comments on commit 6385bb5

Please sign in to comment.