Skip to content

Commit

Permalink
add new allSettled handler
Browse files Browse the repository at this point in the history
Signed-off-by: Bailey Kocin <bkocin@crossbeam.com>
  • Loading branch information
Bailey Kocin authored and ghaskins committed May 16, 2024
1 parent 06c2394 commit 0f6e80f
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/temporal/client/schedule.clj
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
```"
[^ScheduleClient client schedule-id options]
(let [schedule (s/schedule-> options)
schedule-options (s/schedule-options-> options)]
schedule-options (s/schedule-options-> (:schedule options))]
(log/tracef "create schedule:" schedule-id)
(.createSchedule client schedule-id schedule schedule-options)))

Expand Down
3 changes: 1 addition & 2 deletions src/temporal/internal/promise.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
;; Copyright © Manetu, Inc. All rights reserved

(ns ^:no-doc temporal.internal.promise
(:require [taoensso.timbre :as log]
[promesa.protocols :as pt]
(:require [promesa.protocols :as pt]
[temporal.internal.utils :refer [->Func] :as u])
(:import [clojure.lang IDeref IBlockingDeref]
[io.temporal.workflow Promise]
Expand Down
3 changes: 1 addition & 2 deletions src/temporal/internal/schedule.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns ^:no-doc temporal.internal.schedule
(:require [clojure.walk :refer [stringify-keys]]
[temporal.internal.utils :as u]
(:require [temporal.internal.utils :as u]
[temporal.internal.workflow :as w])
(:import [io.temporal.api.enums.v1 ScheduleOverlapPolicy]
[io.temporal.client.schedules
Expand Down
27 changes: 26 additions & 1 deletion src/temporal/promise.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,31 @@ promises returned from [[temporal.activity/invoke]] from within workflow context
(p/then (fn [_]
(mapv deref coll)))))

(defn allSettled
"Returns Promise that becomes completed when all arguments are completed, even in the face of errors.
*N.B. You must handle the exceptions in the returned promises when done*
Similar to [promesa/all](https://funcool.github.io/promesa/latest/promesa.core.html#var-all) but designed to work with
promises returned from [[temporal.activity/invoke]] from within workflow context.
For more Java SDK samples example look here:
https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/batch
```clojure
(-> (allSettled [(a/invoke activity-a ..) (a/invoke activity-b ..)])
(promesa.core/then (fn [[a-result b-result]] ...)))
```
"
[coll]
(letfn [(wait! [^Promise p] (try (.get p) (catch Exception _)))]
(-> (into-array Promise (mapv wait! (->array coll)))
(Promise/allOf)
(pt/->PromiseAdapter)
;; The promises are all completed at this point,
;; this is just to use the promesa library
(p/then (fn [_] (mapv deref coll))))))

(defn race
"Returns Promise that becomes completed when any of the arguments are completed.
Expand All @@ -54,4 +79,4 @@ promises returned from [[temporal.activity/invoke]] from within workflow context
(defn rejected
"Returns a new, rejected promise"
[^Exception e]
(Workflow/newFailedPromise e))
(Workflow/newFailedPromise e))
35 changes: 34 additions & 1 deletion test/temporal/test/concurrency.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,37 @@
(testing "Verifies that we can launch activities in parallel"
(let [workflow (t/create-workflow concurrency-workflow)]
(c/start workflow {})
(is (-> workflow c/get-result deref count (= 10))))))
(is (-> workflow c/get-result deref count (= 10))))))

(defactivity all-settled-activity
[ctx args] args)

(defworkflow all-settled-workflow
[args]
@(-> (pt/all (map #(a/invoke all-settled-activity %) (range 10)))
(p/then (fn [r] r))
(p/catch (fn [e] (:args (ex-data e))))))

(defactivity error-prone-activity
[ctx args]
(when (= args 5)
(throw (ex-info "error on 5" {:args args})))
args)

(defworkflow error-prone-workflow
[args]
@(-> (pt/all (map #(a/invoke error-prone-activity %) (range 10)))
(p/then (fn [r] r))
(p/catch (fn [e] (:args (ex-data e))))))

(deftest test-all-settled
(testing "Testing that allSettled waits for all the activities to complete
just like `p/all` does in spite of errors"
(let [workflow (t/create-workflow all-settled-workflow)]
(c/start workflow {})
(is (-> workflow c/get-result deref count (= 10)))))
(testing "Testing that allSettled waits for all the activities to complete
despite error and can return the errors"
(let [workflow (t/create-workflow error-prone-workflow)]
(c/start workflow {})
(is (-> workflow c/get-result deref (= 5))))))

0 comments on commit 0f6e80f

Please sign in to comment.