Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add child workflow implementation #58

Merged
merged 8 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions dev-resources/utils.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
(ns utils
ghaskins marked this conversation as resolved.
Show resolved Hide resolved
(:require [taoensso.timbre :as log]
[temporal.activity :as a :refer [defactivity]]
[temporal.client.core :as c]
[temporal.client.worker :as worker]
[temporal.workflow :as w :refer [defworkflow]])
(:import [java.time Duration]))

(def client (atom nil))
(def current-worker-thread (atom nil))

(def default-client-options {:target "localhost:7233"
:namespace "default"
:enable-https false})

(def default-worker-options {:task-queue "default"})

(def default-workflow-options {:task-queue "default"
:workflow-execution-timeout (Duration/ofSeconds 30)
:retry-options {:maximum-attempts 1}
:workflow-id "test-workflow"})

(defactivity greet-activity
[ctx {:keys [name] :as args}]
(log/info "greet-activity:" args)
(str "Hi, " name))

(defworkflow child-workflow
[{names :names :as args}]
(log/info "child-workflow:" names)
(for [name names]
@(a/invoke greet-activity {:name name})))

(defworkflow parent-workflow
[args]
(log/info "parent-workflow:" args)
@(w/invoke child-workflow args (merge default-workflow-options {:workflow-id "child-workflow"})))

(defn create-temporal-client
"Creates a new temporal client if the old one does not exist"
([] (create-temporal-client nil))
([options]
(when-not @client
(let [options (merge default-client-options options)]
(log/info "creating temporal client" options)
(reset! client (c/create-client options))))))

(defn worker-loop
([client] (worker-loop client nil))
([client options]
(let [options (merge default-worker-options options)]
(log/info "starting temporal worker" options)
(worker/start client options))))

(defn create-temporal-worker
"Starts a new instance running on another daemon thread,
stops the current temporal worker and thread if they exist"
([client] (create-temporal-worker client nil))
([client options]
(when (and @current-worker-thread (.isAlive @current-worker-thread))
(.interrupt @current-worker-thread)
(reset! current-worker-thread nil))
(let [thread (Thread. (partial worker-loop client options))]
(doto thread
(.setDaemon true)
(.start))
(reset! current-worker-thread thread))))

(defn execute-workflow
([client workflow arguments] (execute-workflow client workflow arguments nil))
([client workflow arguments options]
(let [options (merge default-workflow-options options)
workflow (c/create-workflow client workflow options)]
(log/info "executing workflow" arguments)
(c/start workflow arguments)
@(c/get-result workflow))))

(comment
(do (create-temporal-client)
(create-temporal-worker @client)
(execute-workflow @client parent-workflow {:names ["Hanna" "Bob" "Tracy" "Felix"]})))
64 changes: 64 additions & 0 deletions doc/child_workflows.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Child Workflows

## What is a Child Workflow?

A Child Workflow is a Workflow execution spawned from within a Workflow.

Child Workflows orchestrate invocations of Activities just like Workflows do.

Child Workflows should not be used for code organization, however they can be used to partition a Workflow execution's event history into smaller chunks which helps avoid the roughly *~50MB* Workflow event history limit, amongst other use cases.

You should visit the [workflows](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/workflows) page to learn more about Workflows, their constraints, and their executions in general.

For more information about Child Workflows in general visit [Temporal Child Workflows](https://docs.temporal.io/encyclopedia/child-workflows)

## Implementing Child Workflows

In this Clojure SDK programming model, a Temporal Workflow is a function declared with [defworkflow](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#defworkflow)

And a Child workflow is declared in the exact same way

### Example

```clojure
(require '[temporal.workflow :refer [defworkflow]])

(defworkflow my-workflow
[{:keys [foo]}]
...)
```
## Starting Child Workflow Executions

In this Clojure SDK, Workflows start Child Workflows with [temporal.workflow/invoke](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#invoke)

The options (`ChildWorkflowOptions`) provided to [temporal.workflow/invoke](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#invoke) are similar to the ones required to create a regular workflow with [temporal.client.core/create-workflow](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#create-workflow)

One big difference however is Child Workflows can provide options to control what happens to themselves with their parents close/fail/complete.

When a Parent Workflow Execution stops, the Temporal Cluster determines what will happen to any running child workflow executions based on the `:parent-close-policy` option.

See [Temporal Parent Close Policy](https://docs.temporal.io/encyclopedia/child-workflows#parent-close-policy) for more information

### Example

```clojure
(require '[temporal.workflow :refer [defworkflow]])
(require '[temporal.activity :refer [defactivity] :as a])

(defactivity child-greeter-activity
[ctx {:keys [name] :as args}]
(str "Hi, " name))

(defworkflow child-workflow
[{:keys [names] :as args}]
(for [name names]
@(a/invoke child-greeter-activity {:name name})))

(defworkflow parent-workflow
[args]
@(w/invoke child-workflow args {:retry-options {:maximum-attempts 1}
:workflow-task-timeout 10
:workflow-execution-timeout 3600
:workflow-run-timeout 3600}))
```

1 change: 1 addition & 0 deletions doc/cljdoc.edn
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{:cljdoc.doc/tree
[["Readme" {:file "README.md"}]
["Workflows" {:file "doc/workflows.md"}]
["Child Workflows" {:file "doc/child_workflows.md"}]
["Activities" {:file "doc/activities.md"}]
["Workers" {:file "doc/workers.md"}]
["Clients" {:file "doc/clients.md"}]
Expand Down
11 changes: 2 additions & 9 deletions src/temporal/activity.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ along with the Activity Task for the next retry attempt and can be extracted by
[]
(a/get-info))

(defn- complete-invoke
[activity result]
(log/trace activity "completed with" (count result) "bytes")
(let [r (nippy/thaw result)]
(log/trace activity "results:" r)
r))

(defn invoke
"
Invokes 'activity' with 'params' from within a workflow context. Returns a promise that when derefed will resolve to
Expand Down Expand Up @@ -98,7 +91,7 @@ Arguments:
stub (Workflow/newUntypedActivityStub (a/invoke-options-> options))]
(log/trace "invoke:" activity "with" params options)
(-> (.executeAsync stub act-name u/bytes-type (u/->objarray params))
(p/then (partial complete-invoke activity))
(p/then (partial u/complete-invoke activity))
(p/catch e/slingshot? e/recast-stone)
(p/catch (fn [e]
(log/error e)
Expand Down Expand Up @@ -139,7 +132,7 @@ Arguments:
stub (Workflow/newUntypedLocalActivityStub (a/local-invoke-options-> options))]
(log/trace "local-invoke:" activity "with" params options)
(-> (.executeAsync stub act-name u/bytes-type (u/->objarray params))
(p/then (partial complete-invoke activity))
(p/then (partial u/complete-invoke activity))
(p/catch e/slingshot? e/recast-stone)
(p/catch (fn [e]
(log/error e)
Expand Down
41 changes: 41 additions & 0 deletions src/temporal/internal/child_workflow.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
(ns temporal.internal.child-workflow
(:require [temporal.common :as common]
[temporal.internal.utils :as u]
[temporal.internal.workflow :as w])
(:import [java.time Duration]
[io.temporal.api.enums.v1 ParentClosePolicy]
[io.temporal.workflow ChildWorkflowOptions ChildWorkflowOptions$Builder ChildWorkflowCancellationType]))

(def cancellation-type->
{:abandon ChildWorkflowCancellationType/ABANDON
:try-cancel ChildWorkflowCancellationType/TRY_CANCEL
:wait-cancellation-completed ChildWorkflowCancellationType/WAIT_CANCELLATION_COMPLETED
:wait-cancellation-requested ChildWorkflowCancellationType/WAIT_CANCELLATION_REQUESTED})

(def parent-close-policy->
{:abandon ParentClosePolicy/PARENT_CLOSE_POLICY_ABANDON
:request-cancel ParentClosePolicy/PARENT_CLOSE_POLICY_REQUEST_CANCEL
:terminate ParentClosePolicy/PARENT_CLOSE_POLICY_TERMINATE})

(def ^:no-doc child-workflow-option-spec
{:task-queue #(.setTaskQueue ^ChildWorkflowOptions$Builder %1 (u/namify %2))
:workflow-id #(.setWorkflowId ^ChildWorkflowOptions$Builder %1 (u/namify %2))
:workflow-id-reuse-policy #(.setWorkflowIdReusePolicy ^ChildWorkflowOptions$Builder %1 (w/workflow-id-reuse-policy-> %2))
:parent-close-policy #(.setParentClosePolicy ^ChildWorkflowOptions$Builder %1 (parent-close-policy-> %2))
:workflow-execution-timeout #(.setWorkflowExecutionTimeout ^ChildWorkflowOptions$Builder %1 %2)
:workflow-run-timeout #(.setWorkflowRunTimeout ^ChildWorkflowOptions$Builder %1 %2)
:workflow-task-timeout #(.setWorkflowTaskTimeout ^ChildWorkflowOptions$Builder %1 %2)
:retry-options #(.setRetryOptions %1 (common/retry-options-> %2))
:cron-schedule #(.setCronSchedule ^ChildWorkflowOptions$Builder %1 %2)
:cancellation-type #(.setCancellationType ^ChildWorkflowOptions$Builder %1 (cancellation-type-> %2))
:memo #(.setMemo ^ChildWorkflowOptions$Builder %1 %2)})

(defn import-child-workflow-options
[{:keys [workflow-run-timeout workflow-execution-timeout] :as options}]
(cond-> options
(every? nil? [workflow-run-timeout workflow-execution-timeout])
(assoc :workflow-execution-timeout (Duration/ofSeconds 10))))

(defn child-workflow-options->
^ChildWorkflowOptions [options]
(u/build (ChildWorkflowOptions/newBuilder) child-workflow-option-spec (import-child-workflow-options options)))
9 changes: 8 additions & 1 deletion src/temporal/internal/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@
(fn [x]
(str (symbol x)))))

(defn complete-invoke
[stub result]
(log/trace stub "completed with" (count result) "bytes")
(let [r (nippy/thaw result)]
(log/trace stub "results:" r)
r))

(defn ->Func
[f]
(reify
Expand All @@ -141,4 +148,4 @@
(f x1 x2 x3 x4 x5))
Functions$Func6
(apply [_ x1 x2 x3 x4 x5 x6]
(f x1 x2 x3 x4 x5 x6))))
(f x1 x2 x3 x4 x5 x6))))
78 changes: 77 additions & 1 deletion src/temporal/workflow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
(:require
[taoensso.nippy :as nippy]
[taoensso.timbre :as log]
[promesa.core :as p]
[temporal.internal.exceptions :as e]
[temporal.internal.utils :as u]
[temporal.internal.workflow :as w])
[temporal.internal.workflow :as w]
[temporal.internal.child-workflow :as cw])
(:import [io.temporal.workflow DynamicQueryHandler Workflow]
[java.util.function Supplier]
[java.time Duration]))
Expand Down Expand Up @@ -111,3 +114,76 @@ Arguments:
(log/trace (str ~fqn ": ") args#)
(let [f# (fn ~params* (do ~@body))]
(f# args#))))))))

(defn invoke
"
Invokes a 'child workflow' with 'params' from within a workflow context.
Returns a promise that when derefed will resolve to the evaluation of the defworkflow once the workflow concludes.

Arguments:

- `workflow`: A reference to a symbol registered with [[defworkflow]], called a Child Workflow usually.
- `params`: Opaque serializable data that will be passed as arguments to the invoked child workflow
- `options`: See below.

#### options map

| Value | Description | Type | Default |
| --------------------------- | ------------------------------------------------------------------------------------------ | ------------ | ------- |
| :task-queue | Task queue to use for child workflow tasks | String | |
| :workflow-id | Workflow id to use when starting | String | |
| :workflow-id-reuse-policy | Specifies server behavior if a completed workflow with the same id exists | See `workflow id reuse policy types` below | |
| :parent-close-policy | Specifies how this workflow reacts to the death of the parent workflow | See `parent close policy types` below | |
| :workflow-execution-timeout | The time after which child workflow execution is automatically terminated | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 10 seconds |
| :workflow-run-timeout | The time after which child workflow run is automatically terminated | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | |
| :workflow-task-timeout | Maximum execution time of a single workflow task | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | |
| :retry-options | RetryOptions that define how child workflow is retried in case of failure | [[temporal.common/retry-options]] | |
| :cron-schedule | A cron schedule string | String | |
| :cancellation-type | In case of a child workflow cancellation it fails with a CanceledFailure | See `cancellation types` below | |
| :memo | Specifies additional non-indexed information in result of list workflow | String | |

#### cancellation types

| Value | Description |
| ------------------------- | --------------------------------------------------------------------------- |
| :try-cancel | Initiate a cancellation request and immediately report cancellation to the parent |
| :abandon | Do not request cancellation of the child workflow |
| :wait-cancellation-completed | Wait for child cancellation completion |
| :wait-cancellation-requested | Request cancellation of the child and wait for confirmation that the request was received |

#### parent close policy types

| Value | Description |
| ------------------------- | --------------------------------------------------------------------------- |
| :abandon | Do not request cancellation of the child workflow |
| :request-cancel | Request cancellation of the child and wait for confirmation that the request was received |
| :terminate | Terminate the child workflow |

#### workflow id reuse policy types

| Value | Description |
| ---------------------------- | --------------------------------------------------------------------------- |
| :allow-duplicate | Allow starting a child workflow execution using the same workflow id. |
| :allow-duplicate-failed-only | Allow starting a child workflow execution using the same workflow id, only when the last execution's final state is one of [terminated, cancelled, timed out, failed] |
| :reject-duplicate | Do not permit re-use of the child workflow id for this workflow. |
| :terminate-if-running | If a workflow is running using the same child workflow ID, terminate it and start a new one. If no running child workflow, then the behavior is the same as ALLOW_DUPLICATE |

```clojure
(defworkflow my-workflow
[ctx {:keys [foo] :as args}]
...)

(invoke my-workflow {:foo \"bar\"} {:start-to-close-timeout (Duration/ofSeconds 3))
```
"
([workflow params] (invoke workflow params {}))
([workflow params options]
(let [wf-name (w/get-annotated-name workflow)
stub (Workflow/newUntypedChildWorkflowStub wf-name (cw/child-workflow-options-> options))]
(log/trace "invoke:" workflow "with" params options)
(-> (.executeAsync stub u/bytes-type (u/->objarray params))
(p/then (partial u/complete-invoke workflow))
(p/catch e/slingshot? e/recast-stone)
(p/catch (fn [e]
(log/error e)
(throw e)))))))
39 changes: 34 additions & 5 deletions test/temporal/test/async.clj
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
;; Copyright © Manetu, Inc. All rights reserved

(ns temporal.test.async
(:require [clojure.test :refer :all]
(:require [clojure.test :refer [deftest testing is use-fixtures]]
[clojure.core.async :refer [go]]
[taoensso.timbre :as log]
[temporal.client.core :as c]
[temporal.workflow :refer [defworkflow]]
[temporal.activity :refer [defactivity] :as a]
[temporal.test.utils :as t]))
[temporal.client.core :as c]
[temporal.test.utils :as t]
[temporal.workflow :refer [defworkflow] :as w]))

(use-fixtures :once t/wrap-service)

Expand All @@ -24,7 +24,7 @@
(log/info "greeter-workflow:" args)
@(a/invoke async-greet-activity args {:retry-options {:maximum-attempts 1}}))

(deftest the-test
(deftest basic-async-test
(testing "Verifies that we can round-trip with an async task"
(let [workflow (t/create-workflow async-greeter-workflow)]
(c/start workflow {:name "Bob"})
Expand All @@ -34,3 +34,32 @@
(c/start workflow {:name "Charlie"})
(is (thrown? java.util.concurrent.ExecutionException
@(c/get-result workflow))))))

(defactivity async-child-activity
[ctx {:keys [name] :as args}]
(go
(log/info "async-child-activity:" args)
(if (= name "Charlie")
(ex-info "permission-denied" {})
(str "Hi, " name))))

(defworkflow async-child-workflow
[{:keys [name] :as args}]
(log/info "async-child-workflow:" args)
@(a/invoke async-child-activity args {:retry-options {:maximum-attempts 1}}))

(defworkflow async-parent-workflow
[args]
(log/info "async-parent-workflow:" args)
@(w/invoke async-child-workflow args {:retry-options {:maximum-attempts 1} :task-queue t/task-queue}))

(deftest child-workflow-test
(testing "Verifies that we can round-trip with an async task"
(let [workflow (t/create-workflow async-parent-workflow)]
(c/start workflow {:name "Bob"})
(is (= @(c/get-result workflow) "Hi, Bob"))))
(testing "Verifies that we can process errors in async mode"
(let [workflow (t/create-workflow async-parent-workflow)]
(c/start workflow {:name "Charlie"})
(is (thrown? java.util.concurrent.ExecutionException
@(c/get-result workflow))))))
Loading