Skip to content

Commit

Permalink
Merge pull request #3000 from armanbilge/update/cats-effect-3.4.0-RC1
Browse files Browse the repository at this point in the history
Update to Cats Effect 3.2.0-RC1
  • Loading branch information
mpilquist authored Sep 30, 2022
2 parents 62f4545 + d5e62fa commit 9ac3e43
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 133 deletions.
20 changes: 10 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-22.04]
scala: [3.1.3, 2.12.17, 2.13.8]
scala: [3.2.0, 2.12.17, 2.13.8]
java: [temurin@17]
project: [rootJS, rootJVM, rootNative]
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -161,32 +161,32 @@ jobs:
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Download target directories (3.1.3, rootJS)
- name: Download target directories (3.2.0, rootJS)
uses: actions/download-artifact@v2
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootJS
name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootJS

- name: Inflate target directories (3.1.3, rootJS)
- name: Inflate target directories (3.2.0, rootJS)
run: |
tar xf targets.tar
rm targets.tar
- name: Download target directories (3.1.3, rootJVM)
- name: Download target directories (3.2.0, rootJVM)
uses: actions/download-artifact@v2
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootJVM
name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootJVM

- name: Inflate target directories (3.1.3, rootJVM)
- name: Inflate target directories (3.2.0, rootJVM)
run: |
tar xf targets.tar
rm targets.tar
- name: Download target directories (3.1.3, rootNative)
- name: Download target directories (3.2.0, rootNative)
uses: actions/download-artifact@v2
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.3-rootNative
name: target-${{ matrix.os }}-${{ matrix.java }}-3.2.0-rootNative

- name: Inflate target directories (3.1.3, rootNative)
- name: Inflate target directories (3.2.0, rootNative)
run: |
tar xf targets.tar
rm targets.tar
Expand Down
11 changes: 7 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ ThisBuild / startYear := Some(2013)

val NewScala = "2.13.8"

ThisBuild / crossScalaVersions := Seq("3.1.3", "2.12.17", NewScala)
ThisBuild / crossScalaVersions := Seq("3.2.0", "2.12.17", NewScala)
ThisBuild / tlVersionIntroduced := Map("3" -> "3.0.3")

ThisBuild / githubWorkflowOSes := Seq("ubuntu-22.04")
Expand Down Expand Up @@ -176,6 +176,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.net.tls.TLSContext#Builder.insecureResource"
),
ProblemFilters.exclude[DirectMissingMethodProblem]( // something funky in Scala 3.2.0 ...
"fs2.io.net.SocketGroupCompanionPlatform#AsyncSocketGroup.this"
)
)

Expand Down Expand Up @@ -210,9 +213,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % "2.8.0",
"org.typelevel" %%% "cats-laws" % "2.8.0" % Test,
"org.typelevel" %%% "cats-effect" % "3.3.14",
"org.typelevel" %%% "cats-effect-laws" % "3.3.14" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.3.14" % Test,
"org.typelevel" %%% "cats-effect" % "3.4.0-RC1",
"org.typelevel" %%% "cats-effect-laws" % "3.4.0-RC1" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4.0-RC1" % Test,
"org.scodec" %%% "scodec-bits" % "1.1.34",
"org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
Expand Down
35 changes: 18 additions & 17 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import cats.effect.std.Dispatcher
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.concurrent.Channel
import fs2.io.internal.MicrotaskExecutor
import fs2.io.internal.facade

Expand Down Expand Up @@ -59,8 +60,8 @@ private[fs2] trait ioplatform {
destroyIfCanceled: Boolean = true
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] =
(for {
dispatcher <- Dispatcher[F]
queue <- Queue.synchronous[F, Option[Unit]].toResource
dispatcher <- Dispatcher.sequential[F]
channel <- Channel.unbounded[F, Unit].toResource
error <- F.deferred[Throwable].toResource
readableResource = for {
readable <- Resource.makeCase(F.delay(thunk)) {
Expand All @@ -79,9 +80,9 @@ private[fs2] trait ioplatform {
else
F.unit
}
_ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => queue.offer(Some(())))
_ <- readable.registerListener[F, Any]("end", dispatcher)(_ => queue.offer(None))
_ <- readable.registerListener[F, Any]("close", dispatcher)(_ => queue.offer(None))
_ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void)
_ <- readable.registerListener[F, Any]("end", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, Any]("close", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, js.Error]("error", dispatcher) { e =>
error.complete(js.JavaScriptException(e)).void
}
Expand All @@ -94,8 +95,7 @@ private[fs2] trait ioplatform {
// our only recourse is to run the entire creation/listener registration process on the microtask executor.
readable <- readableResource.evalOn(MicrotaskExecutor)
stream =
(Stream
.fromQueueNoneTerminated(queue)
(channel.stream
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
Stream
.evalUnChunk(
Expand Down Expand Up @@ -199,9 +199,11 @@ private[fs2] trait ioplatform {
in: Stream[F, Byte]
)(implicit F: Async[F]): Resource[F, (Duplex, Stream[F, Byte])] =
for {
dispatcher <- Dispatcher[F]
readDispatcher <- Dispatcher.sequential[F]
writeDispatcher <- Dispatcher.sequential[F]
errorDispatcher <- Dispatcher.sequential[F]
readQueue <- Queue.bounded[F, Option[Chunk[Byte]]](1).toResource
writeQueue <- Queue.synchronous[F, Option[Chunk[Byte]]].toResource
writeChannel <- Channel.synchronous[F, Chunk[Byte]].toResource
error <- F.deferred[Throwable].toResource
duplex <- Resource.make {
F.delay {
Expand All @@ -211,27 +213,27 @@ private[fs2] trait ioplatform {
var autoDestroy = false

var read = { readable =>
dispatcher.unsafeRunAndForget(
readDispatcher.unsafeRunAndForget(
readQueue.take.flatMap { chunk =>
F.delay(readable.push(chunk.map(_.toUint8Array).orNull)).void
}
)
}

var write = { (_, chunk, _, cb) =>
dispatcher.unsafeRunAndForget(
writeQueue.offer(Some(Chunk.uint8Array(chunk))) *> F.delay(cb(null))
writeDispatcher.unsafeRunAndForget(
writeChannel.send(Chunk.uint8Array(chunk)) *> F.delay(cb(null))
)
}

var `final` = { (_, cb) =>
dispatcher.unsafeRunAndForget(
writeQueue.offer(None) *> F.delay(cb(null))
writeDispatcher.unsafeRunAndForget(
writeChannel.close *> F.delay(cb(null))
)
}

var destroy = { (_, err, cb) =>
dispatcher.unsafeRunAndForget {
errorDispatcher.unsafeRunAndForget {
error
.complete(
Option(err)
Expand All @@ -249,8 +251,7 @@ private[fs2] trait ioplatform {
}
}
drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain
out = Stream
.fromQueueNoneTerminatedChunk(writeQueue)
out = writeChannel.stream.unchunks
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit])))
} yield (
duplex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[net] trait DatagramSocketCompanionPlatform {
sock: facade.dgram.Socket
)(implicit F: Async[F]): Resource[F, DatagramSocket[F]] =
for {
dispatcher <- Dispatcher[F]
dispatcher <- Dispatcher.sequential[F]
queue <- Queue
.circularBuffer[F, Datagram](1024)
.toResource // TODO how to set this? Or, bad design?
Expand Down
16 changes: 7 additions & 9 deletions io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ package net
import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.effect.std.Dispatcher
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import com.comcast.ip4s.Host
import com.comcast.ip4s.IpAddress
import com.comcast.ip4s.Port
import com.comcast.ip4s.SocketAddress
import fs2.concurrent.Channel
import fs2.io.internal.facade

import scala.scalajs.js
Expand Down Expand Up @@ -83,8 +83,8 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
options: List[SocketOption]
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
(for {
dispatcher <- Dispatcher[F]
queue <- Queue.unbounded[F, Option[facade.net.Socket]].toResource
dispatcher <- Dispatcher.sequential[F]
channel <- Channel.unbounded[F, facade.net.Socket].toResource
server <- Resource.make(
F
.delay(
Expand All @@ -93,15 +93,14 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
pauseOnConnect = true
allowHalfOpen = true
},
sock => dispatcher.unsafeRunAndForget(queue.offer(Some(sock)))
sock => dispatcher.unsafeRunAndForget(channel.send(sock))
)
)
)(server =>
F.async[Unit] { cb =>
if (server.listening)
F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) >> queue
.offer(None)
.as(None)
F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) *>
channel.close.as(None)
else
F.delay(cb(Right(()))).as(None)
}
Expand All @@ -126,8 +125,7 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
val info = server.address()
SocketAddress(IpAddress.fromString(info.address).get, Port.fromInt(info.port).get)
}.toResource
sockets = Stream
.fromQueueNoneTerminated(queue)
sockets = channel.stream
.evalTap(setSocketOptions(options))
.flatMap(sock => Stream.resource(Socket.forAsync(sock)))
} yield (ipAddress, sockets)).adaptError { case IOException(ex) => ex }
Expand Down
20 changes: 10 additions & 10 deletions io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
clientMode: Boolean,
params: TLSParameters,
logger: TLSLogger[F]
): Resource[F, TLSSocket[F]] = Dispatcher[F]
.flatMap { dispatcher =>
): Resource[F, TLSSocket[F]] = (Dispatcher.sequential[F], Dispatcher.parallel[F])
.flatMapN { (seqDispatcher, parDispatcher) =>
if (clientMode) {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake =>
TLSSocket
.forAsync(
socket,
sock => {
val options = params.toTLSConnectOptions(dispatcher)
val options = params.toTLSConnectOptions(parDispatcher)
options.secureContext = context
if (insecure)
options.rejectUnauthorized = false
Expand All @@ -79,18 +79,18 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
val tlsSock = facade.tls.connect(options)
tlsSock.once(
"secureConnect",
() => dispatcher.unsafeRunAndForget(handshake.complete(Either.unit))
() => seqDispatcher.unsafeRunAndForget(handshake.complete(Either.unit))
)
tlsSock.once[js.Error](
"error",
e =>
dispatcher.unsafeRunAndForget(
seqDispatcher.unsafeRunAndForget(
handshake.complete(Left(new js.JavaScriptException(e)))
)
)
tlsSock
},
dispatcher
seqDispatcher
)
.evalTap(_ => handshake.get.rethrow)
}
Expand All @@ -100,7 +100,7 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
.forAsync(
socket,
sock => {
val options = params.toTLSSocketOptions(dispatcher)
val options = params.toTLSSocketOptions(parDispatcher)
options.secureContext = context
if (insecure)
options.rejectUnauthorized = false
Expand All @@ -118,19 +118,19 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
.map(e => new JavaScriptSSLException(js.JavaScriptException(e)))
.toLeft(())
else Either.unit
dispatcher.unsafeRunAndForget(verifyError.complete(result))
seqDispatcher.unsafeRunAndForget(verifyError.complete(result))
}
)
tlsSock.once[js.Error](
"error",
e =>
dispatcher.unsafeRunAndForget(
seqDispatcher.unsafeRunAndForget(
verifyError.complete(Left(new js.JavaScriptException(e)))
)
)
tlsSock
},
dispatcher
seqDispatcher
)
.evalTap(_ => verifyError.get.rethrow)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ package io.net.unixsocket
import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.effect.std.Dispatcher
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.concurrent.Channel
import fs2.io.file.Files
import fs2.io.file.Path
import fs2.io.net.Socket
Expand Down Expand Up @@ -65,8 +65,8 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
deleteOnClose: Boolean
): fs2.Stream[F, Socket[F]] =
for {
dispatcher <- Stream.resource(Dispatcher[F])
queue <- Stream.eval(Queue.unbounded[F, facade.net.Socket])
dispatcher <- Stream.resource(Dispatcher.sequential[F])
channel <- Stream.eval(Channel.unbounded[F, facade.net.Socket])
errored <- Stream.eval(F.deferred[js.JavaScriptException])
server <- Stream.bracket(
F.delay {
Expand All @@ -75,7 +75,7 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
pauseOnConnect = true
allowHalfOpen = true
},
sock => dispatcher.unsafeRunAndForget(queue.offer(sock))
sock => dispatcher.unsafeRunAndForget(channel.send(sock))
)
}
)(server =>
Expand Down Expand Up @@ -103,9 +103,7 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
()
}
)
socket <- Stream
.fromQueueUnterminated(queue)
.flatMap(sock => Stream.resource(Socket.forAsync(sock)))
socket <- channel.stream.flatMap(sock => Stream.resource(Socket.forAsync(sock)))
} yield socket

}
Expand Down
2 changes: 1 addition & 1 deletion io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private[io] object JavaInputOutputStream {
* - DownStream signal - keeps any remainders from last `read` and signals
* that downstream has been terminated that in turn kills upstream
*/
Dispatcher[F].flatMap { dispatcher =>
Dispatcher.sequential[F].flatMap { dispatcher =>
Resource
.eval(
(
Expand Down
Loading

0 comments on commit 9ac3e43

Please sign in to comment.