From 07676363d8915ee777e7ebd1a48cbe8cfce2b23b Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Sat, 12 Oct 2019 10:52:13 -0400 Subject: [PATCH 1/5] Added alternative compile.to syntax, providing more efficient and uniform collection building --- build.sbt | 7 +- .../scala/fs2/internal/ThreadFactories.scala | 2 +- .../jvm/src/test/scala/fs2/CompressSpec.scala | 2 +- .../scala-2.12-/fs2/CollectorPlatform.scala | 43 +++++ .../scala-2.13+/fs2/CollectorPlatform.scala | 28 ++++ core/shared/src/main/scala/fs2/Chunk.scala | 53 +++++-- .../shared/src/main/scala/fs2/Collector.scala | 102 ++++++++++++ core/shared/src/main/scala/fs2/Stream.scala | 149 ++++++++++-------- .../src/main/scala/fs2/internal/Algebra.scala | 10 +- .../src/main/scala/fs2/internal/FreeC.scala | 2 +- core/shared/src/main/scala/fs2/text.scala | 2 +- .../src/test/scala/fs2/StreamSpec.scala | 4 +- 12 files changed, 316 insertions(+), 88 deletions(-) create mode 100644 core/shared/src/main/scala-2.12-/fs2/CollectorPlatform.scala create mode 100644 core/shared/src/main/scala-2.13+/fs2/CollectorPlatform.scala create mode 100644 core/shared/src/main/scala/fs2/Collector.scala diff --git a/build.sbt b/build.sbt index 6f7f141c75..9dc0f4180e 100644 --- a/build.sbt +++ b/build.sbt @@ -26,9 +26,14 @@ lazy val commonSettings = Seq( "-language:implicitConversions", "-language:higherKinds" ) ++ - (if (scalaBinaryVersion.value.startsWith("2.12")) + (if (scalaBinaryVersion.value.startsWith("2.13")) List( "-Xlint", + "-Ywarn-unused" + ) + else Nil) ++ + (if (scalaBinaryVersion.value.startsWith("2.12")) + List( "-Xfatal-warnings", "-Yno-adapted-args", "-Ywarn-value-discard", diff --git a/core/jvm/src/main/scala/fs2/internal/ThreadFactories.scala b/core/jvm/src/main/scala/fs2/internal/ThreadFactories.scala index 36d79b5c2f..7217055814 100644 --- a/core/jvm/src/main/scala/fs2/internal/ThreadFactories.scala +++ b/core/jvm/src/main/scala/fs2/internal/ThreadFactories.scala @@ -28,7 +28,7 @@ private[fs2] object ThreadFactories { if (exitJvmOnFatalError) { e match { case NonFatal(_) => () - case fatal => System.exit(-1) + case _ => System.exit(-1) } } } diff --git a/core/jvm/src/test/scala/fs2/CompressSpec.scala b/core/jvm/src/test/scala/fs2/CompressSpec.scala index 5da81d39f3..bd75b6abbd 100644 --- a/core/jvm/src/test/scala/fs2/CompressSpec.scala +++ b/core/jvm/src/test/scala/fs2/CompressSpec.scala @@ -140,7 +140,7 @@ class CompressSpec extends Fs2Spec { val bytes = s .through(compress.gzip(1024)) .compile - .to[Array] + .to(Array) val bis = new ByteArrayInputStream(bytes) val gzis = new GZIPInputStream(bis) diff --git a/core/shared/src/main/scala-2.12-/fs2/CollectorPlatform.scala b/core/shared/src/main/scala-2.12-/fs2/CollectorPlatform.scala new file mode 100644 index 0000000000..62d15e8094 --- /dev/null +++ b/core/shared/src/main/scala-2.12-/fs2/CollectorPlatform.scala @@ -0,0 +1,43 @@ +package fs2 + +import scala.collection.generic.{GenericTraversableTemplate, MapFactory, TraversableFactory} +import scala.collection.{MapLike, Traversable} + +import fs2.internal._ + +private[fs2] trait CollectorPlatform { self: Collector.type => + implicit def supportsFactory[A, C[_], B]( + f: Factory[A, C[B]] + ): Collector.Aux[A, C[B]] = make(Builder.fromFactory(f)) + + implicit def supportsTraversableFactory[A, C[x] <: Traversable[x] with GenericTraversableTemplate[ + x, + C + ]]( + f: TraversableFactory[C] + ): Collector.Aux[A, C[A]] = make(Builder.fromTraversableFactory(f)) + + implicit def supportsMapFactory[K, V, C[a, b] <: collection.Map[a, b] with MapLike[ + a, + b, + C[a, b] + ]]( + f: MapFactory[C] + ): Collector.Aux[(K, V), C[K, V]] = + make(Builder.fromMapFactory(f)) + + private[fs2] trait BuilderPlatform { self: Collector.Builder.type => + def fromFactory[A, C[_], B](f: Factory[A, C[B]]): Builder[A, C[B]] = + fromBuilder(f()) + + def fromTraversableFactory[A, C[x] <: Traversable[x] with GenericTraversableTemplate[x, C]]( + f: TraversableFactory[C] + ): Builder[A, C[A]] = + fromBuilder(f.newBuilder[A]) + + def fromMapFactory[K, V, C[a, b] <: collection.Map[a, b] with MapLike[a, b, C[a, b]]]( + f: MapFactory[C] + ): Builder[(K, V), C[K, V]] = + fromBuilder(f.newBuilder) + } +} diff --git a/core/shared/src/main/scala-2.13+/fs2/CollectorPlatform.scala b/core/shared/src/main/scala-2.13+/fs2/CollectorPlatform.scala new file mode 100644 index 0000000000..5aa59ad9c7 --- /dev/null +++ b/core/shared/src/main/scala-2.13+/fs2/CollectorPlatform.scala @@ -0,0 +1,28 @@ +package fs2 + +import scala.collection.{Factory, IterableFactory, MapFactory} + +import fs2.internal.{Resource => _, _} + +private[fs2] trait CollectorPlatform { self: Collector.type => + implicit def supportsFactory[A, C[_], B]( + f: Factory[A, C[B]] + ): Collector.Aux[A, C[B]] = make(Builder.fromFactory(f)) + + implicit def supportsIterableFactory[A, C[_]](f: IterableFactory[C]): Collector.Aux[A, C[A]] = + make(Builder.fromIterableFactory(f)) + + implicit def supportsMapFactory[K, V, C[_, _]](f: MapFactory[C]): Collector.Aux[(K, V), C[K, V]] = + make(Builder.fromMapFactory(f)) + + private[fs2] trait BuilderPlatform { self: Collector.Builder.type => + def fromFactory[A, C[_], B](f: Factory[A, C[B]]): Builder[A, C[B]] = + fromBuilder(f.newBuilder) + + def fromIterableFactory[A, C[_]](f: IterableFactory[C]): Builder[A, C[A]] = + fromBuilder(f.newBuilder) + + def fromMapFactory[K, V, C[_, _]](f: MapFactory[C]): Builder[(K, V), C[K, V]] = + fromBuilder(f.newBuilder) + } +} diff --git a/core/shared/src/main/scala/fs2/Chunk.scala b/core/shared/src/main/scala/fs2/Chunk.scala index d884c4cdf6..ba9ab60338 100644 --- a/core/shared/src/main/scala/fs2/Chunk.scala +++ b/core/shared/src/main/scala/fs2/Chunk.scala @@ -248,6 +248,9 @@ abstract class Chunk[+O] extends Serializable { self => arr } + /** Returns the elements of this chunk as an array, avoiding a copy if possible. */ + protected[fs2] def toArrayUnsafe[O2 >: O: ClassTag]: Array[O2] = toArray + /** * Converts this chunk to a `Chunk.Booleans`, allowing access to the underlying array of elements. * If this chunk is already backed by an unboxed array of booleans, this method runs in constant time. @@ -257,7 +260,7 @@ abstract class Chunk[+O] extends Serializable { self => val _ = ev // Convince scalac that ev is used this match { case c: Chunk.Booleans => c - case other => + case _ => Chunk.Booleans(this.asInstanceOf[Chunk[Boolean]].toArray, 0, size) } } @@ -271,7 +274,7 @@ abstract class Chunk[+O] extends Serializable { self => val _ = ev // Convince scalac that ev is used this match { case c: Chunk.Bytes => c - case other => Chunk.Bytes(this.asInstanceOf[Chunk[Byte]].toArray, 0, size) + case _ => Chunk.Bytes(this.asInstanceOf[Chunk[Byte]].toArray, 0, size) } } @@ -289,7 +292,7 @@ abstract class Chunk[+O] extends Serializable { self => (b: JBuffer).limit(c.offset.toInt + c.size) b } - case other => + case _ => JByteBuffer.wrap(this.asInstanceOf[Chunk[Byte]].toArray, 0, size) } } @@ -303,7 +306,7 @@ abstract class Chunk[+O] extends Serializable { self => val _ = ev // Convince scalac that ev is used this match { case c: Chunk.Shorts => c - case other => + case _ => Chunk.Shorts(this.asInstanceOf[Chunk[Short]].toArray, 0, size) } } @@ -317,7 +320,7 @@ abstract class Chunk[+O] extends Serializable { self => val _ = ev // Convince scalac that ev is used this match { case c: Chunk.Ints => c - case other => Chunk.Ints(this.asInstanceOf[Chunk[Int]].toArray, 0, size) + case _ => Chunk.Ints(this.asInstanceOf[Chunk[Int]].toArray, 0, size) } } @@ -330,7 +333,7 @@ abstract class Chunk[+O] extends Serializable { self => val _ = ev // Convince scalac that ev is used this match { case c: Chunk.Longs => c - case other => Chunk.Longs(this.asInstanceOf[Chunk[Long]].toArray, 0, size) + case _ => Chunk.Longs(this.asInstanceOf[Chunk[Long]].toArray, 0, size) } } @@ -343,7 +346,7 @@ abstract class Chunk[+O] extends Serializable { self => val _ = ev // Convince scalac that ev is used this match { case c: Chunk.Floats => c - case other => + case _ => Chunk.Floats(this.asInstanceOf[Chunk[Float]].toArray, 0, size) } } @@ -357,7 +360,7 @@ abstract class Chunk[+O] extends Serializable { self => val _ = ev // Convince scalac that ev is used this match { case c: Chunk.Doubles => c - case other => + case _ => Chunk.Doubles(this.asInstanceOf[Chunk[Double]].toArray, 0, size) } } @@ -646,7 +649,7 @@ object Chunk { values.size match { case 0 => empty case 1 => singleton(values(0)) - case n => + case _ => values match { case a: Array[Boolean] => booleans(a) case a: Array[Byte] => bytes(a) @@ -678,6 +681,10 @@ object Chunk { def size = length def apply(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: O: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: O](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[AnyRef]]) System.arraycopy(values, offset, xs, start, length) @@ -721,6 +728,10 @@ object Chunk { def apply(i: Int) = values(offset + i) def at(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: Boolean: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: Boolean](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[Boolean]]) System.arraycopy(values, offset, xs, start, length) @@ -763,6 +774,10 @@ object Chunk { def apply(i: Int) = values(offset + i) def at(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: Byte: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: Byte](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[Byte]]) System.arraycopy(values, offset, xs, start, length) @@ -1127,6 +1142,10 @@ object Chunk { def apply(i: Int) = values(offset + i) def at(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: Short: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: Short](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[Short]]) System.arraycopy(values, offset, xs, start, length) @@ -1168,6 +1187,10 @@ object Chunk { def apply(i: Int) = values(offset + i) def at(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: Int: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: Int](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[Int]]) System.arraycopy(values, offset, xs, start, length) @@ -1209,6 +1232,10 @@ object Chunk { def apply(i: Int) = values(offset + i) def at(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: Long: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: Long](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[Long]]) System.arraycopy(values, offset, xs, start, length) @@ -1251,6 +1278,10 @@ object Chunk { def apply(i: Int) = values(offset + i) def at(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: Float: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: Float](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[Float]]) System.arraycopy(values, offset, xs, start, length) @@ -1293,6 +1324,10 @@ object Chunk { def apply(i: Int) = values(offset + i) def at(i: Int) = values(offset + i) + protected[fs2] override def toArrayUnsafe[O2 >: Double: ClassTag]: Array[O2] = + if (offset == 0 && length == values.length) values.asInstanceOf[Array[O2]] + else values.slice(offset, length).asInstanceOf[Array[O2]] + def copyToArray[O2 >: Double](xs: Array[O2], start: Int): Unit = if (xs.isInstanceOf[Array[Double]]) System.arraycopy(values, offset, xs, start, length) diff --git a/core/shared/src/main/scala/fs2/Collector.scala b/core/shared/src/main/scala/fs2/Collector.scala new file mode 100644 index 0000000000..df2770d742 --- /dev/null +++ b/core/shared/src/main/scala/fs2/Collector.scala @@ -0,0 +1,102 @@ +package fs2 + +import scala.reflect.ClassTag + +import scodec.bits.ByteVector + +/** + * Supports building a result of type `Out` from zero or more `Chunk[A]`. + * + * This is similar to the standard library collection builders but optimized for + * building a collection from a stream. + * + * The companion object provides implicit conversions (methods starting with `supports`), + * which adapts various collections to the `Collector` trait. + * + * The output type is a type member instead of a type parameter to avoid overloading + * resolution limitations with `s.compile.to[C]` vs `s.compile.to(C)`. + */ +trait Collector[A] { + type Out + def newBuilder: Collector.Builder[A, Out] +} + +object Collector extends CollectorPlatform { + type Aux[A, X] = Collector[A] { type Out = X } + + def string: Collector.Aux[String, String] = + make(Builder.string) + + implicit def supportsArray[A: ClassTag](a: Array.type): Collector.Aux[A, Array[A]] = { + val _ = a + make(implicitly[ClassTag[A]] match { + case ClassTag.Byte => + Builder.byteArray.asInstanceOf[Builder[A, Array[A]]] + case _ => Builder.array[A] + }) + } + + implicit def supportsChunk[A](c: Chunk.type): Collector.Aux[A, Chunk[A]] = { + val _ = c + make(Builder.chunk) + } + + implicit def supportsByteVector(b: ByteVector.type): Collector.Aux[Byte, ByteVector] = { + val _ = b + make(Builder.byteVector) + } + + protected def make[A, X](nb: => Builder[A, X]): Collector.Aux[A, X] = + new Collector[A] { + type Out = X + def newBuilder = nb + } + + /** Builds a value of type `X` from zero or more `Chunk[A]`. */ + trait Builder[A, X] { self => + def +=(c: Chunk[A]): Unit + def result: X + + def mapResult[Y](f: X => Y): Builder[A, Y] = new Builder[A, Y] { + def +=(c: Chunk[A]): Unit = self += c + def result: Y = f(self.result) + } + } + + object Builder extends BuilderPlatform { + def byteArray: Builder[Byte, Array[Byte]] = + byteVector.mapResult(_.toArray) + + def chunk[A]: Builder[A, Chunk[A]] = + new Builder[A, Chunk[A]] { + private[this] var queue = Chunk.Queue.empty[A] + def +=(c: Chunk[A]): Unit = queue = queue :+ c + def result: Chunk[A] = queue.toChunk + } + + def array[A: ClassTag]: Builder[A, Array[A]] = + chunk.mapResult(_.toArray) + + protected def fromBuilder[A, C[_], B]( + builder: collection.mutable.Builder[A, C[B]] + ): Builder[A, C[B]] = + new Builder[A, C[B]] { + def +=(c: Chunk[A]): Unit = builder ++= c.iterator + def result: C[B] = builder.result + } + + def string: Builder[String, String] = + new Builder[String, String] { + private[this] val builder = new StringBuilder + def +=(c: Chunk[String]): Unit = c.foreach(s => builder ++= s) + def result: String = builder.toString + } + + def byteVector: Builder[Byte, ByteVector] = + new Builder[Byte, ByteVector] { + private[this] var acc = ByteVector.empty + def +=(c: Chunk[Byte]): Unit = acc = acc ++ c.toByteVector + def result: ByteVector = acc + } + } +} diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 881632f79f..acc14b8b82 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -478,8 +478,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] this.pull .find(pf.isDefinedAt) .flatMap { - case None => Pull.done - case Some((hd, tl)) => Pull.output1(pf(hd)) + case None => Pull.done + case Some((hd, _)) => Pull.output1(pf(hd)) } .stream @@ -1039,20 +1039,20 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] }.flatten /** - * Like `filterNot`, but allows filtering based on an effect. - * - * Note: The result Stream will consist of chunks that are empty or 1-element-long. - * If you want to operate on chunks after using it, consider buffering, e.g. by using [[buffer]]. - */ + * Like `filterNot`, but allows filtering based on an effect. + * + * Note: The result Stream will consist of chunks that are empty or 1-element-long. + * If you want to operate on chunks after using it, consider buffering, e.g. by using [[buffer]]. + */ def evalFilterNot[F2[x] >: F[x]: Functor](f: O => F2[Boolean]): Stream[F2, O] = flatMap(o => Stream.eval(f(o)).ifM(Stream.empty, Stream.emit(o))) /** - * Like `filterNot`, but allows filtering based on an effect, with up to [[maxConcurrent]] concurrently running effects. - * The ordering of emitted elements is unchanged. - */ + * Like `filterNot`, but allows filtering based on an effect, with up to [[maxConcurrent]] concurrently running effects. + * The ordering of emitted elements is unchanged. + */ def evalFilterNotAsync[F2[x] >: F[x]: Concurrent]( - maxConcurrent: Int + maxConcurrent: Int )(f: O => F2[Boolean]): Stream[F2, O] = parEvalMap[F2, Stream[F2, O]](maxConcurrent) { o => f(o).map(if (_) Stream.empty else Stream.emit(o)) @@ -1107,7 +1107,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] this.pull .find(f) .flatMap { - _.map { case (hd, tl) => Pull.output1(hd) }.getOrElse(Pull.done) + _.map { case (hd, _) => Pull.output1(hd) }.getOrElse(Pull.done) } .stream @@ -1139,7 +1139,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] case Result.Fail(err) => Result.Fail(err) case Result.Interrupted(scopeId: Token, err) => Stream.fromFreeC(Algebra.interruptBoundary(tl, scopeId, err)).flatMap(f).get - case Result.Interrupted(invalid, err) => + case Result.Interrupted(invalid, _) => sys.error(s"Invalid interruption context: $invalid (flatMap)") } } @@ -1405,7 +1405,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] } .onFinalize { currentTimeout.modify { - case st @ (cancelInFlightTimeout, streamTerminated) => + case (cancelInFlightTimeout, _) => (F.unit, true) -> cancelInFlightTimeout }.flatten } @@ -1816,7 +1816,6 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] Ref.of[F2, Boolean](false).flatMap { otherSideDone => Queue.unbounded[F2, Option[Stream[F2, O2]]].map { resultQ => def runStream( - tag: String, s: Stream[F2, O2], whenDone: Deferred[F2, Either[Throwable, Unit]] ): F2[Unit] = @@ -1854,8 +1853,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] .interruptWhen(interrupt.get.attempt) Stream.bracket( - F2.start(runStream("L", this, resultL)) >> - F2.start(runStream("R", that, resultR)) + F2.start(runStream(this, resultL)) >> + F2.start(runStream(that, resultR)) ) { _ => interrupt .complete(()) @@ -2132,7 +2131,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] .attempt .flatMap { case Left(err) => stop(Some(err)) >> decrementRunning - case Right(r) => F2.unit >> decrementRunning + case Right(_) => F2.unit >> decrementRunning } // awaits when all streams (outer + inner) finished, @@ -2645,7 +2644,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] * Applies the given sink to this stream. */ @deprecated("Use .through instead", "1.0.2") - def to[F2[x] >: F[x]](f: Stream[F, O] => Stream[F2, Unit]): Stream[F2, Unit] = f(this) + private[Stream] def to[F2[x] >: F[x]](f: Stream[F, O] => Stream[F2, Unit]): Stream[F2, Unit] = + f(this) /** * Translates effect type from `F` to `G` using the supplied `FunctionK`. @@ -2876,7 +2876,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] def zipWith[F2[x] >: F[x], O2 >: O, O3, O4]( that: Stream[F2, O3] )(f: (O2, O3) => O4): Stream[F2, O4] = - zipWith_[F2, O2, O3, O4](that)(sh => Pull.pure(None), h => Pull.pure(None))(f) + zipWith_[F2, O2, O3, O4](that)(_ => Pull.pure(None), _ => Pull.pure(None))(f) /** * Zips the elements of the input stream with its indices, and returns the new stream. @@ -3730,17 +3730,23 @@ object Stream extends StreamLowPriority { def covary[F[_]]: Stream[F, O] = self /** Runs this pure stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on pure streams. */ - def to[C[_]](implicit f: Factory[O, C[O]]): C[O] = - self.covary[IO].compile.to[C].unsafeRunSync + def to(c: Collector[O]): c.Out = to_(c) + + @inline private def to_(c: Collector[O]): c.Out = + self.covary[IO].compile.to(c).unsafeRunSync + + /** Runs this pure stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on pure streams. */ + def to[C[_]](implicit f: Factory[O, C[O]]): C[O] = to_(f) /** Runs this pure stream and returns the emitted elements in a chunk. Note: this method is only available on pure streams. */ - def toChunk: Chunk[O] = self.covary[IO].compile.toChunk.unsafeRunSync + @deprecated("2.0.2", "Use .to(Chunk) instead") + def toChunk: Chunk[O] = to_(Chunk) /** Runs this pure stream and returns the emitted elements in a list. Note: this method is only available on pure streams. */ - def toList: List[O] = self.covary[IO].compile.toList.unsafeRunSync + def toList: List[O] = to_(List) /** Runs this pure stream and returns the emitted elements in a vector. Note: this method is only available on pure streams. */ - def toVector: Vector[O] = self.covary[IO].compile.toVector.unsafeRunSync + def toVector: Vector[O] = to_(Vector) } /** Provides syntax for streams with effect type `cats.Id`. */ @@ -3773,18 +3779,23 @@ object Stream extends StreamLowPriority { } /** Runs this fallible stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on fallible streams. */ - def to[C[_]](implicit f: Factory[O, C[O]]): Either[Throwable, C[O]] = - lift[IO].compile.to[C].attempt.unsafeRunSync + def to(c: Collector[O]): Either[Throwable, c.Out] = to_(c) + + @inline private def to_(c: Collector[O]): Either[Throwable, c.Out] = + lift[IO].compile.to(c).attempt.unsafeRunSync + + /** Runs this fallible stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on fallible streams. */ + def to[C[_]](implicit f: Factory[O, C[O]]): Either[Throwable, C[O]] = to_(f) /** Runs this fallible stream and returns the emitted elements in a chunk. Note: this method is only available on fallible streams. */ - def toChunk: Either[Throwable, Chunk[O]] = lift[IO].compile.toChunk.attempt.unsafeRunSync + @deprecated("2.0.2", "Use .to(Chunk) instead") + def toChunk: Either[Throwable, Chunk[O]] = to_(Chunk) /** Runs this fallible stream and returns the emitted elements in a list. Note: this method is only available on fallible streams. */ - def toList: Either[Throwable, List[O]] = lift[IO].compile.toList.attempt.unsafeRunSync + def toList: Either[Throwable, List[O]] = to_(List) /** Runs this fallible stream and returns the emitted elements in a vector. Note: this method is only available on fallible streams. */ - def toVector: Either[Throwable, Vector[O]] = - lift[IO].compile.toVector.attempt.unsafeRunSync + def toVector: Either[Throwable, Vector[O]] = to_(Vector) } /** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */ @@ -3886,7 +3897,7 @@ object Stream extends StreamLowPriority { hd.size.toLong match { case m if m < n => tl.pull.drop(n - m) case m if m == n => Pull.pure(Some(tl)) - case m => Pull.pure(Some(tl.cons(hd.drop(n.toInt)))) + case _ => Pull.pure(Some(tl.cons(hd.drop(n.toInt)))) } } @@ -4085,7 +4096,7 @@ object Stream extends StreamLowPriority { hd.size.toLong match { case m if m < n => Pull.output(hd) >> tl.pull.take(n - m) case m if m == n => Pull.output(hd).as(Some(tl)) - case m => + case _ => val (pfx, sfx) = hd.splitAt(n.toInt) Pull.output(pfx).as(Some(tl.cons(sfx))) } @@ -4399,7 +4410,7 @@ object Stream extends StreamLowPriority { /** * Compiles this stream of strings in to a single string. * This is more efficient than `foldMonoid` because it uses a `StringBuilder` - * internally, minimizing string creation. + * internally, avoiding intermediate string creation. * * @example {{{ * scala> Stream("Hello ", "world!").compile.string @@ -4408,29 +4419,49 @@ object Stream extends StreamLowPriority { */ def string(implicit ev: O <:< String): G[String] = { val _ = ev - compiler(self.asInstanceOf[Stream[F, String]], () => new StringBuilder)((b, c) => { - c.foreach { s => - b.append(s); () - } - b - }, _.result) + self.asInstanceOf[Stream[F, String]].compile.to(Collector.string) } /** - * Compiles this stream into a value of the target effect type `F` by logging - * the output values to a `C`, given a `Factory`. + * Compiles this stream into a value of the target effect type `F` by collecting + * all of the output values in a collection. + * + * Collection building is done via an explicitly passed `Collector`. + * Standard library collections have collector instances, allowing syntax like: + * `s.compile.to(List)` or `s.compile.to(Array)` or `s.compile.to(Map)`. + * + * A collector is provided for `scodec.bits.ByteVector`, providing efficient byte + * vector construction from a stream of bytes: `s.compile.to(ByteVector)`. * * When this method has returned, the stream has not begun execution -- this method simply * compiles the stream down to the target effect type. * * @example {{{ * scala> import cats.effect.IO - * scala> Stream.range(0,100).take(5).covary[IO].compile.toList.unsafeRunSync + * scala> val s = Stream.range(0,100).take(5).covary[IO] + * scala> s.compile.to(List).unsafeRunSync * res0: List[Int] = List(0, 1, 2, 3, 4) + * scala> s.compile.to(Chunk).unsafeRunSync + * res1: Chunk[Int] = Chunk(0, 1, 2, 3, 4) + * scala> s.map(i => (i % 2, i)).compile.to(Map).unsafeRunSync + * res2: Map[Int, Int] = Map(0 -> 4, 1 -> 3) + * scala> s.map(_.toByte).compile.to(scodec.bits.ByteVector).unsafeRunSync + * res3: scodec.bits.ByteVector = ByteVector(5 bytes, 0x0001020304) * }}} */ - def to[C[_]](implicit f: Factory[O, C[O]]): G[C[O]] = - compiler(self, () => f.newBuilder)(_ ++= _.iterator, _.result) + def to(collector: Collector[O]): G[collector.Out] = to_(collector) + + @inline private def to_(collector: Collector[O]): G[collector.Out] = + compiler(self, () => collector.newBuilder)((acc, c) => { acc += c; acc }, _.result) + + /** + * Compiles this stream into a value of the target effect type `F` by logging + * the output values to a `C`, given a `Factory`. + * + * When this method has returned, the stream has not begun execution -- this method simply + * compiles the stream down to the target effect type. + */ + def to[C[_]](implicit f: Factory[O, C[O]]): G[C[O]] = to_(f) /** * Compiles this stream in to a value of the target effect type `F` by logging @@ -4438,15 +4469,9 @@ object Stream extends StreamLowPriority { * * When this method has returned, the stream has not begun execution -- this method simply * compiles the stream down to the target effect type. - * - * @example {{{ - * scala> import cats.effect.IO - * scala> Stream.range(0,100).take(5).covary[IO].compile.toChunk.unsafeRunSync - * res0: Chunk[Int] = Chunk(0, 1, 2, 3, 4) - * }}} */ - def toChunk: G[Chunk[O]] = - compiler(self, () => List.newBuilder[Chunk[O]])(_ += _, bldr => Chunk.concat(bldr.result)) + @deprecated("2.0.2", "Use .compile.to(Chunk) instead") + def toChunk: G[Chunk[O]] = to_(Chunk) /** * Compiles this stream in to a value of the target effect type `F` by logging @@ -4461,8 +4486,7 @@ object Stream extends StreamLowPriority { * res0: List[Int] = List(0, 1, 2, 3, 4) * }}} */ - def toList: G[List[O]] = - to[List] + def toList: G[List[O]] = to_(List) /** * Compiles this stream in to a value of the target effect type `F` by logging @@ -4477,8 +4501,7 @@ object Stream extends StreamLowPriority { * res0: Vector[Int] = Vector(0, 1, 2, 3, 4) * }}} */ - def toVector: G[Vector[O]] = - to[Vector] + def toVector: G[Vector[O]] = to_(Vector) /** * Compiles this stream in to a value of the target effect type `F` by logging @@ -4486,19 +4509,11 @@ object Stream extends StreamLowPriority { * * When this method has returned, the stream has not begun execution -- this method simply * compiles the stream down to the target effect type. - * - * @example {{{ - * scala> import cats.effect.IO - * scala> Stream.range(0,100).map(i => i -> i).take(5).covary[IO].compile.toMap.unsafeRunSync.mkString(", ") - * res0: String = 0 -> 0, 1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4 - * }}} */ + @deprecated("2.0.2", "Use .compile.to(Map) instead") def toMap[K, V](implicit ev: O <:< (K, V)): G[Map[K, V]] = { val _ = ev - compiler(self.asInstanceOf[Stream[F, (K, V)]], () => Map.newBuilder[K, V])( - _ ++= _.iterator, - _.result - ) + self.asInstanceOf[Stream[F, (K, V)]].compile.to_(Map) } } diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index e5de7d31af..013f48b3a1 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -105,7 +105,7 @@ private[fs2] object Algebra { ) } - case Result.Interrupted(ctx, err) => sys.error(s"Impossible context: $ctx") + case Result.Interrupted(ctx, _) => sys.error(s"Impossible context: $ctx") } } @@ -405,16 +405,16 @@ private[fs2] object Algebra { view.step match { case output: Output[F, X] => Output[G, X](output.values).transformWith { - case r @ Result.Pure(v) if isMainLevel => + case r @ Result.Pure(_) if isMainLevel => translateStep(view.next(r), isMainLevel) - case r @ Result.Pure(v) if !isMainLevel => + case r @ Result.Pure(_) if !isMainLevel => // Cast is safe here, as at this point the evaluation of this Step will end // and the remainder of the free will be passed as a result in Bind. As such // next Step will have this to evaluate, and will try to translate again. view.next(r).asInstanceOf[FreeC[G, X, Unit]] - case r @ Result.Fail(err) => translateStep(view.next(r), isMainLevel) + case r @ Result.Fail(_) => translateStep(view.next(r), isMainLevel) case r @ Result.Interrupted(_, _) => translateStep(view.next(r), isMainLevel) } @@ -448,7 +448,7 @@ private[fs2] object Algebra { Acquire[G, r](fK(a.resource), (r, ec) => fK(a.release(r, ec))) .asInstanceOf[AlgEffect[G, R]] case e: Eval[F, R] => Eval[G, R](fK(e.value)) - case o: OpenScope[F] => OpenScope[G](concurrent).asInstanceOf[AlgEffect[G, R]] + case _: OpenScope[F] => OpenScope[G](concurrent).asInstanceOf[AlgEffect[G, R]] case c: CloseScope[F] => c.asInstanceOf[AlgEffect[G, R]] case g: GetScope[F] => g.asInstanceOf[AlgEffect[G, R]] } diff --git a/core/shared/src/main/scala/fs2/internal/FreeC.scala b/core/shared/src/main/scala/fs2/internal/FreeC.scala index dc1eaa6d36..490234eb0a 100644 --- a/core/shared/src/main/scala/fs2/internal/FreeC.scala +++ b/core/shared/src/main/scala/fs2/internal/FreeC.scala @@ -37,7 +37,7 @@ private[fs2] abstract class FreeC[F[_], +O, +R] { def append[O2 >: O, R2](post: => FreeC[F, O2, R2]): FreeC[F, O2, R2] = new Bind[F, O2, R, R2](this) { def cont(r: Result[R]): FreeC[F, O2, R2] = r match { - case r: Result.Pure[F, _] => post + case _: Result.Pure[F, _] => post case r: Result.Interrupted[F, _] => r case r: Result.Fail[F] => r } diff --git a/core/shared/src/main/scala/fs2/text.scala b/core/shared/src/main/scala/fs2/text.scala index 0c1e71fa07..c9bbf69530 100644 --- a/core/shared/src/main/scala/fs2/text.scala +++ b/core/shared/src/main/scala/fs2/text.scala @@ -137,7 +137,7 @@ object text { start = i + 2 i += 1 } - case c => + case _ => () } i += 1 diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index 91e97e0593..a250337cd2 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -177,7 +177,7 @@ class StreamSpec extends Fs2Spec { } val bracketsInSequence = if (isJVM) 1000000 else 10000 - bracketsInSequence + " brackets in sequence" in { + s"$bracketsInSequence brackets in sequence" in { Counter[IO].flatMap { counter => Stream .range(0, bracketsInSequence) @@ -445,7 +445,7 @@ class StreamSpec extends Fs2Spec { "chunk" in { forAll { (c: Chunk[Int]) => - Stream.chunk(c).toChunk shouldBe c + Stream.chunk(c).to(Chunk) shouldBe c } } From 8572720967690d6500d190afd0ef96191530e432 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Sat, 12 Oct 2019 16:45:51 -0400 Subject: [PATCH 2/5] Fixed binary compat --- core/shared/src/main/scala/fs2/Stream.scala | 10 +-- .../src/test/scala/fs2/ChunkQueueSpec.scala | 4 +- .../shared/src/test/scala/fs2/ChunkSpec.scala | 2 +- core/shared/src/test/scala/fs2/Fs2Spec.scala | 2 +- .../scala/fs2/StreamPerformanceSpec.scala | 2 +- .../src/test/scala/fs2/StreamSpec.scala | 72 +++++++++---------- .../test/scala/fs2/concurrent/TopicSpec.scala | 2 +- .../scala/scalacheck/GeneratorCompat.scala | 2 +- 8 files changed, 48 insertions(+), 48 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index acc14b8b82..e80f25f6c1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -905,7 +905,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] * scala> import scala.concurrent.duration._, cats.effect.{ContextShift, IO, Timer} * scala> implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global) * scala> implicit val timer: Timer[IO] = IO.timer(scala.concurrent.ExecutionContext.Implicits.global) - * scala> val s1 = Stream.awakeEvery[IO](1000.millis).scan(0)((acc, i) => acc + 1) + * scala> val s1 = Stream.awakeEvery[IO](1000.millis).scan(0)((acc, _) => acc + 1) * scala> val s = s1.either(Stream.sleep_[IO](500.millis) ++ s1).take(10) * scala> s.take(10).compile.toVector.unsafeRunSync * res0: Vector[Either[Int,Int]] = Vector(Left(0), Right(0), Left(1), Right(1), Left(2), Right(2), Left(3), Right(3), Left(4), Right(4)) @@ -1415,7 +1415,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] * If `this` terminates with `Stream.raiseError(e)`, invoke `h(e)`. * * @example {{{ - * scala> Stream(1, 2, 3).append(Stream.raiseError[cats.effect.IO](new RuntimeException)).handleErrorWith(t => Stream(0)).compile.toList.unsafeRunSync() + * scala> Stream(1, 2, 3).append(Stream.raiseError[cats.effect.IO](new RuntimeException)).handleErrorWith(_ => Stream(0)).compile.toList.unsafeRunSync() * res0: List[Int] = List(1, 2, 3, 0) * }}} */ @@ -1800,7 +1800,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] * scala> import scala.concurrent.duration._, cats.effect.{ContextShift, IO, Timer} * scala> implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global) * scala> implicit val timer: Timer[IO] = IO.timer(scala.concurrent.ExecutionContext.Implicits.global) - * scala> val s1 = Stream.awakeEvery[IO](500.millis).scan(0)((acc, i) => acc + 1) + * scala> val s1 = Stream.awakeEvery[IO](500.millis).scan(0)((acc, _) => acc + 1) * scala> val s = s1.merge(Stream.sleep_[IO](250.millis) ++ s1) * scala> s.take(6).compile.toVector.unsafeRunSync * res0: Vector[Int] = Vector(0, 0, 1, 1, 2, 2) @@ -2334,7 +2334,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] * Preserves chunkiness. * * @example {{{ - * scala> Stream(Right(1), Right(2), Left(new RuntimeException), Right(3)).rethrow[cats.effect.IO, Int].handleErrorWith(t => Stream(-1)).compile.toList.unsafeRunSync + * scala> Stream(Right(1), Right(2), Left(new RuntimeException), Right(3)).rethrow[cats.effect.IO, Int].handleErrorWith(_ => Stream(-1)).compile.toList.unsafeRunSync * res0: List[Int] = List(-1) * }}} */ @@ -2644,7 +2644,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] * Applies the given sink to this stream. */ @deprecated("Use .through instead", "1.0.2") - private[Stream] def to[F2[x] >: F[x]](f: Stream[F, O] => Stream[F2, Unit]): Stream[F2, Unit] = + private[fs2] def to[F2[x] >: F[x]](f: Stream[F, O] => Stream[F2, Unit]): Stream[F2, Unit] = f(this) /** diff --git a/core/shared/src/test/scala/fs2/ChunkQueueSpec.scala b/core/shared/src/test/scala/fs2/ChunkQueueSpec.scala index 9fd583bb9d..deff2eda8a 100644 --- a/core/shared/src/test/scala/fs2/ChunkQueueSpec.scala +++ b/core/shared/src/test/scala/fs2/ChunkQueueSpec.scala @@ -37,7 +37,7 @@ class ChunkQueueSpec extends Fs2Spec { } "equals" in { - forAll { (chunks: List[Chunk[Int]], n: Int) => + forAll { (chunks: List[Chunk[Int]]) => val cq = Chunk.Queue(chunks: _*) cq shouldBe cq cq shouldBe Chunk.Queue(chunks: _*) @@ -47,7 +47,7 @@ class ChunkQueueSpec extends Fs2Spec { } "hashCode" in { - forAll { (chunks: List[Chunk[Int]], n: Int) => + forAll { (chunks: List[Chunk[Int]]) => val cq = Chunk.Queue(chunks: _*) cq.hashCode shouldBe cq.hashCode cq.hashCode shouldBe Chunk.Queue(chunks: _*).hashCode diff --git a/core/shared/src/test/scala/fs2/ChunkSpec.scala b/core/shared/src/test/scala/fs2/ChunkSpec.scala index 25facbd5ab..43059bb6bd 100644 --- a/core/shared/src/test/scala/fs2/ChunkSpec.scala +++ b/core/shared/src/test/scala/fs2/ChunkSpec.scala @@ -52,7 +52,7 @@ class ChunkSpec extends Fs2Spec { val c = Chunk.seq(as) try c shouldBe a[Chunk.Boxed[_]] // 2.11/2.12 catch { - case NonFatal(t) => c shouldBe a[Chunk.Ints] // 2.13+ + case NonFatal(_) => c shouldBe a[Chunk.Ints] // 2.13+ } } } diff --git a/core/shared/src/test/scala/fs2/Fs2Spec.scala b/core/shared/src/test/scala/fs2/Fs2Spec.scala index 0925c1ae9b..5c692cba4a 100644 --- a/core/shared/src/test/scala/fs2/Fs2Spec.scala +++ b/core/shared/src/test/scala/fs2/Fs2Spec.scala @@ -94,7 +94,7 @@ abstract class Fs2Spec */ def assertThrows[E <: Throwable](implicit F: Sync[F], ct: reflect.ClassTag[E]): F[Assertion] = self.attempt.flatMap { - case Left(t: E) => F.pure(Succeeded: Assertion) + case Left(_: E) => F.pure(Succeeded: Assertion) case Left(t) => F.delay( fail( diff --git a/core/shared/src/test/scala/fs2/StreamPerformanceSpec.scala b/core/shared/src/test/scala/fs2/StreamPerformanceSpec.scala index 99f80e23ee..98b8127ccb 100644 --- a/core/shared/src/test/scala/fs2/StreamPerformanceSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamPerformanceSpec.scala @@ -46,7 +46,7 @@ class StreamPerformanceSpec extends Fs2Spec { pending (1 until N) .map(Stream.emit) - .foldLeft(Stream.emit(0))((acc, a) => acc.map(_ + 1)) + .foldLeft(Stream.emit(0))((acc, _) => acc.map(_ + 1)) .toVector shouldBe Vector(N - 1) } } diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index a250337cd2..c4c6ce8286 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -47,7 +47,7 @@ class StreamSpec extends Fs2Spec { "liveness" in { val s = Stream .awakeEvery[IO](1.milli) - .evalMap { i => + .evalMap { _ => IO.async[Unit](cb => executionContext.execute(() => cb(Right(())))) } .take(200) @@ -79,7 +79,7 @@ class StreamSpec extends Fs2Spec { .flatMap(_ => use) .compile .drain - .handleErrorWith { case t: Err => Sync[F].pure(()) } + .handleErrorWith { case _: Err => Sync[F].pure(()) } _ <- events.get.asserting { _ shouldBe Vector(Acquired, Released) } } yield () @@ -99,7 +99,7 @@ class StreamSpec extends Fs2Spec { .append(recordBracketEvents(events).flatMap(_ => use2)) .compile .drain - .handleErrorWith { case t: Err => Sync[F].pure(()) } + .handleErrorWith { case _: Err => Sync[F].pure(()) } _ <- events.get.asserting { _ shouldBe Vector(Acquired, Released, Acquired, Released) } } yield () @@ -208,7 +208,7 @@ class StreamSpec extends Fs2Spec { var o: Vector[Int] = Vector.empty (0 until 10) .foldLeft(Stream.eval(IO(0))) { (acc, i) => - Stream.bracket(IO(i))(i => IO { o = o :+ i }).flatMap(i => acc) + Stream.bracket(IO(i))(i => IO { o = o :+ i }).flatMap(_ => acc) } .compile .drain @@ -221,7 +221,7 @@ class StreamSpec extends Fs2Spec { var o: Vector[Int] = Vector.empty (0 until 10) .foldLeft(Stream.emit(1).map(_ => throw new Err).covaryAll[IO, Int]) { (acc, i) => - Stream.emit(i) ++ Stream.bracket(IO(i))(i => IO { o = o :+ i }).flatMap(i => acc) + Stream.emit(i) ++ Stream.bracket(IO(i))(i => IO { o = o :+ i }).flatMap(_ => acc) } .attempt .compile @@ -445,7 +445,7 @@ class StreamSpec extends Fs2Spec { "chunk" in { forAll { (c: Chunk[Int]) => - Stream.chunk(c).to(Chunk) shouldBe c + Stream.chunk(c).compile.to(Chunk) shouldBe c } } @@ -753,7 +753,7 @@ class StreamSpec extends Fs2Spec { .delayBy[IO](25.millis) .append(s) .concurrently(Stream.raiseError[IO](new Err)) - .evalTap(i => gate.get) + .evalTap(_ => gate.get) } .compile .drain @@ -779,7 +779,7 @@ class StreamSpec extends Fs2Spec { Stream .bracket(IO.unit)(_ => finRef.update(_ :+ "Outer")) - .flatMap { b => + .flatMap { _ => s.covary[IO].concurrently(runner) } .interruptWhen(halt.get.attempt) @@ -970,7 +970,7 @@ class StreamSpec extends Fs2Spec { case (sem, sig) => val tested = s .covary[IO] - .evalFilterAsync(n) { elem => + .evalFilterAsync(n) { _ => val ensureAcquired = sem.tryAcquire.ifM( IO.unit, @@ -1050,7 +1050,7 @@ class StreamSpec extends Fs2Spec { case (sem, sig) => val tested = s .covary[IO] - .evalFilterNotAsync(n) { elem => + .evalFilterNotAsync(n) { _ => val ensureAcquired = sem.tryAcquire.ifM( IO.unit, @@ -1118,13 +1118,13 @@ class StreamSpec extends Fs2Spec { .through(durationSinceLastTrue) (IO.shift >> durationsSinceSpike.compile.toVector).unsafeToFuture().map { result => - val (head :: tail) = result.toList - withClue("every always emits true first") { assert(head._1) } - withClue("true means the delay has passed: " + tail) { - assert(tail.filter(_._1).map(_._2).forall { _ >= delay }) + val list = result.toList + withClue("every always emits true first") { assert(list.head._1) } + withClue(s"true means the delay has passed: ${list.tail}") { + assert(list.tail.filter(_._1).map(_._2).forall { _ >= delay }) } - withClue("false means the delay has not passed: " + tail) { - assert(tail.filterNot(_._1).map(_._2).forall { _ <= delay }) + withClue(s"false means the delay has not passed: ${list.tail}") { + assert(list.tail.filterNot(_._1).map(_._2).forall { _ <= delay }) } } } @@ -1463,7 +1463,7 @@ class StreamSpec extends Fs2Spec { .range(0, 10) .covary[SyncIO] .append(Stream.raiseError[SyncIO](new Err)) - .handleErrorWith { t => + .handleErrorWith { _ => i += 1; Stream.empty } .compile @@ -1492,7 +1492,7 @@ class StreamSpec extends Fs2Spec { (Stream .range(0, 3) .covary[SyncIO] ++ Stream.raiseError[SyncIO](new Err)).unchunk.pull.echo - .handleErrorWith { t => + .handleErrorWith { _ => i += 1; Pull.done } .stream @@ -1821,7 +1821,7 @@ class StreamSpec extends Fs2Spec { s.covary[IO] .append(Stream(1)) .interruptWhen(interrupt) - .map(i => None) + .map(_ => None) .append(s.map(Some(_))) .flatMap { case None => Stream.eval(IO.never) @@ -1864,8 +1864,8 @@ class StreamSpec extends Fs2Spec { .pull .uncons .flatMap { - case None => Pull.done - case Some((hd, tl)) => Pull.eval(IO.never) + case None => Pull.done + case Some((_, _)) => Pull.eval(IO.never) } .stream .interruptScope @@ -2106,7 +2106,7 @@ class StreamSpec extends Fs2Spec { val prg0 = bracketed - .flatMap { b => + .flatMap { _ => (Stream.bracket(register("L"))(_ => finalizer("L")) >> s) .merge( Stream.bracket(register("R"))(_ => finalizer("R")) >> @@ -2148,7 +2148,7 @@ class StreamSpec extends Fs2Spec { "hangs" - { val full = if (isJVM) Stream.constant(42) else Stream.constant(42).evalTap(_ => IO.shift) - val hang = Stream.repeatEval(IO.async[Unit](cb => ())) + val hang = Stream.repeatEval(IO.async[Unit](_ => ())) val hang2: Stream[IO, Nothing] = full.drain val hang3: Stream[IO, Nothing] = Stream @@ -2246,7 +2246,7 @@ class StreamSpec extends Fs2Spec { "handle multiple consecutive observations" in { forAll { (s: Stream[Pure, Int]) => val expected = s.toList - val sink: Pipe[IO, Int, Unit] = _.evalMap(i => IO.unit) + val sink: Pipe[IO, Int, Unit] = _.evalMap(_ => IO.unit) observer(observer(s.covary[IO])(sink))(sink).compile.toList .asserting(_ shouldBe expected) } @@ -2463,7 +2463,7 @@ class StreamSpec extends Fs2Spec { "hangs" - { val full = if (isJVM) Stream.constant(42) else Stream.constant(42).evalTap(_ => IO.shift) - val hang = Stream.repeatEval(IO.async[Unit](cb => ())) + val hang = Stream.repeatEval(IO.async[Unit](_ => ())) val hang2: Stream[IO, Nothing] = full.drain val hang3: Stream[IO, Nothing] = Stream @@ -2506,7 +2506,7 @@ class StreamSpec extends Fs2Spec { } } - "pause" in forAll { (s1: Stream[Pure, Int]) => + "pause" in { Stream .eval(SignallingRef[IO, Boolean](false)) .flatMap { pause => @@ -2686,7 +2686,7 @@ class StreamSpec extends Fs2Spec { Stream(1, 2, 3, 4, 5).repartition(i => Chunk(i, i)).toList shouldBe List(1, 3, 6, 10, 15, 15) Stream(1, 10, 100) - .repartition(i => Chunk.seq(1 to 1000)) + .repartition(_ => Chunk.seq(1 to 1000)) .take(4) .toList shouldBe List(1, 2, 3, 4) } @@ -2767,7 +2767,7 @@ class StreamSpec extends Fs2Spec { val n = n0 % 10 + 1 Counter[IO].flatMap { outer => Counter[IO].flatMap { inner => - val s2 = Stream.bracket(outer.increment)(_ => outer.decrement) >> s.map { i => + val s2 = Stream.bracket(outer.increment)(_ => outer.decrement) >> s.map { _ => spuriousFail(Stream.bracket(inner.increment)(_ => inner.decrement) >> s) } val one = s2.parJoin(n).take(10).attempt @@ -2811,7 +2811,7 @@ class StreamSpec extends Fs2Spec { val sleepAndSet = IO.sleep(20.millis) >> signal.set(true) Stream .eval_(sleepAndSet.start) - .append(s.map { inner => + .append(s.map { _ => Stream .bracket(counter.increment)(_ => counter.decrement) .evalMap(_ => IO.never) @@ -2899,7 +2899,7 @@ class StreamSpec extends Fs2Spec { case Left(RetryErr(msg)) => failures shouldBe 5 msg shouldBe "5" - case other => fail("Expected a RetryErr") + case _ => fail("Expected a RetryErr") } } } @@ -2922,7 +2922,7 @@ class StreamSpec extends Fs2Spec { failures shouldBe 6 successes shouldBe 0 msg shouldBe "fatal" - case other => fail("Expected a RetryErr") + case _ => fail("Expected a RetryErr") } } } @@ -2950,7 +2950,7 @@ class StreamSpec extends Fs2Spec { Stream.retry(job, unit.millis, _ + unit.millis, maxTries).compile.drain.attempt.asserting { case Left(RetryErr(_)) => getDelays shouldBe List.range(1, maxTries) - case other => fail("Expected a RetryErr") + case _ => fail("Expected a RetryErr") } } } @@ -3008,7 +3008,7 @@ class StreamSpec extends Fs2Spec { Stream .eval(Ref.of[IO, Int](0)) .flatMap { ref => - Stream(1).flatMap { i => + Stream(1).flatMap { _ => Stream .bracketWeak(ref.update(_ + 1))(_ => ref.update(_ - 1)) .flatMap(_ => Stream.eval(ref.get)) ++ Stream.eval(ref.get) @@ -3102,7 +3102,7 @@ class StreamSpec extends Fs2Spec { Stream .eval(Ref[IO].of(true)) .flatMap { ref => - s.covary[IO].switchMap { i => + s.covary[IO].switchMap { _ => Stream.eval(ref.get).flatMap { released => if (!released) Stream.raiseError[IO](new Err) else @@ -3587,14 +3587,14 @@ class StreamSpec extends Fs2Spec { Stream("uno", "dos", "tres", "cuatro") .zipWithScan(0)(_ + _.length) .toList shouldBe List("uno" -> 0, "dos" -> 3, "tres" -> 6, "cuatro" -> 10) - Stream().zipWithScan(())((acc, i) => ???).toList shouldBe Nil + Stream().zipWithScan(())((_, _) => ???).toList shouldBe Nil } "zipWithScan1" in { Stream("uno", "dos", "tres", "cuatro") .zipWithScan1(0)(_ + _.length) .toList shouldBe List("uno" -> 3, "dos" -> 6, "tres" -> 10, "cuatro" -> 16) - Stream().zipWithScan1(())((acc, i) => ???).toList shouldBe Nil + Stream().zipWithScan1(())((_, _) => ???).toList shouldBe Nil } "regressions" - { diff --git a/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala b/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala index de12f0c727..57fb6e9cbd 100644 --- a/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala +++ b/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala @@ -73,7 +73,7 @@ class TopicSpec extends Fs2Spec { result.toMap.size shouldBe subs result.foreach { - case (idx, subResults) => + case (_, subResults) => val diff: Set[Int] = subResults.map { case (read, state) => Math.abs(state - read) }.toSet diff --git a/core/shared/src/test/scala/scalacheck/GeneratorCompat.scala b/core/shared/src/test/scala/scalacheck/GeneratorCompat.scala index f746167033..cbb683e513 100644 --- a/core/shared/src/test/scala/scalacheck/GeneratorCompat.scala +++ b/core/shared/src/test/scala/scalacheck/GeneratorCompat.scala @@ -5,7 +5,7 @@ import org.scalacheck.rng.Seed object GeneratorCompat { def genFromGenerator[A](g: Generator[A]): Gen[A] = - Gen.gen[A] { (p, seed) => + Gen.gen[A] { (_, seed) => val (n, _, rndNext) = g.next(SizeParam(0, 10, 10), Nil, Randomizer(seed.long._1)) Gen.r[A](Some(n), Seed(rndNext.nextLong._1)) } From 232aa200c3882baaf3a6462d455ddfe4dbda051b Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Sat, 12 Oct 2019 17:15:25 -0400 Subject: [PATCH 3/5] Fixed binary compat on 2.11/2.12 --- core/shared/src/main/scala/fs2/Stream.scala | 37 ++++++++++++++++----- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index e80f25f6c1..656e357dc6 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3729,11 +3729,8 @@ object Stream extends StreamLowPriority { /** Lifts this stream to the specified effect type. */ def covary[F[_]]: Stream[F, O] = self - /** Runs this pure stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on pure streams. */ - def to(c: Collector[O]): c.Out = to_(c) - @inline private def to_(c: Collector[O]): c.Out = - self.covary[IO].compile.to(c).unsafeRunSync + self.covary[SyncIO].compile.to(c).unsafeRunSync /** Runs this pure stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on pure streams. */ def to[C[_]](implicit f: Factory[O, C[O]]): C[O] = to_(f) @@ -3749,6 +3746,19 @@ object Stream extends StreamLowPriority { def toVector: Vector[O] = to_(Vector) } + /** Provides `to` syntax for pure streams. */ + implicit def PureTo[O](s: Stream[Pure, O]): PureTo[O] = + new PureTo(s.get[Pure, O]) + + /** Provides `to` syntax for pure streams. */ + final class PureTo[O] private[Stream] (private val free: FreeC[Pure, O, Unit]) extends AnyVal { + private def self: Stream[Pure, O] = Stream.fromFreeC[Pure, O](free) + + /** Runs this pure stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on pure streams. */ + def to(c: Collector[O]): c.Out = + self.covary[SyncIO].compile.to(c).unsafeRunSync + } + /** Provides syntax for streams with effect type `cats.Id`. */ implicit def IdOps[O](s: Stream[Id, O]): IdOps[O] = new IdOps(s.get[Id, O]) @@ -3778,11 +3788,8 @@ object Stream extends StreamLowPriority { self.asInstanceOf[Stream[F, O]] } - /** Runs this fallible stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on fallible streams. */ - def to(c: Collector[O]): Either[Throwable, c.Out] = to_(c) - @inline private def to_(c: Collector[O]): Either[Throwable, c.Out] = - lift[IO].compile.to(c).attempt.unsafeRunSync + lift[SyncIO].compile.to(c).attempt.unsafeRunSync /** Runs this fallible stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on fallible streams. */ def to[C[_]](implicit f: Factory[O, C[O]]): Either[Throwable, C[O]] = to_(f) @@ -3798,6 +3805,20 @@ object Stream extends StreamLowPriority { def toVector: Either[Throwable, Vector[O]] = to_(Vector) } + /** Provides `to` syntax for streams with effect type `Fallible`. */ + implicit def FallibleTo[O](s: Stream[Fallible, O]): FallibleTo[O] = + new FallibleTo(s.get[Fallible, O]) + + /** Provides `to` syntax for fallible streams. */ + final class FallibleTo[O] private[Stream] (private val free: FreeC[Fallible, O, Unit]) + extends AnyVal { + private def self: Stream[Fallible, O] = Stream.fromFreeC[Fallible, O](free) + + /** Runs this fallible stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on fallible streams. */ + def to(c: Collector[O]): Either[Throwable, c.Out] = + self.lift[SyncIO].compile.to(c).attempt.unsafeRunSync + } + /** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */ final class ToPull[F[_], O] private[Stream] ( private val free: FreeC[Nothing, O, Unit] From 2e79368242aafbccb8c65298d70ef0144517afe4 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 14 Oct 2019 08:58:26 -0400 Subject: [PATCH 4/5] Added CollectorK to avoid need for an implicit conversion --- core/shared/src/main/scala/fs2/Chunk.scala | 9 +++++- .../shared/src/main/scala/fs2/Collector.scala | 30 +++++++++---------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Chunk.scala b/core/shared/src/main/scala/fs2/Chunk.scala index ba9ab60338..c827548593 100644 --- a/core/shared/src/main/scala/fs2/Chunk.scala +++ b/core/shared/src/main/scala/fs2/Chunk.scala @@ -478,7 +478,7 @@ abstract class Chunk[+O] extends Serializable { self => iterator.mkString("Chunk(", ", ", ")") } -object Chunk { +object Chunk extends CollectorK[Chunk] { /** Optional mix-in that provides the class tag of the element type in a chunk. */ trait KnownElementType[A] { self: Chunk[A] => @@ -1722,4 +1722,11 @@ object Chunk { def empty[A]: Queue[A] = empty_.asInstanceOf[Queue[A]] def apply[A](chunks: Chunk[A]*): Queue[A] = chunks.foldLeft(empty[A])(_ :+ _) } + + def newBuilder[A]: Collector.Builder[A, Chunk[A]] = + new Collector.Builder[A, Chunk[A]] { + private[this] var queue = Chunk.Queue.empty[A] + def +=(c: Chunk[A]): Unit = queue = queue :+ c + def result: Chunk[A] = queue.toChunk + } } diff --git a/core/shared/src/main/scala/fs2/Collector.scala b/core/shared/src/main/scala/fs2/Collector.scala index df2770d742..6d3cbdc5ce 100644 --- a/core/shared/src/main/scala/fs2/Collector.scala +++ b/core/shared/src/main/scala/fs2/Collector.scala @@ -16,7 +16,7 @@ import scodec.bits.ByteVector * The output type is a type member instead of a type parameter to avoid overloading * resolution limitations with `s.compile.to[C]` vs `s.compile.to(C)`. */ -trait Collector[A] { +trait Collector[-A] { type Out def newBuilder: Collector.Builder[A, Out] } @@ -36,11 +36,6 @@ object Collector extends CollectorPlatform { }) } - implicit def supportsChunk[A](c: Chunk.type): Collector.Aux[A, Chunk[A]] = { - val _ = c - make(Builder.chunk) - } - implicit def supportsByteVector(b: ByteVector.type): Collector.Aux[Byte, ByteVector] = { val _ = b make(Builder.byteVector) @@ -53,7 +48,7 @@ object Collector extends CollectorPlatform { } /** Builds a value of type `X` from zero or more `Chunk[A]`. */ - trait Builder[A, X] { self => + trait Builder[-A, +X] { self => def +=(c: Chunk[A]): Unit def result: X @@ -67,15 +62,8 @@ object Collector extends CollectorPlatform { def byteArray: Builder[Byte, Array[Byte]] = byteVector.mapResult(_.toArray) - def chunk[A]: Builder[A, Chunk[A]] = - new Builder[A, Chunk[A]] { - private[this] var queue = Chunk.Queue.empty[A] - def +=(c: Chunk[A]): Unit = queue = queue :+ c - def result: Chunk[A] = queue.toChunk - } - def array[A: ClassTag]: Builder[A, Array[A]] = - chunk.mapResult(_.toArray) + Chunk.newBuilder.mapResult(_.toArray) protected def fromBuilder[A, C[_], B]( builder: collection.mutable.Builder[A, C[B]] @@ -100,3 +88,15 @@ object Collector extends CollectorPlatform { } } } + +/** Mixin trait for companions of collections that can build a `C[A]` for all `A`. */ +trait CollectorK[+C[_]] { + def newBuilder[A]: Collector.Builder[A, C[A]] +} + +object CollectorK { + implicit def toCollector[A, C[_]](c: CollectorK[C]): Collector.Aux[A, C[A]] = new Collector[A] { + type Out = C[A] + def newBuilder: Collector.Builder[A, C[A]] = c.newBuilder[A] + } +} From 6c4973e9ce51afb30f8e50f69117569b5c2dec03 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 14 Oct 2019 09:42:26 -0400 Subject: [PATCH 5/5] Adjusted build to default to 2.13, removed compilation warnings --- .travis.yml | 2 +- .../scala/fs2/benchmark/FreeCBenchmark.scala | 2 +- build.sbt | 29 +++++++------------ .../test/scala/fs2/MemorySanityChecks.scala | 2 +- .../scala/fs2/internal/CompileScope.scala | 2 +- .../scala/fs2/io/JavaInputOutputStream.scala | 8 ++--- .../main/scala/fs2/io/tcp/SocketGroup.scala | 4 ++- .../fs2/io/udp/AsynchronousSocketGroup.scala | 18 +++++------- io/src/test/scala/fs2/io/file/FileSpec.scala | 2 +- .../test/scala/fs2/io/file/WatcherSpec.scala | 10 +++---- io/src/test/scala/fs2/io/tcp/SocketSpec.scala | 2 +- .../reactivestreams/StreamSubscriber.scala | 6 ++-- .../StreamUnicastPublisherSpec.scala | 4 +-- 13 files changed, 40 insertions(+), 51 deletions(-) diff --git a/.travis.yml b/.travis.yml index a7e9c4027f..68b47f745a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,8 +2,8 @@ language: scala scala: - 2.11.12 - - 2.13.0 - 2.12.9 + - 2.13.0 jdk: - openjdk11 diff --git a/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala index 22e551dad5..d4ee1b8044 100644 --- a/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala +++ b/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala @@ -38,7 +38,7 @@ class FreeCBenchmark { case Result.Pure(r) => F.pure(Some(r)) case Result.Fail(e) => F.raiseError(e) case Result.Interrupted(_, err) => err.fold[F[Option[R]]](F.pure(None)) { F.raiseError } - case v @ ViewL.View(_) => F.raiseError(new RuntimeException("Never get here)")) + case _ @ViewL.View(_) => F.raiseError(new RuntimeException("Never get here)")) } } diff --git a/build.sbt b/build.sbt index 9dc0f4180e..dd5558e624 100644 --- a/build.sbt +++ b/build.sbt @@ -26,26 +26,17 @@ lazy val commonSettings = Seq( "-language:implicitConversions", "-language:higherKinds" ) ++ - (if (scalaBinaryVersion.value.startsWith("2.13")) - List( - "-Xlint", - "-Ywarn-unused" - ) - else Nil) ++ - (if (scalaBinaryVersion.value.startsWith("2.12")) - List( - "-Xfatal-warnings", - "-Yno-adapted-args", - "-Ywarn-value-discard", - "-Ywarn-unused-import", - "-Ypartial-unification" - ) - else Nil) ++ (if (scalaBinaryVersion.value.startsWith("2.11")) - List("-Xexperimental", "-Ypartial-unification") - else - Nil), // 2.11 needs -Xexperimental to enable SAM conversion + (scalaBinaryVersion.value match { + case v if v.startsWith("2.13") => + List("-Xlint", "-Ywarn-unused") + case v if v.startsWith("2.12") => + List("-Ypartial-unification") + case v if v.startsWith("2.11") => + List("-Xexperimental", "-Ypartial-unification") + case other => sys.error(s"Unsupported scala version: $other") + }), scalacOptions in (Compile, console) ~= { - _.filterNot("-Ywarn-unused-import" == _) + _.filterNot("-Ywarn-unused" == _) .filterNot("-Xlint" == _) .filterNot("-Xfatal-warnings" == _) }, diff --git a/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala b/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala index afed7202e5..d5c7f7a001 100644 --- a/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala +++ b/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala @@ -45,7 +45,7 @@ object TopicContinuousPublishSanityTest extends App { } object ResourceTrackerSanityTest extends App { - val big = Stream.constant(1).flatMap { n => + val big = Stream.constant(1).flatMap { _ => Stream.bracket(IO(()))(_ => IO(())).flatMap(_ => Stream.emits(List(1, 2, 3))) } big.compile.drain.unsafeRunSync() diff --git a/core/shared/src/main/scala/fs2/internal/CompileScope.scala b/core/shared/src/main/scala/fs2/internal/CompileScope.scala index 41eda7119a..cd5137084b 100644 --- a/core/shared/src/main/scala/fs2/internal/CompileScope.scala +++ b/core/shared/src/main/scala/fs2/internal/CompileScope.scala @@ -475,7 +475,7 @@ private[fs2] object CompileScope { newScopeId: Token )(implicit F: Sync[F]): F[InterruptContext[F]] = interruptible - .map { concurent => + .map { concurrent => F.flatMap(concurrent.start(self.deferred.get)) { fiber => val context = InterruptContext[F]( concurrent = concurrent, diff --git a/io/src/main/scala/fs2/io/JavaInputOutputStream.scala b/io/src/main/scala/fs2/io/JavaInputOutputStream.scala index 15c44550be..1bd3d45a2f 100644 --- a/io/src/main/scala/fs2/io/JavaInputOutputStream.scala +++ b/io/src/main/scala/fs2/io/JavaInputOutputStream.scala @@ -71,9 +71,9 @@ private[io] object JavaInputOutputStream { // won't modify state if the data cannot be acquired def tryGetChunk(s: DownStreamState): (DownStreamState, Option[Bytes]) = s match { - case Done(None) => s -> None - case Done(Some(err)) => s -> None - case Ready(None) => s -> None + case Done(None) => s -> None + case Done(Some(_)) => s -> None + case Ready(None) => s -> None case Ready(Some(bytes)) => val cloned = Chunk.Bytes(bytes.toArray) if (bytes.size <= len) Ready(None) -> Some(cloned) @@ -141,7 +141,7 @@ private[io] object JavaInputOutputStream { ): F[Unit] = dnState.update { case s @ Done(_) => s - case other => Done(None) + case _ => Done(None) } >> upState.discrete .collectFirst { diff --git a/io/src/main/scala/fs2/io/tcp/SocketGroup.scala b/io/src/main/scala/fs2/io/tcp/SocketGroup.scala index 9cfb91b443..2aed8b0c39 100644 --- a/io/src/main/scala/fs2/io/tcp/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/tcp/SocketGroup.scala @@ -135,6 +135,8 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, blocker: Blocker CS: ContextShift[F] ): Stream[F, Either[InetSocketAddress, Resource[F, Socket[F]]]] = { + val _ = maxQueued // TODO: maxQueued param has never been used; remove in 3.0 + val setup: F[AsynchronousServerSocketChannel] = blocker.delay { val ch = AsynchronousChannelProvider .provider() @@ -167,7 +169,7 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, blocker: Blocker } Stream.eval(acceptChannel.attempt).flatMap { - case Left(err) => Stream.empty[F] + case Left(_) => Stream.empty[F] case Right(accepted) => Stream.emit(apply(accepted)) } ++ go } diff --git a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala index fbc6d2d0fe..3737f42b46 100644 --- a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala +++ b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala @@ -197,7 +197,7 @@ private[udp] object AsynchronousSocketGroup { pendingTimeouts += t } } else { - if (!read1(key, channel, attachment, cb)) { + if (!read1(channel, cb)) { cancelReader = attachment.queueReader(cb, t) t.foreach { t => pendingTimeouts += t @@ -205,16 +205,14 @@ private[udp] object AsynchronousSocketGroup { try { key.interestOps(key.interestOps | SelectionKey.OP_READ); () } catch { - case t: CancelledKeyException => /* Ignore; key was closed */ + case _: CancelledKeyException => /* Ignore; key was closed */ } } } } { cb(Left(new ClosedChannelException)) } private def read1( - key: SelectionKey, channel: DatagramChannel, - attachment: Attachment, reader: Either[Throwable, Packet] => Unit ): Boolean = try { @@ -269,7 +267,7 @@ private[udp] object AsynchronousSocketGroup { pendingTimeouts += t } } else { - if (!write1(key, channel, attachment, writerPacket, cb)) { + if (!write1(channel, writerPacket, cb)) { cancelWriter = attachment.queueWriter((writerPacket, cb), t) t.foreach { t => pendingTimeouts += t @@ -277,7 +275,7 @@ private[udp] object AsynchronousSocketGroup { try { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); () } catch { - case t: CancelledKeyException => /* Ignore; key was closed */ + case _: CancelledKeyException => /* Ignore; key was closed */ } } } @@ -285,9 +283,7 @@ private[udp] object AsynchronousSocketGroup { } private def write1( - key: SelectionKey, channel: DatagramChannel, - attachment: Attachment, packet: WriterPacket, cb: Option[Throwable] => Unit ): Boolean = @@ -358,7 +354,7 @@ private[udp] object AsynchronousSocketGroup { var success = true while (success && attachment.hasReaders) { val reader = attachment.peekReader.get - success = read1(key, channel, attachment, reader) + success = read1(channel, reader) if (success) attachment.dequeueReader } } @@ -366,7 +362,7 @@ private[udp] object AsynchronousSocketGroup { var success = true while (success && attachment.hasWriters) { val (p, writer) = attachment.peekWriter.get - success = write1(key, channel, attachment, p, writer) + success = write1(channel, p, writer) if (success) attachment.dequeueWriter } } @@ -376,7 +372,7 @@ private[udp] object AsynchronousSocketGroup { ) } } catch { - case t: CancelledKeyException => // Ignore; key was closed + case _: CancelledKeyException => // Ignore; key was closed } } val now = System.currentTimeMillis diff --git a/io/src/test/scala/fs2/io/file/FileSpec.scala b/io/src/test/scala/fs2/io/file/FileSpec.scala index a0f5d75e53..eeb5a4442e 100644 --- a/io/src/test/scala/fs2/io/file/FileSpec.scala +++ b/io/src/test/scala/fs2/io/file/FileSpec.scala @@ -172,7 +172,7 @@ class FileSpec extends BaseFileSpec { blocker <- Stream.resource(Blocker[IO]) filePath <- tempFile tempDir <- tempDirectory - result <- Stream.eval(file.move[IO](blocker, filePath, tempDir.resolve("newfile"))) + _ <- Stream.eval(file.move[IO](blocker, filePath, tempDir.resolve("newfile"))) exists <- Stream.eval(file.exists[IO](blocker, filePath)) } yield exists).compile.fold(false)(_ || _).unsafeRunSync() shouldBe false } diff --git a/io/src/test/scala/fs2/io/file/WatcherSpec.scala b/io/src/test/scala/fs2/io/file/WatcherSpec.scala index 921b70712e..084c2a9375 100644 --- a/io/src/test/scala/fs2/io/file/WatcherSpec.scala +++ b/io/src/test/scala/fs2/io/file/WatcherSpec.scala @@ -18,7 +18,7 @@ class WatcherSpec extends BaseFileSpec { file .watch[IO](bec, f, modifiers = modifiers) .takeWhile({ - case Watcher.Event.Modified(f, _) => false; case _ => true + case Watcher.Event.Modified(_, _) => false; case _ => true }, true) .concurrently(smallDelay ++ modify(f)) } @@ -34,7 +34,7 @@ class WatcherSpec extends BaseFileSpec { file .watch[IO](bec, f, modifiers = modifiers) .takeWhile({ - case Watcher.Event.Deleted(f, _) => false; case _ => true + case Watcher.Event.Deleted(_, _) => false; case _ => true }, true) .concurrently(smallDelay ++ Stream.eval(IO(Files.delete(f)))) } @@ -56,7 +56,7 @@ class WatcherSpec extends BaseFileSpec { file .watch[IO](bec, dir, modifiers = modifiers) .takeWhile({ - case Watcher.Event.Modified(b, _) => false; case _ => true + case Watcher.Event.Modified(_, _) => false; case _ => true }) .concurrently(smallDelay ++ modify(b)) } @@ -74,7 +74,7 @@ class WatcherSpec extends BaseFileSpec { file .watch[IO](bec, dir, modifiers = modifiers) .takeWhile({ - case Watcher.Event.Created(b, _) => false; case _ => true + case Watcher.Event.Created(_, _) => false; case _ => true }) .concurrently( smallDelay ++ Stream @@ -98,7 +98,7 @@ class WatcherSpec extends BaseFileSpec { val c = Class.forName("com.sun.nio.file.SensitivityWatchEventModifier") Seq(c.getField("HIGH").get(c).asInstanceOf[WatchEvent.Modifier]) } catch { - case t: Throwable => Nil + case _: Throwable => Nil } } } diff --git a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala index 101c509ac2..4085d54b18 100644 --- a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala +++ b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala @@ -44,7 +44,7 @@ class SocketSpec extends Fs2Spec { val clients: SocketGroup => Stream[IO, Array[Byte]] = socketGroup => Stream .range(0, clientCount) - .map { idx => + .map { _ => Stream.eval(localBindAddress.get).flatMap { local => Stream.resource(socketGroup.client[IO](local)).flatMap { socket => Stream diff --git a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala index d863270584..4737c65f14 100644 --- a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala +++ b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala @@ -110,12 +110,12 @@ object StreamSubscriber { case o => o -> F.raiseError(new Error(s"received record [$a] in invalid state [$o]")) } case OnComplete => { - case WaitingOnUpstream(sub, r) => UpstreamCompletion -> r.complete(None.asRight) - case o => UpstreamCompletion -> F.unit + case WaitingOnUpstream(_, r) => UpstreamCompletion -> r.complete(None.asRight) + case _ => UpstreamCompletion -> F.unit } case OnError(e) => { case WaitingOnUpstream(_, r) => UpstreamError(e) -> r.complete(e.asLeft) - case o => UpstreamError(e) -> F.unit + case _ => UpstreamError(e) -> F.unit } case OnFinalize => { case WaitingOnUpstream(sub, r) => diff --git a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala index 1e88b66cce..0c4c2fc0be 100644 --- a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala +++ b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala @@ -7,14 +7,14 @@ import org.reactivestreams._ import org.reactivestreams.tck.{PublisherVerification, TestEnvironment} import org.scalatestplus.testng._ -final class FailedSubscription(sub: Subscriber[_]) extends Subscription { +final class FailedSubscription extends Subscription { def cancel(): Unit = {} def request(n: Long): Unit = {} } final class FailedPublisher extends Publisher[Int] { def subscribe(subscriber: Subscriber[_ >: Int]): Unit = { - subscriber.onSubscribe(new FailedSubscription(subscriber)) + subscriber.onSubscribe(new FailedSubscription) subscriber.onError(new Error("BOOM")) } }