Skip to content

Commit

Permalink
add Stream.asyncPush api
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jul 17, 2024
1 parent 7a29d3c commit 55d520e
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 0 deletions.
35 changes: 35 additions & 0 deletions .changeset/forty-beers-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
"effect": minor
---

add Stream.asyncPush api

This api creates a stream from an external push-based resource.

You can use the `emit` helper to emit values to the stream. You can also use
the `emit` helper to signal the end of the stream by using apis such as
`emit.end` or `emit.fail`.

By default it uses a buffer size of 16 and a dropping strategy to prevent
memory issues. You can customize the buffer size and strategy by passing an
object as the second argument with the `bufferSize` and `strategy` fields.

```ts
import { Effect, Stream } from "effect";

Stream.asyncPush<string>(
(emit) =>
Effect.acquireRelease(
Effect.gen(function* () {
yield* Effect.log("subscribing");
return setInterval(() => emit.single("tick"), 1000);
}),
(handle) =>
Effect.gen(function* () {
yield* Effect.log("unsubscribing");
clearInterval(handle);
}),
),
{ bufferSize: 16, strategy: "dropping" },
);
```
40 changes: 40 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,46 @@ export const asyncEffect: <A, E = never, R = never>(
} | undefined
) => Stream<A, E, R> = internal.asyncEffect

/**
* Creates a stream from an external push-based resource.
*
* You can use the `emit` helper to emit values to the stream. The `emit` helper
* returns a boolean indicating whether the value was emitted or not.
*
* You can also use the `emit` helper to signal the end of the stream by
* using apis such as `emit.end` or `emit.fail`.
*
* By default it uses a buffer size of 16 and a dropping strategy to prevent
* memory issues. You can customize the buffer size and strategy by passing an
* object as the second argument with the `bufferSize` and `strategy` fields.
*
* @example
* import { Effect, Stream } from "effect"
*
* Stream.asyncPush<string>((emit) =>
* Effect.acquireRelease(
* Effect.gen(function*() {
* yield* Effect.log("subscribing")
* return setInterval(() => emit.single("tick"), 1000)
* }),
* (handle) =>
* Effect.gen(function*() {
* yield* Effect.log("unsubscribing")
* clearInterval(handle)
* })
* ), { bufferSize: 16, strategy: "dropping" })
*
* @since 3.6.0
* @category constructors
*/
export const asyncPush: <A, E = never, R = never>(
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
options?: { bufferSize: "unbounded" } | {
readonly bufferSize?: number
readonly strategy?: "dropping" | "sliding" | undefined
} | undefined
) => Stream<A, E, Exclude<R, Scope.Scope>> = internal.asyncPush

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback itself returns an a scoped
Expand Down
47 changes: 47 additions & 0 deletions packages/effect/src/StreamEmit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,50 @@ export interface EmitOps<in R, in E, in A, out B> {
*/
single(value: A): Promise<B>
}

/**
* @since 3.6.0
* @category models
*/
export interface EmitOpsPush<in E, in A> {
/**
* Emits a chunk containing the specified values.
*/
chunk(chunk: Chunk.Chunk<A>): boolean

/**
* Emits a chunk containing the specified values.
*/
array(chunk: ReadonlyArray<A>): boolean

/**
* Terminates with a cause that dies with the specified defect.
*/
die<Err>(defect: Err): void

/**
* Terminates with a cause that dies with a `Throwable` with the specified
* message.
*/
dieMessage(message: string): void

/**
* Terminates with an end of stream signal.
*/
end(): void

/**
* Terminates with the specified error.
*/
fail(error: E): void

/**
* Terminates the stream with the specified cause.
*/
halt(cause: Cause.Cause<E>): void

/**
* Emits a chunk containing the specified value.
*/
single(value: A): boolean
}
111 changes: 111 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,117 @@ export const asyncEffect = <A, E = never, R = never>(
fromChannel
)

const constEofPush = Symbol.for("effect/Stream/constEofPush")
const queueFromBufferOptionsPush = <A>(
options?: { bufferSize: "unbounded" } | {
readonly bufferSize?: number
readonly strategy?: "dropping" | "sliding" | undefined
} | undefined
): Effect.Effect<Queue.Queue<A | typeof constEofPush>> => {
if (options?.bufferSize === "unbounded") {
return Queue.unbounded()
}
switch (options?.strategy) {
case "sliding":
return Queue.sliding(options?.bufferSize ?? 16)
default:
return Queue.dropping(options?.bufferSize ?? 16)
}
}

class EmitOpsPushImpl<E, A> implements Emit.EmitOpsPush<E, A> {
constructor(
readonly queue: Queue.Queue<A | typeof constEofPush>,
readonly deferred: Deferred.Deferred<void, E>
) {}
finished = false
single(value: A) {
if (this.finished) return false
return this.queue.unsafeOffer(value)
}
chunk(chunk: Chunk.Chunk<A>) {
if (this.finished) return false
for (const value of chunk) {
if (!this.queue.unsafeOffer(value)) {
return false
}
}
return true
}
array(arr: ReadonlyArray<A>) {
if (this.finished) return false
for (let i = 0; i < arr.length; i++) {
if (!this.queue.unsafeOffer(arr[i])) {
return false
}
}
return true
}
done(exit: Exit.Exit<A, E>) {
if (this.finished) {
return
} else if (exit._tag === "Success") {
this.single(exit.value)
}
this.queue.unsafeOffer(constEofPush)
Deferred.unsafeDone(this.deferred, exit._tag === "Success" ? Exit.void : exit)
}
end() {
if (this.finished) return
this.queue.unsafeOffer(constEofPush)
Deferred.unsafeDone(this.deferred, Exit.void)
}
halt(cause: Cause.Cause<E>) {
this.done(Exit.failCause(cause))
}
fail(error: E) {
this.done(Exit.fail(error))
}
die<Err>(defect: Err): void {
this.done(Exit.die(defect))
}
dieMessage(message: string): void {
this.done(Exit.die(new Error(message)))
}
}

/** @internal */
export const asyncPush = <A, E = never, R = never>(
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
options?: { bufferSize: "unbounded" } | {
readonly bufferSize?: number
readonly strategy?: "dropping" | "sliding" | undefined
} | undefined
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
Effect.acquireRelease(
queueFromBufferOptionsPush<A>(options),
Queue.shutdown
).pipe(
Effect.bindTo("queue"),
Effect.bind("deferred", () => Deferred.make<void, E>()),
Effect.tap(({ deferred, queue }) => register(new EmitOpsPushImpl(queue, deferred))),
Effect.map(({ deferred, queue }) => {
const onEnd = channel.zipRight(Deferred.await(deferred), core.void)
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E> = core.flatMap(
Effect.zipRight(
Effect.yieldNow(), // allow batches to accumulate
Queue.takeBetween(queue, 1, DefaultChunkSize)
),
(chunk) => {
const end = Chunk.unsafeLast(chunk) === constEofPush
const items: Chunk.Chunk<A> = end ? Chunk.dropRight(chunk, 1) as any : chunk
if (end) {
return channel.zipRight(core.write(items), onEnd)
}
return channel.zipRight(core.write(items), loop)
}
)
return loop
}),
channel.unwrapScoped,
fromChannel
)

/** @internal */
export const asyncScoped = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
Expand Down
56 changes: 56 additions & 0 deletions packages/effect/test/Stream/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,60 @@ describe("Stream", () => {
yield* $(Fiber.interrupt(fiber), Effect.exit)
assert.isFalse(result)
}))

it.effect("asyncPush", () =>
Effect.gen(function*() {
const array = [1, 2, 3, 4, 5]
const latch = yield* Deferred.make<void>()
const fiber = yield* Stream.asyncPush<number>((emit) => {
array.forEach((n) => {
emit.single(n)
})
return pipe(
Deferred.succeed(latch, void 0),
Effect.asVoid
)
}).pipe(
Stream.take(array.length),
Stream.run(Sink.collectAll()),
Effect.fork
)
yield* Deferred.await(latch)
const result = yield* Fiber.join(fiber)
assert.deepStrictEqual(Array.from(result), array)
}))

it.effect("asyncPush - signals the end of the stream", () =>
Effect.gen(function*() {
const result = yield* Stream.asyncPush<number>((emit) => {
emit.end()
return Effect.void
}).pipe(Stream.runCollect)
assert.isTrue(Chunk.isEmpty(result))
}))

it.effect("asyncPush - handles errors", () =>
Effect.gen(function*() {
const error = new Cause.RuntimeException("boom")
const result = yield* Stream.asyncPush<number, Cause.RuntimeException>((emit) => {
emit.fail(error)
return Effect.void
}).pipe(
Stream.runCollect,
Effect.exit
)
assert.deepStrictEqual(result, Exit.fail(error))
}))

it.effect("asyncPush - handles defects", () =>
Effect.gen(function*() {
const error = new Cause.RuntimeException("boom")
const result = yield* Stream.asyncPush<number, Cause.RuntimeException>(() => {
throw error
}).pipe(
Stream.runCollect,
Effect.exit
)
assert.deepStrictEqual(result, Exit.die(error))
}))
})

0 comments on commit 55d520e

Please sign in to comment.