From 5e19b97878cc828126247dd1ece9b09beb360387 Mon Sep 17 00:00:00 2001 From: Ruslan Shevchenko Date: Sun, 19 May 2024 11:04:18 +0300 Subject: [PATCH] fixed possible read from finished stream in native --- .../BaseUnfoldCpsAsyncEmitAbsorber.scala | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala b/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala index 6115c58b..7f08d07b 100644 --- a/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala +++ b/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala @@ -28,7 +28,7 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec case class Finished(result: Try[Unit]) extends SupplyEventRecord type ConsumerCallback = Try[SupplyEventRecord]=>Unit - type OneThreadTaskCallback = Unit => Unit + //type OneThreadTaskCallback = Unit => Unit class State: @@ -38,19 +38,26 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec private val consumerEvents = mutable.Queue[ConsumerCallback]() private def queueEmit(v:T): F[Unit] = - val p = Promise[Unit]() - val emitted = Emitted(v, x => p.tryComplete(x) ) - supplyEvents.enqueue(emitted) - asyncMonad.adoptCallbackStyle{ emitCallback => - p.future.onComplete(emitCallback) - } + if (finishRef.get() != null) then + // impossible: attempt to write to closed stream + asyncMonad.error(new IllegalStateException("Stream is closed")) + else + val p = Promise[Unit]() + val emitted = Emitted(v, x => p.tryComplete(x) ) + supplyEvents.enqueue(emitted) + asyncMonad.adoptCallbackStyle{ emitCallback => + p.future.onComplete(emitCallback) + } private def queueConsumer(): F[SupplyEventRecord] = - val p = Promise[SupplyEventRecord]() - consumerEvents.enqueue( x => p.complete(x)) - asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback => - p.future.onComplete(evalCallback) - } + if (finishRef.get() != null) then + asyncMonad.pure(Finished(Failure(new CancellationException("Stream is closed")))) + else + val p = Promise[SupplyEventRecord]() + consumerEvents.enqueue( x => p.complete(x)) + asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback => + p.future.onComplete(evalCallback) + } private def tryDequeConsumer(): Option[ConsumerCallback] = if (consumerEvents.isEmpty) None