Skip to content

Commit

Permalink
use scheduler instead of queueMicrotask
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jul 19, 2024
1 parent e1364d0 commit c4f1c3e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
5 changes: 4 additions & 1 deletion packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as Either from "../Either.js"
import * as Equal from "../Equal.js"
import * as Exit from "../Exit.js"
import * as Fiber from "../Fiber.js"
import * as FiberRef from "../FiberRef.js"
import type { LazyArg } from "../Function.js"
import { constTrue, dual, identity, pipe } from "../Function.js"
import * as Layer from "../Layer.js"
Expand Down Expand Up @@ -628,7 +629,9 @@ export const asyncPush = <A, E = never, R = never>(
queueFromBufferOptionsPush<A, E>(options),
Queue.shutdown
).pipe(
Effect.tap((queue) => register(emit.makePush(queue))),
Effect.tap((queue) =>
FiberRef.getWith(FiberRef.currentScheduler, (scheduler) => register(emit.makePush(queue, scheduler)))
),
Effect.map((queue) => {
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E> = core.flatMap(Queue.take(queue), (item) =>
Exit.isExit(item)
Expand Down
10 changes: 7 additions & 3 deletions packages/effect/src/internal/stream/emit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as Exit from "../../Exit.js"
import { pipe } from "../../Function.js"
import * as Option from "../../Option.js"
import type * as Queue from "../../Queue.js"
import type * as Scheduler from "../../Scheduler.js"
import type * as Emit from "../../StreamEmit.js"

/** @internal */
Expand Down Expand Up @@ -47,7 +48,10 @@ export const make = <R, E, A, B>(
}

/** @internal */
export const makePush = <E, A>(queue: Queue.Queue<Array<A> | Exit.Exit<void, E>>): Emit.EmitOpsPush<E, A> => {
export const makePush = <E, A>(
queue: Queue.Queue<Array<A> | Exit.Exit<void, E>>,
scheduler: Scheduler.Scheduler
): Emit.EmitOpsPush<E, A> => {
let finished = false
let buffer: Array<A> = []
let running = false
Expand All @@ -62,7 +66,7 @@ export const makePush = <E, A>(queue: Queue.Queue<Array<A> | Exit.Exit<void, E>>
}
if (!running) {
running = true
queueMicrotask(flush)
scheduler.scheduleTask(flush, 0)
}
return true
}
Expand All @@ -88,7 +92,7 @@ export const makePush = <E, A>(queue: Queue.Queue<Array<A> | Exit.Exit<void, E>>
buffer.push(value)
if (!running) {
running = true
queueMicrotask(flush)
scheduler.scheduleTask(flush, 0)
}
return true
},
Expand Down

0 comments on commit c4f1c3e

Please sign in to comment.