Skip to content

Commit

Permalink
fixed possible read from finished stream in native
Browse files Browse the repository at this point in the history
  • Loading branch information
rssh committed May 19, 2024
1 parent 5f51e47 commit 5e19b97
Showing 1 changed file with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec
case class Finished(result: Try[Unit]) extends SupplyEventRecord

type ConsumerCallback = Try[SupplyEventRecord]=>Unit
type OneThreadTaskCallback = Unit => Unit
//type OneThreadTaskCallback = Unit => Unit


class State:
Expand All @@ -38,19 +38,26 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec
private val consumerEvents = mutable.Queue[ConsumerCallback]()

private def queueEmit(v:T): F[Unit] =
val p = Promise[Unit]()
val emitted = Emitted(v, x => p.tryComplete(x) )
supplyEvents.enqueue(emitted)
asyncMonad.adoptCallbackStyle{ emitCallback =>
p.future.onComplete(emitCallback)
}
if (finishRef.get() != null) then
// impossible: attempt to write to closed stream
asyncMonad.error(new IllegalStateException("Stream is closed"))
else
val p = Promise[Unit]()
val emitted = Emitted(v, x => p.tryComplete(x) )
supplyEvents.enqueue(emitted)
asyncMonad.adoptCallbackStyle{ emitCallback =>
p.future.onComplete(emitCallback)
}

private def queueConsumer(): F[SupplyEventRecord] =
val p = Promise[SupplyEventRecord]()
consumerEvents.enqueue( x => p.complete(x))
asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback =>
p.future.onComplete(evalCallback)
}
if (finishRef.get() != null) then
asyncMonad.pure(Finished(Failure(new CancellationException("Stream is closed"))))
else
val p = Promise[SupplyEventRecord]()
consumerEvents.enqueue( x => p.complete(x))
asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback =>
p.future.onComplete(evalCallback)
}

private def tryDequeConsumer(): Option[ConsumerCallback] =
if (consumerEvents.isEmpty) None
Expand Down

0 comments on commit 5e19b97

Please sign in to comment.