Skip to content

Commit

Permalink
add Stream.onStart api (#3302)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
2 people authored and gcanti committed Jul 26, 2024
1 parent 0d1e042 commit 3dba031
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 1 deletion.
22 changes: 22 additions & 0 deletions .changeset/stream-on-start.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
"effect": minor
---

Implement `Stream.onStart` that adds an effect to be executed at the start of the stream.

```ts
import { Console, Effect, Stream } from "effect";

const stream = Stream.make(1, 2, 3).pipe(
Stream.onStart(Console.log("Stream started")),
Stream.map((n) => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`))
)

Effect. runPromise(Stream. runCollect(stream)).then(console. log)
// Stream started
// after mapping: 2
// after mapping: 4
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }
```
32 changes: 32 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2918,6 +2918,38 @@ export const onDone: {
<A, E, R, X, R2>(self: Stream<A, E, R>, cleanup: () => Effect.Effect<X, never, R2>): Stream<A, E, R | R2>
} = internal.onDone

/**
* Adds an effect to be executed at the start of the stream.
*
* @example
* import { Console, Effect, Stream } from "effect"
*
* const stream = Stream.make(1, 2, 3).pipe(
* Stream.onStart(Console.log("Stream started")),
* Stream.map((n) => n * 2),
* Stream.tap((n) => Console.log(`after mapping: ${n}`))
* )
*
* // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
* // Stream started
* // after mapping: 2
* // after mapping: 4
* // after mapping: 6
* // { _id: 'Chunk', values: [ 2, 4, 6 ] }
*
* @since 3.6.0
* @category sequencing
*/
export const onStart: {
<_, E2, R2>(
effect: Effect.Effect<_, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(
self: Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream<A, E | E2, R | R2>
} = internal.onStart

/**
* Translates any failure into a stream termination, making the stream
* infallible and all failures unchecked.
Expand Down
17 changes: 17 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4154,6 +4154,23 @@ export const onDone = dual<
)
)

/** @internal */
export const onStart: {
<_, E2, R2>(
effect: Effect.Effect<_, E2, R2>
): <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(
self: Stream.Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream.Stream<A, E | E2, R | R2>
} = dual(
2,
<A, E, R, _, E2, R2>(
self: Stream.Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream.Stream<A, E | E2, R | R2> => unwrap(Effect.as(effect, self))
)

/** @internal */
export const orDie = <A, E, R>(self: Stream.Stream<A, E, R>): Stream.Stream<A, never, R> =>
pipe(self, orDieWith(identity))
Expand Down
18 changes: 18 additions & 0 deletions packages/effect/test/Stream/lifecycle.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"
import * as it from "effect/test/utils/extend"
import { assert, describe } from "vitest"

describe("Stream", () => {
it.effect("onStart", () =>
Effect.gen(function*($) {
let counter = 0
const result = yield* $(
Stream.make(1, 1),
Stream.onStart(Effect.sync(() => counter++)),
Stream.runCollect
)
assert.strictEqual(counter, 1)
assert.deepStrictEqual(Array.from(result), [1, 1])
}))
})
6 changes: 5 additions & 1 deletion packages/effect/test/Stream/tapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ describe("Stream", () => {
const result = yield* $(
Stream.make(1, 2, 3),
Stream.tapBoth({
onSuccess: (n) => pipe(Effect.fail("error"), Effect.when(() => n === 3)),
onSuccess: (n) =>
pipe(
Effect.fail("error"),
Effect.when(() => n === 3)
),
onFailure: () => Effect.void
}),
Stream.either,
Expand Down

0 comments on commit 3dba031

Please sign in to comment.