From 866c5292a247b681dc250dd925ef05fa2b90bf50 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Sat, 4 Jan 2020 09:09:42 -0500 Subject: [PATCH 1/3] Added debug and debugChunks combinators --- core/shared/src/main/scala/fs2/Stream.scala | 63 +++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 9cece06549..6cfac632a1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -656,6 +656,69 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit] def metered[F2[x] >: F[x]: Timer](rate: FiniteDuration): Stream[F2, O] = Stream.fixedRate[F2](rate).zipRight(this) + /** + * Logs the elements of this stream as they are pulled. + * + * If a tag is specified, it prefixes each element using the format `$tag: $o`. + * By default, log output is sent to standard out. Supply a different logger param + * to change this. + * + * This method does not change the chunk structure of the stream. To debug the + * chunk structure, see [[debugChunks]]. + * + * Logging is not done in `F` because this operation is intended for debugging, + * including pure streams. + * + * @example {{{ + * scala> Stream(1, 2).append(Stream(3, 4)).debug("a").toList + * a: 1 + * a: 2 + * a: 3 + * a: 4 + * res0: List[Int] = List(1, 2, 3, 4) + * }}} + */ + def debug(tag: String = "", logger: String => Unit = println(_)): Stream[F, O] = + if (tag.isEmpty) debugFormat(_.toString, logger) else debugFormat(o => s"$tag: $o", logger) + + /** + * Like [[debug]] but allows custom formatting of each element. + */ + def debugFormat(formatter: O => String, logger: String => Unit = println(_)): Stream[F, O] = + map { o => + logger(formatter(o)) + o + } + + /** + * Like [[debug]] but logs chunks as they are pulled instead of individual elements. + * + * @example {{{ + * scala> Stream(1, 2, 3).append(Stream(4, 5, 6)).debugChunks("a").buffer(2).debugChunks("b").toList + * a: Chunk(1, 2, 3) + * b: Chunk(1, 2) + * a: Chunk(4, 5, 6) + * b: Chunk(3, 4) + * b: Chunk(5, 6) + * res0: List[Int] = List(1, 2, 3, 4, 5, 6) + * }}} + */ + def debugChunks(tag: String = "", logger: String => Unit = println(_)): Stream[F, O] = + if (tag.isEmpty) debugChunksFormat(_.toString, logger) + else debugChunksFormat(o => s"$tag: $o", logger) + + /** + * Like [[debugChunks]] but allows custom formatting of each chunk. + */ + def debugChunksFormat( + formatter: Chunk[O] => String, + logger: String => Unit = println(_) + ): Stream[F, O] = + chunks.flatMap { os => + logger(formatter(os)) + Stream.chunk(os) + } + /** * Returns a stream that when run, sleeps for duration `d` and then pulls from this stream. * From 1e8c01026c572cb7b12bb15307aacf7a9c09acb5 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Tue, 7 Jan 2020 08:08:31 -0500 Subject: [PATCH 2/3] Combined debug/debugFormat and debugChunks/debugChunksFormat --- core/shared/src/main/scala/fs2/Stream.scala | 29 ++++++--------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index ac3ad19cd6..dbd9a86a64 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -659,9 +659,9 @@ final class Stream[+F[_], +O] private (private[fs2] val free: FreeC[F, O, Unit]) /** * Logs the elements of this stream as they are pulled. * - * If a tag is specified, it prefixes each element using the format `$tag: $o`. - * By default, log output is sent to standard out. Supply a different logger param - * to change this. + * By default, `toString` is called on each element and the result is printed + * to standard out. To change formatting, supply a value for the `formatter` + * param. To change the destination, supply a value for the `logger` param. * * This method does not change the chunk structure of the stream. To debug the * chunk structure, see [[debugChunks]]. @@ -670,7 +670,7 @@ final class Stream[+F[_], +O] private (private[fs2] val free: FreeC[F, O, Unit]) * including pure streams. * * @example {{{ - * scala> Stream(1, 2).append(Stream(3, 4)).debug("a").toList + * scala> Stream(1, 2).append(Stream(3, 4)).debug(o => s"a: $o").toList * a: 1 * a: 2 * a: 3 @@ -678,13 +678,7 @@ final class Stream[+F[_], +O] private (private[fs2] val free: FreeC[F, O, Unit]) * res0: List[Int] = List(1, 2, 3, 4) * }}} */ - def debug(tag: String = "", logger: String => Unit = println(_)): Stream[F, O] = - if (tag.isEmpty) debugFormat(_.toString, logger) else debugFormat(o => s"$tag: $o", logger) - - /** - * Like [[debug]] but allows custom formatting of each element. - */ - def debugFormat(formatter: O => String, logger: String => Unit = println(_)): Stream[F, O] = + def debug(formatter: O => String = _.toString, logger: String => Unit = println(_)): Stream[F, O] = map { o => logger(formatter(o)) o @@ -694,7 +688,7 @@ final class Stream[+F[_], +O] private (private[fs2] val free: FreeC[F, O, Unit]) * Like [[debug]] but logs chunks as they are pulled instead of individual elements. * * @example {{{ - * scala> Stream(1, 2, 3).append(Stream(4, 5, 6)).debugChunks("a").buffer(2).debugChunks("b").toList + * scala> Stream(1, 2, 3).append(Stream(4, 5, 6)).debugChunks(c => s"a: $c").buffer(2).debugChunks(c => s"b: $c").toList * a: Chunk(1, 2, 3) * b: Chunk(1, 2) * a: Chunk(4, 5, 6) @@ -703,15 +697,8 @@ final class Stream[+F[_], +O] private (private[fs2] val free: FreeC[F, O, Unit]) * res0: List[Int] = List(1, 2, 3, 4, 5, 6) * }}} */ - def debugChunks(tag: String = "", logger: String => Unit = println(_)): Stream[F, O] = - if (tag.isEmpty) debugChunksFormat(_.toString, logger) - else debugChunksFormat(o => s"$tag: $o", logger) - - /** - * Like [[debugChunks]] but allows custom formatting of each chunk. - */ - def debugChunksFormat( - formatter: Chunk[O] => String, + def debugChunks( + formatter: Chunk[O] => String = _.toString, logger: String => Unit = println(_) ): Stream[F, O] = chunks.flatMap { os => From 403a348bc74d4a5b76665b1d2e3cee60f9c43347 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Tue, 7 Jan 2020 11:04:40 -0500 Subject: [PATCH 3/3] Scalafmt --- core/shared/src/main/scala/fs2/Stream.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index dbd9a86a64..1068d4f0e6 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -678,7 +678,10 @@ final class Stream[+F[_], +O] private (private[fs2] val free: FreeC[F, O, Unit]) * res0: List[Int] = List(1, 2, 3, 4) * }}} */ - def debug(formatter: O => String = _.toString, logger: String => Unit = println(_)): Stream[F, O] = + def debug( + formatter: O => String = _.toString, + logger: String => Unit = println(_) + ): Stream[F, O] = map { o => logger(formatter(o)) o