diff --git a/.changeset/stream-on-start.md b/.changeset/stream-on-start.md
new file mode 100644
index 0000000000..ff07852d97
--- /dev/null
+++ b/.changeset/stream-on-start.md
@@ -0,0 +1,22 @@
+---
+"effect": minor
+---
+
+Implement `Stream.onStart` that adds an effect to be executed at the start of the stream.
+
+```ts
+import { Console, Effect, Stream } from "effect";
+
+const stream = Stream.make(1, 2, 3).pipe(
+ Stream.onStart(Console.log("Stream started")),
+ Stream.map((n) => n * 2),
+ Stream.tap((n) => Console.log(`after mapping: ${n}`))
+)
+
+Effect. runPromise(Stream. runCollect(stream)).then(console. log)
+// Stream started
+// after mapping: 2
+// after mapping: 4
+// after mapping: 6
+// { _id: 'Chunk', values: [ 2, 4, 6 ] }
+```
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index 338e2ca1ae..4f148dc6f8 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -2918,6 +2918,38 @@ export const onDone: {
(self: Stream, cleanup: () => Effect.Effect): Stream
} = internal.onDone
+/**
+ * Adds an effect to be executed at the start of the stream.
+ *
+ * @example
+ * import { Console, Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make(1, 2, 3).pipe(
+ * Stream.onStart(Console.log("Stream started")),
+ * Stream.map((n) => n * 2),
+ * Stream.tap((n) => Console.log(`after mapping: ${n}`))
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // Stream started
+ * // after mapping: 2
+ * // after mapping: 4
+ * // after mapping: 6
+ * // { _id: 'Chunk', values: [ 2, 4, 6 ] }
+ *
+ * @since 3.6.0
+ * @category sequencing
+ */
+export const onStart: {
+ <_, E2, R2>(
+ effect: Effect.Effect<_, E2, R2>
+ ): (self: Stream) => Stream
+ (
+ self: Stream,
+ effect: Effect.Effect<_, E2, R2>
+ ): Stream
+} = internal.onStart
+
/**
* Translates any failure into a stream termination, making the stream
* infallible and all failures unchecked.
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index 402408abae..0fa592eb75 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -4154,6 +4154,23 @@ export const onDone = dual<
)
)
+/** @internal */
+export const onStart: {
+ <_, E2, R2>(
+ effect: Effect.Effect<_, E2, R2>
+ ): (self: Stream.Stream) => Stream.Stream
+ (
+ self: Stream.Stream,
+ effect: Effect.Effect<_, E2, R2>
+ ): Stream.Stream
+} = dual(
+ 2,
+ (
+ self: Stream.Stream,
+ effect: Effect.Effect<_, E2, R2>
+ ): Stream.Stream => unwrap(Effect.as(effect, self))
+)
+
/** @internal */
export const orDie = (self: Stream.Stream): Stream.Stream =>
pipe(self, orDieWith(identity))
diff --git a/packages/effect/test/Stream/lifecycle.test.ts b/packages/effect/test/Stream/lifecycle.test.ts
new file mode 100644
index 0000000000..59826f0ab7
--- /dev/null
+++ b/packages/effect/test/Stream/lifecycle.test.ts
@@ -0,0 +1,18 @@
+import * as Effect from "effect/Effect"
+import * as Stream from "effect/Stream"
+import * as it from "effect/test/utils/extend"
+import { assert, describe } from "vitest"
+
+describe("Stream", () => {
+ it.effect("onStart", () =>
+ Effect.gen(function*($) {
+ let counter = 0
+ const result = yield* $(
+ Stream.make(1, 1),
+ Stream.onStart(Effect.sync(() => counter++)),
+ Stream.runCollect
+ )
+ assert.strictEqual(counter, 1)
+ assert.deepStrictEqual(Array.from(result), [1, 1])
+ }))
+})
diff --git a/packages/effect/test/Stream/tapping.test.ts b/packages/effect/test/Stream/tapping.test.ts
index 2d47b72940..37bea2cbe9 100644
--- a/packages/effect/test/Stream/tapping.test.ts
+++ b/packages/effect/test/Stream/tapping.test.ts
@@ -97,7 +97,11 @@ describe("Stream", () => {
const result = yield* $(
Stream.make(1, 2, 3),
Stream.tapBoth({
- onSuccess: (n) => pipe(Effect.fail("error"), Effect.when(() => n === 3)),
+ onSuccess: (n) =>
+ pipe(
+ Effect.fail("error"),
+ Effect.when(() => n === 3)
+ ),
onFailure: () => Effect.void
}),
Stream.either,