Skip to content

Commit 7c7d2e0

Browse files
authored
add Workflow.scope, a seperate Scope that only closes on completion (#5846)
1 parent 96c9537 commit 7c7d2e0

File tree

3 files changed

+69
-10
lines changed

3 files changed

+69
-10
lines changed

.changeset/young-areas-drop.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/workflow": patch
3+
---
4+
5+
add Workflow.scope, a seperate Scope that only closes on completion

packages/workflow/src/Workflow.ts

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import * as PrimaryKey from "effect/PrimaryKey"
1616
import type * as Schedule from "effect/Schedule"
1717
import * as Schema from "effect/Schema"
1818
import type * as AST from "effect/SchemaAST"
19-
import type * as Scope from "effect/Scope"
19+
import * as Scope from "effect/Scope"
2020
import { makeHashDigest } from "./internal/crypto.js"
2121
import type { WorkflowEngine, WorkflowInstance } from "./WorkflowEngine.js"
2222

@@ -537,6 +537,14 @@ export const intoResult = <A, E, R>(
537537
? Effect.failCause(cause as Cause.Cause<never>)
538538
: Effect.succeed(new Complete({ exit: Exit.failCause(cause) }))
539539
}),
540+
Effect.onExit((exit) => {
541+
if (Exit.isFailure(exit)) {
542+
return Scope.close(instance.scope, exit)
543+
} else if (exit.value._tag === "Complete") {
544+
return Scope.close(instance.scope, exit.value.exit)
545+
}
546+
return Effect.void
547+
}),
540548
Effect.uninterruptible
541549
)
542550
})
@@ -572,6 +580,53 @@ export const wrapActivityResult = <A, E, R>(
572580
})
573581
})
574582

583+
/**
584+
* Accesses the workflow scope.
585+
*
586+
* The workflow scope is only closed when the workflow execution fully
587+
* completes.
588+
*
589+
* @since 1.0.0
590+
* @category Scope
591+
*/
592+
export const scope: Effect.Effect<
593+
Scope.Scope,
594+
never,
595+
WorkflowInstance
596+
> = Effect.map(InstanceTag, (instance) => instance.scope as Scope.Scope)
597+
598+
/**
599+
* Provides the workflow scope to the given effect.
600+
*
601+
* The workflow scope is only closed when the workflow execution fully
602+
* completes.
603+
*
604+
* @since 1.0.0
605+
* @category Scope
606+
*/
607+
export const provideScope = <A, E, R>(
608+
effect: Effect.Effect<A, E, R>
609+
): Effect.Effect<A, E, Exclude<R, Scope.Scope> | WorkflowInstance> =>
610+
Effect.flatMap(scope, (scope) => Scope.extend(effect, scope))
611+
612+
/**
613+
* @since 1.0.0
614+
* @category Scope
615+
*/
616+
export const addFinalizer: <R>(
617+
f: (exit: Exit.Exit<unknown, unknown>) => Effect.Effect<void, never, R>
618+
) => Effect.Effect<
619+
void,
620+
never,
621+
WorkflowInstance | R
622+
> = Effect.fnUntraced(function*<R>(
623+
f: (exit: Exit.Exit<unknown, unknown>) => Effect.Effect<void, never, R>
624+
) {
625+
const scope = (yield* InstanceTag).scope
626+
const runtime = yield* Effect.runtime<R>()
627+
yield* Scope.addFinalizerExit(scope, (exit) => Effect.provide(f(exit), runtime))
628+
})
629+
575630
/**
576631
* Add compensation logic to an effect inside a Workflow. The compensation finalizer will be
577632
* called if the entire workflow fails, allowing you to perform cleanup or
@@ -600,14 +655,7 @@ export const withCompensation: {
600655
Effect.uninterruptibleMask((restore) =>
601656
Effect.tap(
602657
restore(effect),
603-
(value) =>
604-
Effect.contextWithEffect((context: Context.Context<WorkflowInstance>) =>
605-
Effect.addFinalizer((exit) =>
606-
Exit.isSuccess(exit) || Context.get(context, InstanceTag).suspended
607-
? Effect.void
608-
: compensation(value, exit.cause)
609-
)
610-
)
658+
(value) => addFinalizer((exit) => Exit.isSuccess(exit) ? Effect.void : compensation(value, exit.cause))
611659
)
612660
))
613661

packages/workflow/src/WorkflowEngine.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import * as Layer from "effect/Layer"
1111
import * as Option from "effect/Option"
1212
import * as Schedule from "effect/Schedule"
1313
import * as Schema from "effect/Schema"
14-
import type * as Scope from "effect/Scope"
14+
import * as Scope from "effect/Scope"
1515
import type * as Activity from "./Activity.js"
1616
import type { DurableClock } from "./DurableClock.js"
1717
import type * as DurableDeferred from "./DurableDeferred.js"
@@ -199,6 +199,11 @@ export class WorkflowInstance extends Context.Tag("@effect/workflow/WorkflowEngi
199199
*/
200200
readonly workflow: Workflow.Any
201201

202+
/**
203+
* The workflow scope, that represents the lifetime of the workflow.
204+
*/
205+
readonly scope: Scope.CloseableScope
206+
202207
/**
203208
* Whether the workflow has requested to be suspended.
204209
*/
@@ -228,6 +233,7 @@ export class WorkflowInstance extends Context.Tag("@effect/workflow/WorkflowEngi
228233
return WorkflowInstance.of({
229234
executionId,
230235
workflow,
236+
scope: Effect.runSync(Scope.make()),
231237
suspended: false,
232238
interrupted: false,
233239
cause: undefined,

0 commit comments

Comments
 (0)