diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 07aa63573e..f8fe4a932e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: strategy: matrix: os: [ubuntu-22.04] - scala: [3.1.3, 2.12.17, 2.13.8] + scala: [3.2.0, 2.12.17, 2.13.8] java: [temurin@17] project: [rootJS, rootJVM, rootNative] runs-on: ${{ matrix.os }} @@ -161,32 +161,32 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - name: Download target directories (3.1.3, rootJS) + - name: Download target directories (3.2.0, rootJS) uses: actions/download-artifact@v2 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootJS + name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootJS - - name: Inflate target directories (3.1.3, rootJS) + - name: Inflate target directories (3.2.0, rootJS) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (3.1.3, rootJVM) + - name: Download target directories (3.2.0, rootJVM) uses: actions/download-artifact@v2 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootJVM + name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootJVM - - name: Inflate target directories (3.1.3, rootJVM) + - name: Inflate target directories (3.2.0, rootJVM) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (3.1.3, rootNative) + - name: Download target directories (3.2.0, rootNative) uses: actions/download-artifact@v2 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootNative + name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootNative - - name: Inflate target directories (3.1.3, rootNative) + - name: Inflate target directories (3.2.0, rootNative) run: | tar xf targets.tar rm targets.tar diff --git a/build.sbt b/build.sbt index f70ad60100..f949385fc0 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ ThisBuild / startYear := Some(2013) val NewScala = "2.13.8" -ThisBuild / crossScalaVersions := Seq("3.1.3", "2.12.17", NewScala) +ThisBuild / crossScalaVersions := Seq("3.2.0", "2.12.17", NewScala) ThisBuild / tlVersionIntroduced := Map("3" -> "3.0.3") ThisBuild / githubWorkflowOSes := Seq("ubuntu-22.04") @@ -176,6 +176,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( ), ProblemFilters.exclude[ReversedMissingMethodProblem]( "fs2.io.net.tls.TLSContext#Builder.insecureResource" + ), + ProblemFilters.exclude[DirectMissingMethodProblem]( // something funky in Scala 3.2.0 ... + "fs2.io.net.SocketGroupCompanionPlatform#AsyncSocketGroup.this" ) ) @@ -210,9 +213,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.8.0", "org.typelevel" %%% "cats-laws" % "2.8.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.3.14", - "org.typelevel" %%% "cats-effect-laws" % "3.3.14" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.3.14" % Test, + "org.typelevel" %%% "cats-effect" % "3.4.0-RC1", + "org.typelevel" %%% "cats-effect-laws" % "3.4.0-RC1" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.4.0-RC1" % Test, "org.scodec" %%% "scodec-bits" % "1.1.34", "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index 424efcb2b6..b2eb620aef 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -30,6 +30,7 @@ import cats.effect.std.Dispatcher import cats.effect.std.Queue import cats.effect.syntax.all._ import cats.syntax.all._ +import fs2.concurrent.Channel import fs2.io.internal.MicrotaskExecutor import fs2.io.internal.facade @@ -59,8 +60,8 @@ private[fs2] trait ioplatform { destroyIfCanceled: Boolean = true )(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] = (for { - dispatcher <- Dispatcher[F] - queue <- Queue.synchronous[F, Option[Unit]].toResource + dispatcher <- Dispatcher.sequential[F] + channel <- Channel.unbounded[F, Unit].toResource error <- F.deferred[Throwable].toResource readableResource = for { readable <- Resource.makeCase(F.delay(thunk)) { @@ -79,9 +80,9 @@ private[fs2] trait ioplatform { else F.unit } - _ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => queue.offer(Some(()))) - _ <- readable.registerListener[F, Any]("end", dispatcher)(_ => queue.offer(None)) - _ <- readable.registerListener[F, Any]("close", dispatcher)(_ => queue.offer(None)) + _ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void) + _ <- readable.registerListener[F, Any]("end", dispatcher)(_ => channel.close.void) + _ <- readable.registerListener[F, Any]("close", dispatcher)(_ => channel.close.void) _ <- readable.registerListener[F, js.Error]("error", dispatcher) { e => error.complete(js.JavaScriptException(e)).void } @@ -94,8 +95,7 @@ private[fs2] trait ioplatform { // our only recourse is to run the entire creation/listener registration process on the microtask executor. readable <- readableResource.evalOn(MicrotaskExecutor) stream = - (Stream - .fromQueueNoneTerminated(queue) + (channel.stream .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >> Stream .evalUnChunk( @@ -199,9 +199,11 @@ private[fs2] trait ioplatform { in: Stream[F, Byte] )(implicit F: Async[F]): Resource[F, (Duplex, Stream[F, Byte])] = for { - dispatcher <- Dispatcher[F] + readDispatcher <- Dispatcher.sequential[F] + writeDispatcher <- Dispatcher.sequential[F] + errorDispatcher <- Dispatcher.sequential[F] readQueue <- Queue.bounded[F, Option[Chunk[Byte]]](1).toResource - writeQueue <- Queue.synchronous[F, Option[Chunk[Byte]]].toResource + writeChannel <- Channel.synchronous[F, Chunk[Byte]].toResource error <- F.deferred[Throwable].toResource duplex <- Resource.make { F.delay { @@ -211,7 +213,7 @@ private[fs2] trait ioplatform { var autoDestroy = false var read = { readable => - dispatcher.unsafeRunAndForget( + readDispatcher.unsafeRunAndForget( readQueue.take.flatMap { chunk => F.delay(readable.push(chunk.map(_.toUint8Array).orNull)).void } @@ -219,19 +221,19 @@ private[fs2] trait ioplatform { } var write = { (_, chunk, _, cb) => - dispatcher.unsafeRunAndForget( - writeQueue.offer(Some(Chunk.uint8Array(chunk))) *> F.delay(cb(null)) + writeDispatcher.unsafeRunAndForget( + writeChannel.send(Chunk.uint8Array(chunk)) *> F.delay(cb(null)) ) } var `final` = { (_, cb) => - dispatcher.unsafeRunAndForget( - writeQueue.offer(None) *> F.delay(cb(null)) + writeDispatcher.unsafeRunAndForget( + writeChannel.close *> F.delay(cb(null)) ) } var destroy = { (_, err, cb) => - dispatcher.unsafeRunAndForget { + errorDispatcher.unsafeRunAndForget { error .complete( Option(err) @@ -249,8 +251,7 @@ private[fs2] trait ioplatform { } } drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain - out = Stream - .fromQueueNoneTerminatedChunk(writeQueue) + out = writeChannel.stream.unchunks .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) } yield ( duplex, diff --git a/io/js/src/main/scala/fs2/io/net/DatagramSocketPlatform.scala b/io/js/src/main/scala/fs2/io/net/DatagramSocketPlatform.scala index c1ce0b9595..c4ebb702bf 100644 --- a/io/js/src/main/scala/fs2/io/net/DatagramSocketPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/DatagramSocketPlatform.scala @@ -53,7 +53,7 @@ private[net] trait DatagramSocketCompanionPlatform { sock: facade.dgram.Socket )(implicit F: Async[F]): Resource[F, DatagramSocket[F]] = for { - dispatcher <- Dispatcher[F] + dispatcher <- Dispatcher.sequential[F] queue <- Queue .circularBuffer[F, Datagram](1024) .toResource // TODO how to set this? Or, bad design? diff --git a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index e866c5e933..453ad144e8 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -26,13 +26,13 @@ package net import cats.effect.kernel.Async import cats.effect.kernel.Resource import cats.effect.std.Dispatcher -import cats.effect.std.Queue import cats.effect.syntax.all._ import cats.syntax.all._ import com.comcast.ip4s.Host import com.comcast.ip4s.IpAddress import com.comcast.ip4s.Port import com.comcast.ip4s.SocketAddress +import fs2.concurrent.Channel import fs2.io.internal.facade import scala.scalajs.js @@ -83,8 +83,8 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = (for { - dispatcher <- Dispatcher[F] - queue <- Queue.unbounded[F, Option[facade.net.Socket]].toResource + dispatcher <- Dispatcher.sequential[F] + channel <- Channel.unbounded[F, facade.net.Socket].toResource server <- Resource.make( F .delay( @@ -93,15 +93,14 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => pauseOnConnect = true allowHalfOpen = true }, - sock => dispatcher.unsafeRunAndForget(queue.offer(Some(sock))) + sock => dispatcher.unsafeRunAndForget(channel.send(sock)) ) ) )(server => F.async[Unit] { cb => if (server.listening) - F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) >> queue - .offer(None) - .as(None) + F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) *> + channel.close.as(None) else F.delay(cb(Right(()))).as(None) } @@ -126,8 +125,7 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => val info = server.address() SocketAddress(IpAddress.fromString(info.address).get, Port.fromInt(info.port).get) }.toResource - sockets = Stream - .fromQueueNoneTerminated(queue) + sockets = channel.stream .evalTap(setSocketOptions(options)) .flatMap(sock => Stream.resource(Socket.forAsync(sock))) } yield (ipAddress, sockets)).adaptError { case IOException(ex) => ex } diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala index 01be081296..64114305de 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala @@ -62,15 +62,15 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => clientMode: Boolean, params: TLSParameters, logger: TLSLogger[F] - ): Resource[F, TLSSocket[F]] = Dispatcher[F] - .flatMap { dispatcher => + ): Resource[F, TLSSocket[F]] = (Dispatcher.sequential[F], Dispatcher.parallel[F]) + .flatMapN { (seqDispatcher, parDispatcher) => if (clientMode) { Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake => TLSSocket .forAsync( socket, sock => { - val options = params.toTLSConnectOptions(dispatcher) + val options = params.toTLSConnectOptions(parDispatcher) options.secureContext = context if (insecure) options.rejectUnauthorized = false @@ -79,18 +79,18 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => val tlsSock = facade.tls.connect(options) tlsSock.once( "secureConnect", - () => dispatcher.unsafeRunAndForget(handshake.complete(Either.unit)) + () => seqDispatcher.unsafeRunAndForget(handshake.complete(Either.unit)) ) tlsSock.once[js.Error]( "error", e => - dispatcher.unsafeRunAndForget( + seqDispatcher.unsafeRunAndForget( handshake.complete(Left(new js.JavaScriptException(e))) ) ) tlsSock }, - dispatcher + seqDispatcher ) .evalTap(_ => handshake.get.rethrow) } @@ -100,7 +100,7 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => .forAsync( socket, sock => { - val options = params.toTLSSocketOptions(dispatcher) + val options = params.toTLSSocketOptions(parDispatcher) options.secureContext = context if (insecure) options.rejectUnauthorized = false @@ -118,19 +118,19 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => .map(e => new JavaScriptSSLException(js.JavaScriptException(e))) .toLeft(()) else Either.unit - dispatcher.unsafeRunAndForget(verifyError.complete(result)) + seqDispatcher.unsafeRunAndForget(verifyError.complete(result)) } ) tlsSock.once[js.Error]( "error", e => - dispatcher.unsafeRunAndForget( + seqDispatcher.unsafeRunAndForget( verifyError.complete(Left(new js.JavaScriptException(e))) ) ) tlsSock }, - dispatcher + seqDispatcher ) .evalTap(_ => verifyError.get.rethrow) } diff --git a/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala b/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala index 198852d76b..461184a668 100644 --- a/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala @@ -25,8 +25,8 @@ package io.net.unixsocket import cats.effect.kernel.Async import cats.effect.kernel.Resource import cats.effect.std.Dispatcher -import cats.effect.std.Queue import cats.syntax.all._ +import fs2.concurrent.Channel import fs2.io.file.Files import fs2.io.file.Path import fs2.io.net.Socket @@ -65,8 +65,8 @@ private[unixsocket] trait UnixSocketsCompanionPlatform { deleteOnClose: Boolean ): fs2.Stream[F, Socket[F]] = for { - dispatcher <- Stream.resource(Dispatcher[F]) - queue <- Stream.eval(Queue.unbounded[F, facade.net.Socket]) + dispatcher <- Stream.resource(Dispatcher.sequential[F]) + channel <- Stream.eval(Channel.unbounded[F, facade.net.Socket]) errored <- Stream.eval(F.deferred[js.JavaScriptException]) server <- Stream.bracket( F.delay { @@ -75,7 +75,7 @@ private[unixsocket] trait UnixSocketsCompanionPlatform { pauseOnConnect = true allowHalfOpen = true }, - sock => dispatcher.unsafeRunAndForget(queue.offer(sock)) + sock => dispatcher.unsafeRunAndForget(channel.send(sock)) ) } )(server => @@ -103,9 +103,7 @@ private[unixsocket] trait UnixSocketsCompanionPlatform { () } ) - socket <- Stream - .fromQueueUnterminated(queue) - .flatMap(sock => Stream.resource(Socket.forAsync(sock))) + socket <- channel.stream.flatMap(sock => Stream.resource(Socket.forAsync(sock))) } yield socket } diff --git a/io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala b/io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala index 1688d5896d..eb8051f8b2 100644 --- a/io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala +++ b/io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala @@ -191,7 +191,7 @@ private[io] object JavaInputOutputStream { * - DownStream signal - keeps any remainders from last `read` and signals * that downstream has been terminated that in turn kills upstream */ - Dispatcher[F].flatMap { dispatcher => + Dispatcher.sequential[F].flatMap { dispatcher => Resource .eval( ( diff --git a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala index 5b257eeca7..7d962a9a97 100644 --- a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala +++ b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala @@ -41,7 +41,8 @@ private[reactivestreams] final class StreamSubscription[F[_], A]( cancelled: SignallingRef[F, Boolean], sub: Subscriber[A], stream: Stream[F, A], - dispatcher: Dispatcher[F] + startDispatcher: Dispatcher[F], + requestDispatcher: Dispatcher[F] )(implicit F: Async[F]) extends Subscription { import StreamSubscription._ @@ -76,18 +77,17 @@ private[reactivestreams] final class StreamSubscription[F[_], A]( .compile .drain - dispatcher.unsafeRunAndForget(s) + startDispatcher.unsafeRunAndForget(s) } // According to the spec, it's acceptable for a concurrent cancel to not // be processed immediately, but if you have synchronous `cancel(); - // request()`, then the request _must_ be a no op. For this reason, we - // need to make sure that `cancel()` does not return until the - // `cancelled` signal has been set. + // request()`, then the request _must_ be a no op. Fortunately, + // ordering is guaranteed by a sequential d // See https://github.com/zainab-ali/fs2-reactive-streams/issues/29 // and https://github.com/zainab-ali/fs2-reactive-streams/issues/46 def cancel(): Unit = - dispatcher.unsafeRunSync(cancelled.set(true)) + requestDispatcher.unsafeRunAndForget(cancelled.set(true)) def request(n: Long): Unit = { val request: F[Request] = @@ -98,7 +98,7 @@ private[reactivestreams] final class StreamSubscription[F[_], A]( val prog = cancelled.get .ifM(ifTrue = F.unit, ifFalse = request.flatMap(requests.offer).handleErrorWith(onError)) - dispatcher.unsafeRunAndForget(prog) + requestDispatcher.unsafeRunAndForget(prog) } } @@ -112,11 +112,12 @@ private[reactivestreams] object StreamSubscription { def apply[F[_]: Async, A]( sub: Subscriber[A], stream: Stream[F, A], - dispatcher: Dispatcher[F] + startDispatcher: Dispatcher[F], + requestDispatcher: Dispatcher[F] ): F[StreamSubscription[F, A]] = SignallingRef(false).flatMap { cancelled => Queue.unbounded[F, Request].map { requests => - new StreamSubscription(requests, cancelled, sub, stream, dispatcher) + new StreamSubscription(requests, cancelled, sub, stream, startDispatcher, requestDispatcher) } } } diff --git a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala index 1bdb292343..78d5c791a6 100644 --- a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala +++ b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala @@ -35,14 +35,19 @@ import org.reactivestreams._ * * @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]] */ -final class StreamUnicastPublisher[F[_]: Async, A]( +final class StreamUnicastPublisher[F[_]: Async, A] private ( val stream: Stream[F, A], - dispatcher: Dispatcher[F] + startDispatcher: Dispatcher[F], + requestDispatcher: Dispatcher[F] ) extends Publisher[A] { + + @deprecated("Use StreamUnicastPublisher.apply", "3.4.0") + def this(stream: Stream[F, A], dispatcher: Dispatcher[F]) = this(stream, dispatcher, dispatcher) + def subscribe(subscriber: Subscriber[_ >: A]): Unit = { nonNull(subscriber) - dispatcher.unsafeRunAndForget { - StreamSubscription(subscriber, stream, dispatcher) + startDispatcher.unsafeRunAndForget { + StreamSubscription(subscriber, stream, startDispatcher, requestDispatcher) .flatMap { subscription => Sync[F].delay { subscriber.onSubscribe(subscription) @@ -56,9 +61,15 @@ final class StreamUnicastPublisher[F[_]: Async, A]( } object StreamUnicastPublisher { + @deprecated("Use overload which takes only the stream and returns a Resource", "3.4.0") def apply[F[_]: Async, A]( s: Stream[F, A], dispatcher: Dispatcher[F] ): StreamUnicastPublisher[F, A] = - new StreamUnicastPublisher(s, dispatcher) + new StreamUnicastPublisher(s, dispatcher, dispatcher) + + def apply[F[_]: Async, A]( + s: Stream[F, A] + ): Resource[F, StreamUnicastPublisher[F, A]] = + (Dispatcher.sequential[F], Dispatcher.sequential[F]).mapN(new StreamUnicastPublisher(s, _, _)) } diff --git a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala index b9d5562964..1d25736df1 100644 --- a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala +++ b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala @@ -23,7 +23,6 @@ package fs2 package interop import cats.effect.kernel._ -import cats.effect.std.Dispatcher import org.reactivestreams._ /** Implementation of the reactivestreams protocol for fs2 @@ -103,8 +102,6 @@ package object reactivestreams { def toUnicastPublisher(implicit F: Async[F] ): Resource[F, StreamUnicastPublisher[F, A]] = - Dispatcher[F].map { dispatcher => - StreamUnicastPublisher(stream, dispatcher) - } + StreamUnicastPublisher(stream) } } diff --git a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/CancellationSpec.scala b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/CancellationSpec.scala index 1df3eed635..ec5855adad 100644 --- a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/CancellationSpec.scala +++ b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/CancellationSpec.scala @@ -26,6 +26,7 @@ package reactivestreams import org.reactivestreams._ import cats.effect._ import cats.effect.std.Dispatcher +import cats.syntax.all._ import java.util.concurrent.atomic.AtomicBoolean @@ -47,17 +48,18 @@ class CancellationSpec extends Fs2Suite { val attempts = 10000 - def withDispatcher(f: Dispatcher[IO] => Unit): Unit = - Dispatcher[IO] - .use(dispatcher => IO(f(dispatcher))) + def withDispatchers(f: (Dispatcher[IO], Dispatcher[IO]) => Unit): Unit = + (Dispatcher.sequential[IO], Dispatcher.sequential[IO]).tupled + .use { case (d1, d2) => IO(f(d1, d2)) } .unsafeRunSync() test("after subscription is cancelled request must be noOps") { - withDispatcher { dispatcher => + withDispatchers { (startDispatcher, requestDispatcher) => var i = 0 val b = new AtomicBoolean(false) while (i < attempts) { - val sub = StreamSubscription(Sub[Int](b), s, dispatcher).unsafeRunSync() + val sub = + StreamSubscription(Sub[Int](b), s, startDispatcher, requestDispatcher).unsafeRunSync() sub.unsafeStart() sub.cancel() sub.request(1) @@ -70,11 +72,12 @@ class CancellationSpec extends Fs2Suite { } test("after subscription is cancelled additional cancelations must be noOps") { - withDispatcher { dispatcher => + withDispatchers { (startDispatcher, requestDispatcher) => var i = 0 val b = new AtomicBoolean(false) while (i < attempts) { - val sub = StreamSubscription(Sub[Int](b), s, dispatcher).unsafeRunSync() + val sub = + StreamSubscription(Sub[Int](b), s, startDispatcher, requestDispatcher).unsafeRunSync() sub.unsafeStart() sub.cancel() sub.cancel() diff --git a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala index 7631033c37..14e58088ef 100644 --- a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala +++ b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala @@ -24,8 +24,10 @@ package interop package reactivestreams import cats.effect._ +import cats.effect.unsafe.implicits._ import org.reactivestreams._ import org.reactivestreams.tck.{PublisherVerification, TestEnvironment} +import org.scalatestplus.testng._ final class FailedSubscription extends Subscription { def cancel(): Unit = {} @@ -41,14 +43,14 @@ final class FailedPublisher extends Publisher[Int] { final class StreamUnicastPublisherSpec extends PublisherVerification[Int](new TestEnvironment(1000L)) - with UnsafeTestNGSuite { + with TestNGSuiteLike { def createPublisher(n: Long): StreamUnicastPublisher[IO, Int] = { val s = if (n == java.lang.Long.MAX_VALUE) Stream.range(1, 20).repeat else Stream(1).repeat.scan(1)(_ + _).map(i => if (i > n) None else Some(i)).unNoneTerminate - StreamUnicastPublisher(s, dispatcher) + StreamUnicastPublisher[IO, Int](s).allocated.unsafeRunSync()._1 } def createFailedPublisher(): FailedPublisher = new FailedPublisher() diff --git a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala index 2ec6cc679f..bbdba3f079 100644 --- a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala +++ b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala @@ -39,11 +39,13 @@ import org.reactivestreams.tck.{ TestEnvironment } +import org.scalatestplus.testng._ + import scala.concurrent.duration._ final class SubscriberWhiteboxSpec extends SubscriberWhiteboxVerification[Int](new TestEnvironment(1000L)) - with UnsafeTestNGSuite { + with TestNGSuiteLike { private val counter = new AtomicInteger() @@ -90,7 +92,7 @@ final class WhiteboxSubscriber[A](sub: StreamSubscriber[IO, A], probe: WhiteboxS final class SubscriberBlackboxSpec extends SubscriberBlackboxVerification[Int](new TestEnvironment(1000L)) - with UnsafeTestNGSuite { + with TestNGSuiteLike { private val counter = new AtomicInteger() diff --git a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/UnsafeTestNGSuite.scala b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/UnsafeTestNGSuite.scala deleted file mode 100644 index 1149de3a63..0000000000 --- a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/UnsafeTestNGSuite.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2013 Functional Streams for Scala - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -package fs2 -package interop -package reactivestreams - -import cats.effect._ -import cats.effect.std.Dispatcher -import cats.effect.unsafe.implicits.global -import org.scalatestplus.testng._ -import org.testng.annotations.AfterClass - -trait UnsafeTestNGSuite extends TestNGSuiteLike { - - protected var dispatcher: Dispatcher[IO] = _ - private var shutdownDispatcher: IO[Unit] = _ - - private val mkDispatcher = Dispatcher[IO].allocated - private val t = mkDispatcher.unsafeRunSync() - dispatcher = t._1 - shutdownDispatcher = t._2 - - @AfterClass - def afterAll() = shutdownDispatcher.unsafeRunSync() -} diff --git a/site/guide.md b/site/guide.md index 19afe0c637..5cb8760319 100644 --- a/site/guide.md +++ b/site/guide.md @@ -641,7 +641,7 @@ trait CSVHandle { def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] = { for { - dispatcher <- Stream.resource(Dispatcher[F]) + dispatcher <- Stream.resource(Dispatcher.sequential[F]) q <- Stream.eval(Queue.unbounded[F, Option[RowOrError]]) _ <- Stream.eval { F.delay { def enqueue(v: Option[RowOrError]): Unit = dispatcher.unsafeRunAndForget(q.offer(v))