diff --git a/.changeset/stream-on-start.md b/.changeset/stream-on-start.md new file mode 100644 index 00000000000..ff07852d978 --- /dev/null +++ b/.changeset/stream-on-start.md @@ -0,0 +1,22 @@ +--- +"effect": minor +--- + +Implement `Stream.onStart` that adds an effect to be executed at the start of the stream. + +```ts +import { Console, Effect, Stream } from "effect"; + +const stream = Stream.make(1, 2, 3).pipe( + Stream.onStart(Console.log("Stream started")), + Stream.map((n) => n * 2), + Stream.tap((n) => Console.log(`after mapping: ${n}`)) +) + +Effect. runPromise(Stream. runCollect(stream)).then(console. log) +// Stream started +// after mapping: 2 +// after mapping: 4 +// after mapping: 6 +// { _id: 'Chunk', values: [ 2, 4, 6 ] } +``` diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 6b695e26bcc..8a0356c5613 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2918,6 +2918,38 @@ export const onDone: { (self: Stream, cleanup: () => Effect.Effect): Stream } = internal.onDone +/** + * Adds an effect to be executed at the start of the stream. + * + * @example + * import { Console, Effect, Stream } from "effect" + * + * const stream = Stream.make(1, 2, 3).pipe( + * Stream.onStart(Console.log("Stream started")), + * Stream.map((n) => n * 2), + * Stream.tap((n) => Console.log(`after mapping: ${n}`)) + * ) + * + * // Effect.runPromise(Stream.runCollect(stream)).then(console.log) + * // Stream started + * // after mapping: 2 + * // after mapping: 4 + * // after mapping: 6 + * // { _id: 'Chunk', values: [ 2, 4, 6 ] } + * + * @since 3.6.0 + * @category sequencing + */ +export const onStart: { + <_, E2, R2>( + effect: Effect.Effect<_, E2, R2> + ): (self: Stream) => Stream + ( + self: Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream +} = internal.onStart + /** * Translates any failure into a stream termination, making the stream * infallible and all failures unchecked. diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 402408abaea..0fa592eb753 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4154,6 +4154,23 @@ export const onDone = dual< ) ) +/** @internal */ +export const onStart: { + <_, E2, R2>( + effect: Effect.Effect<_, E2, R2> + ): (self: Stream.Stream) => Stream.Stream + ( + self: Stream.Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream.Stream +} = dual( + 2, + ( + self: Stream.Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream.Stream => unwrap(Effect.as(effect, self)) +) + /** @internal */ export const orDie = (self: Stream.Stream): Stream.Stream => pipe(self, orDieWith(identity)) diff --git a/packages/effect/test/Stream/lifecycle.test.ts b/packages/effect/test/Stream/lifecycle.test.ts new file mode 100644 index 00000000000..59826f0ab7d --- /dev/null +++ b/packages/effect/test/Stream/lifecycle.test.ts @@ -0,0 +1,18 @@ +import * as Effect from "effect/Effect" +import * as Stream from "effect/Stream" +import * as it from "effect/test/utils/extend" +import { assert, describe } from "vitest" + +describe("Stream", () => { + it.effect("onStart", () => + Effect.gen(function*($) { + let counter = 0 + const result = yield* $( + Stream.make(1, 1), + Stream.onStart(Effect.sync(() => counter++)), + Stream.runCollect + ) + assert.strictEqual(counter, 1) + assert.deepStrictEqual(Array.from(result), [1, 1]) + })) +}) diff --git a/packages/effect/test/Stream/tapping.test.ts b/packages/effect/test/Stream/tapping.test.ts index 2d47b72940f..37bea2cbe91 100644 --- a/packages/effect/test/Stream/tapping.test.ts +++ b/packages/effect/test/Stream/tapping.test.ts @@ -97,7 +97,11 @@ describe("Stream", () => { const result = yield* $( Stream.make(1, 2, 3), Stream.tapBoth({ - onSuccess: (n) => pipe(Effect.fail("error"), Effect.when(() => n === 3)), + onSuccess: (n) => + pipe( + Effect.fail("error"), + Effect.when(() => n === 3) + ), onFailure: () => Effect.void }), Stream.either,