diff --git a/build.sbt b/build.sbt index 4000775c..5161f495 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ val scala213 = "2.13.7" val scala3 = "3.1.0" val zioVersion = "1.0.13" -val zio2Version = "2.0.0-M4" +val zio2Version = "2.0.0-RC1" val scalacOptions212 = Seq("-Ypartial-unification", "-deprecation", "-target:jvm-1.8") val scalacOptions213 = Seq("-deprecation", "-target:jvm-1.8") @@ -124,7 +124,7 @@ lazy val proxZStream2 = Project("prox-zstream-2", file("prox-zstream-2")).settin libraryDependencies ++= Seq( "dev.zio" %% "zio" % zio2Version, "dev.zio" %% "zio-streams" % zio2Version, - "dev.zio" %% "zio-prelude" % "1.0.0-RC5+45-aa9e4694-SNAPSHOT", + "dev.zio" %% "zio-prelude" % "1.0.0-RC9", "dev.zio" %% "zio-test" % zio2Version % "test", "dev.zio" %% "zio-test-sbt" % zio2Version % "test", @@ -138,7 +138,7 @@ lazy val proxJava9 = Project("prox-java9", file("prox-java9")).settings(commonSe lazy val docs = project .enablePlugins(GhpagesPlugin, SiteScaladocPlugin, ScalaUnidocPlugin, MicrositesPlugin) .settings( - addCompilerPlugin("org.typelevel" %% s"kind-projector" % "0.13.0" cross CrossVersion.full), + addCompilerPlugin("org.typelevel" %% s"kind-projector" % "0.13.2" cross CrossVersion.full), publishArtifact := false, skip in publish := true, scalaVersion := scala213, diff --git a/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala b/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala index 3dde7f25..45f2fd22 100644 --- a/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala +++ b/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala @@ -4,7 +4,7 @@ import java.io import java.io.IOException import zio.prelude.Identity -import zio.stream.{ZSink, ZStream, ZTransducer} +import zio.stream.{ZSink, ZStream, ZPipeline} import zio._ import scala.language.implicitConversions @@ -17,8 +17,8 @@ trait ProxZStream extends Prox { transform(s).run(sink) } object TransformAndSink { - def apply[A, B](transducer: ZTransducer[Any, ProxError, A, B], sink: ZSink[Any, ProxError, B, Any, Unit]): TransformAndSink[A, B] = - TransformAndSink(_.transduce(transducer), sink) + def apply[A, B](transducer: ZPipeline[Any, ProxError, A, B], sink: ZSink[Any, ProxError, B, Any, Unit]): TransformAndSink[A, B] = + TransformAndSink(_.via(transducer), sink) } override type ProxExitCode = zio.ExitCode @@ -93,10 +93,10 @@ trait ProxZStream extends Prox { s.runCollect.map(_.toVector) protected override final def foldStream[A, B](s: ProxStream[A], init: B, f: (B, A) => B): ProxIO[B] = - s.fold(init)(f) + s.runFold(init)(f) protected override final def foldMonoidStream[A: Identity](s: ProxStream[A]): ProxIO[A] = - s.fold(Identity[A].identity)((a, b) => Identity[A].combine(a, b)) + s.runFold(Identity[A].identity)((a, b) => Identity[A].combine(a, b)) protected override final def streamThrough[A, B](s: ProxStream[A], pipe: ProxPipe[A, B]): ProxStream[B] = pipe(s) @@ -138,8 +138,8 @@ trait ProxZStream extends Prox { protected override final def startFiber[A](f: ProxIO[A]): ProxIO[ProxFiber[A]] = f.fork - implicit def transducerAsPipe[A, B](transducer: ZTransducer[Any, ProxError, A, B]): ProxPipe[A, B] = - (s: ProxStream[A]) => s.transduce(transducer) + implicit def transducerAsPipe[A, B](transducer: ZPipeline[Any, ProxError, A, B]): ProxPipe[A, B] = + (s: ProxStream[A]) => s.via(transducer) implicit def sinkAsTransformAndSink[A](sink: ZSink[Any, ProxError, A, Any, Unit]): TransformAndSink[A, A] = TransformAndSink(identity[ZStream[Any, ProxError, A]] _, sink) diff --git a/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessGroupSpecs.scala b/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessGroupSpecs.scala index 17193198..7fca0cea 100644 --- a/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessGroupSpecs.scala +++ b/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessGroupSpecs.scala @@ -3,7 +3,7 @@ package io.github.vigoo.prox.tests.zstream import io.github.vigoo.prox.zstream._ import io.github.vigoo.prox.{UnknownProxError, zstream} import zio.{Clock, _} -import zio.stream.{ZSink, ZStream, ZTransducer} +import zio.stream.{ZSink, ZStream, ZPipeline} import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ @@ -19,7 +19,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { suite("Piping")( test("is possible with two") { - val processGroup = (Process("echo", List("This is a test string")) | Process("wc", List("-w"))) ># ZTransducer.utf8Decode + val processGroup = (Process("echo", List("This is a test string")) | Process("wc", List("-w"))) ># ZPipeline.utf8Decode val program = processGroup.run().map(_.output.trim) assertM(program)(equalTo("5")) @@ -32,7 +32,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { Process("sort") | Process("uniq", List("-c")) | Process("head", List("-n 1")) - ) >? (ZTransducer.utf8Decode >>> ZTransducer.splitLines) + ) >? (ZPipeline.utf8Decode >>> ZPipeline.splitLines) val program = processGroup.run().map( r => r.output.map(_.stripLineEnd.trim).filter(_.nonEmpty) @@ -43,12 +43,12 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("is customizable with pipes") { val customPipe = (s: zstream.ProxStream[Byte]) => s - .transduce(ZTransducer.utf8Decode >>> ZTransducer.splitLines) + .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines) .map(_.split(' ').toVector) .map(v => v.map(_ + " !!!").mkString(" ")) .intersperse("\n") .flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val processGroup = Process("echo", List("This is a test string")).via(customPipe).to(Process("wc", List("-w"))) ># ZTransducer.utf8Decode + val processGroup = Process("echo", List("This is a test string")).via(customPipe).to(Process("wc", List("-w"))) ># ZPipeline.utf8Decode val program = processGroup.run().map(_.output.trim) assertM(program)(equalTo("10")) @@ -57,7 +57,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("can be mapped") { import zstream.Process._ - val processGroup1 = (Process("!echo", List("This is a test string")) | Process("!wc", List("-w"))) ># ZTransducer.utf8Decode + val processGroup1 = (Process("!echo", List("This is a test string")) | Process("!wc", List("-w"))) ># ZPipeline.utf8Decode val processGroup2 = processGroup1.map(new ProcessGroup.Mapper[String, Unit] { override def mapFirst[P <: Process[zstream.ProxStream[Byte], Unit]](process: P): P = process.withCommand(process.command.tail).asInstanceOf[P] @@ -122,7 +122,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("can be fed with an input stream") { val stream = ZStream("This is a test string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val processGroup = (Process("cat") | Process("wc", List("-w"))) < stream ># ZTransducer.utf8Decode + val processGroup = (Process("cat") | Process("wc", List("-w"))) < stream ># ZPipeline.utf8Decode val program = processGroup.run().map(_.output.trim) assertM(program)(equalTo("5")) @@ -133,7 +133,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { withTempFile { tempFile => val program = for { _ <- ZIO.attempt(Files.write(tempFile.toPath, "This is a test string".getBytes("UTF-8"))).mapError(UnknownProxError.apply) - processGroup = (Process("cat") | Process("wc", List("-w"))) < tempFile.toPath ># ZTransducer.utf8Decode + processGroup = (Process("cat") | Process("wc", List("-w"))) < tempFile.toPath ># ZPipeline.utf8Decode result <- processGroup.run() } yield result.output.trim @@ -148,7 +148,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val processGroup = (Process("echo", List("This is a test string")) | Process("wc", List("-w"))) > tempFile.toPath val program = for { _ <- processGroup.run() - contents <- ZStream.fromFile(tempFile.toPath, 1024).transduce(ZTransducer.utf8Decode).fold("")(_ + _).mapError(UnknownProxError.apply) + contents <- ZStream.fromFile(tempFile, 1024).via(ZPipeline.utf8Decode).runFold("")(_ + _).mapError(UnknownProxError.apply) } yield contents.trim assertM(program)(equalTo("5")) @@ -161,7 +161,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """print STDERR "Hello"""")) val p2 = Process("perl", List("-e", """print STDERR "world"""")) - val processGroup = (p1 | p2) !># ZTransducer.utf8Decode + val processGroup = (p1 | p2) !># ZPipeline.utf8Decode val program = processGroup.run() program.map { result => @@ -199,7 +199,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """print STDERR "Hello"""")) val p2 = Process("perl", List("-e", """print STDERR "world!"""")) - val stream = (ZTransducer.utf8Decode >>> ZTransducer.splitLines).map(_.length) + val stream = ZPipeline.utf8Decode >>> ZPipeline.splitLines >>> ZPipeline.map(_.length) val processGroup = (p1 | p2) !>? stream val program = processGroup.run() @@ -217,7 +217,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """print STDERR "Hello"""")) val p2 = Process("perl", List("-e", """print STDERR "world"""")) - val processGroup = (p1 | p2) drainErrors ZTransducer.utf8Decode + val processGroup = (p1 | p2) drainErrors ZPipeline.utf8Decode val program = processGroup.run() program.map { result => @@ -234,7 +234,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", "print STDERR 'Hello\nworld'")) val p2 = Process("perl", List("-e", "print STDERR 'Does\nit\nwork?'")) val processGroup = (p1 | p2).foldErrors( - ZTransducer.utf8Decode >>> ZTransducer.splitLines, + ZPipeline.utf8Decode >>> ZPipeline.splitLines, Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption ) @@ -255,8 +255,8 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """print STDERR "Hello"""")) val p2 = Process("perl", List("-e", """print STDERR "world"""")) val processGroup = (p1 | p2).customizedPerProcess.errorsToFoldMonoid { - case p if p == p1 => ZTransducer.utf8Decode.map(s => "P1: " + s) - case p if p == p2 => ZTransducer.utf8Decode.map(s => "P2: " + s) + case p if p == p1 => ZPipeline.utf8Decode >>> ZPipeline.map(s => "P1: " + s) + case p if p == p2 => ZPipeline.utf8Decode >>> ZPipeline.map(s => "P2: " + s) } val program = processGroup.run() @@ -299,11 +299,11 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """print STDERR "Hello"""")) val p2 = Process("perl", List("-e", """print STDERR "world!"""")) - val stream = (ZTransducer.utf8Decode >>> ZTransducer.splitLines).map(_.length) + val stream = ZPipeline.utf8Decode >>> ZPipeline.splitLines >>> ZPipeline.map(_.length) val processGroup = (p1 | p2).customizedPerProcess.errorsToVector { - case p if p == p1 => stream.map(l => (1, l)) - case p if p == p2 => stream.map(l => (2, l)) + case p if p == p1 => stream >>> ZPipeline.map(l => (1, l)) + case p if p == p2 => stream >>> ZPipeline.map(l => (2, l)) } val program = processGroup.run() @@ -320,7 +320,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """print STDERR "Hello"""")) val p2 = Process("perl", List("-e", """print STDERR "world"""")) - val processGroup = (p1 | p2).customizedPerProcess.drainErrors(_ => ZTransducer.utf8Decode) + val processGroup = (p1 | p2).customizedPerProcess.drainErrors(_ => ZPipeline.utf8Decode) val program = processGroup.run() program.map { result => @@ -338,8 +338,8 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p2 = Process("perl", List("-e", "print STDERR 'Does\nit\nwork?'")) val processGroup = (p1 | p2).customizedPerProcess.foldErrors( { - case p if p == p1 => ZTransducer.utf8Decode >>> ZTransducer.splitLines - case p if p == p2 => (ZTransducer.utf8Decode >>> ZTransducer.splitLines).map(_.reverse) + case p if p == p1 => ZPipeline.utf8Decode >>> ZPipeline.splitLines + case p if p == p2 => ZPipeline.utf8Decode >>> ZPipeline.splitLines >>> ZPipeline.map(_.reverse) }, Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption @@ -367,8 +367,8 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { } val program = for { _ <- processGroup.run() - contents1 <- ZStream.fromFile(tempFile1.toPath, 1024).transduce(ZTransducer.utf8Decode).fold("")(_ + _).mapError(UnknownProxError.apply) - contents2 <- ZStream.fromFile(tempFile2.toPath, 1024).transduce(ZTransducer.utf8Decode).fold("")(_ + _).mapError(UnknownProxError.apply) + contents1 <- ZStream.fromFile(tempFile1, 1024).via(ZPipeline.utf8Decode).runFold("")(_ + _).mapError(UnknownProxError.apply) + contents2 <- ZStream.fromFile(tempFile2, 1024).via(ZPipeline.utf8Decode).runFold("")(_ + _).mapError(UnknownProxError.apply) } yield (contents1, contents2) assertM(program)(equalTo(("Hello", "world"))) @@ -384,7 +384,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """my $str=<>; print STDERR Hello; print STDOUT "$str"""")) val p2 = Process("sort") val p3 = Process("wc", List("-w")) - val processGroup = (p1 | p2 | p3) < stream ># ZTransducer.utf8Decode !># ZTransducer.utf8Decode + val processGroup = (p1 | p2 | p3) < stream ># ZPipeline.utf8Decode !># ZPipeline.utf8Decode processGroup.run() .map { result => @@ -401,7 +401,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """my $str=<>; print STDERR Hello; print STDOUT "$str"""")) val p2 = Process("sort") val p3 = Process("wc", List("-w")) - val processGroup = ((p1 | p2 | p3) < stream !># ZTransducer.utf8Decode) ># ZTransducer.utf8Decode + val processGroup = ((p1 | p2 | p3) < stream !># ZPipeline.utf8Decode) ># ZPipeline.utf8Decode processGroup.run() .map { result => @@ -418,7 +418,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """my $str=<>; print STDERR Hello; print STDOUT "$str"""")) val p2 = Process("sort") val p3 = Process("wc", List("-w")) - val processGroup = ((p1 | p2 | p3) !># ZTransducer.utf8Decode) ># ZTransducer.utf8Decode < stream + val processGroup = ((p1 | p2 | p3) !># ZPipeline.utf8Decode) ># ZPipeline.utf8Decode < stream processGroup.run() .map { result => @@ -435,7 +435,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """my $str=<>; print STDERR Hello; print STDOUT "$str"""")) val p2 = Process("sort") val p3 = Process("wc", List("-w")) - val processGroup = ((p1 | p2 | p3) !># ZTransducer.utf8Decode) < stream ># ZTransducer.utf8Decode + val processGroup = ((p1 | p2 | p3) !># ZPipeline.utf8Decode) < stream ># ZPipeline.utf8Decode processGroup.run() .map { result => @@ -452,7 +452,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """my $str=<>; print STDERR Hello; print STDOUT "$str"""")) val p2 = Process("sort") val p3 = Process("wc", List("-w")) - val processGroup = ((p1 | p2 | p3) ># ZTransducer.utf8Decode) < stream !># ZTransducer.utf8Decode + val processGroup = ((p1 | p2 | p3) ># ZPipeline.utf8Decode) < stream !># ZPipeline.utf8Decode processGroup.run() .map { result => @@ -469,7 +469,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val p1 = Process("perl", List("-e", """my $str=<>; print STDERR Hello; print STDOUT "$str"""")) val p2 = Process("sort") val p3 = Process("wc", List("-w")) - val processGroup = (((p1 | p2 | p3) ># ZTransducer.utf8Decode) !># ZTransducer.utf8Decode) < stream + val processGroup = (((p1 | p2 | p3) ># ZPipeline.utf8Decode) !># ZPipeline.utf8Decode) < stream processGroup.run() .map { result => @@ -483,7 +483,7 @@ object ProcessGroupSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("bound process is not pipeable") { assertM( - typeCheck("""val bad = (Process("echo", List("Hello world")) ># ZTransducer.utf8Decode) | Process("wc", List("-w"))"""))( + typeCheck("""val bad = (Process("echo", List("Hello world")) ># ZPipeline.utf8Decode) | Process("wc", List("-w"))"""))( isLeft(anything) ) } diff --git a/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala b/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala index a6250a1e..eebab17d 100644 --- a/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala +++ b/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala @@ -6,11 +6,10 @@ import java.nio.file.Files import io.github.vigoo.prox.{ProxError, UnknownProxError, zstream} import io.github.vigoo.prox.zstream._ import zio._ -import zio.stream.{ZSink, ZStream, ZTransducer} +import zio.stream.{ZSink, ZStream, ZPipeline} import zio.test.Assertion.{anything, equalTo, hasSameElements, isLeft} import zio.test.TestAspect._ import zio.test._ -import zio.test.environment.Live import zio.{ExitCode, ZIO} object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { @@ -33,7 +32,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val process = Process("echo", List("Hello world!")) > tempFile.toPath val program = for { _ <- process.run() - contents <- ZStream.fromFile(tempFile.toPath, 1024).transduce(ZTransducer.utf8Decode).fold("")(_ + _).mapError(UnknownProxError.apply) + contents <- ZStream.fromFile(tempFile, 1024).via(ZPipeline.utf8Decode).runFold("")(_ + _).mapError(UnknownProxError.apply) } yield contents assertM(program)(equalTo("Hello world!\n")) @@ -47,7 +46,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val program = for { _ <- process1.run() _ <- process2.run() - contents <- ZStream.fromFile(tempFile.toPath, 1024).transduce(ZTransducer.utf8Decode).fold("")(_ + _).mapError(UnknownProxError.apply) + contents <- ZStream.fromFile(tempFile, 1024).via(ZPipeline.utf8Decode).runFold("")(_ + _).mapError(UnknownProxError.apply) } yield contents assertM(program)(equalTo("Hello\nworld\n")) @@ -55,14 +54,14 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { }, test("can redirect output to stream") { - val process = Process("echo", List("Hello world!")) ># ZTransducer.utf8Decode + val process = Process("echo", List("Hello world!")) ># ZPipeline.utf8Decode val program = process.run().map(_.output) assertM(program)(equalTo("Hello world!\n")) }, test("can redirect output to stream folding monoid") { - val process = Process("echo", List("Hello\nworld!")) ># (ZTransducer.utf8Decode >>> ZTransducer.splitLines) + val process = Process("echo", List("Hello\nworld!")) ># (ZPipeline.utf8Decode >>> ZPipeline.splitLines) val program = process.run().map(_.output) assertM(program)(equalTo("Helloworld!")) @@ -72,8 +71,9 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { case class StringLength(value: Int) val stream = - ZTransducer.utf8Decode >>> - ZTransducer.splitLines.map(s => StringLength(s.length)) + ZPipeline.utf8Decode >>> + ZPipeline.splitLines >>> + ZPipeline.map(s => StringLength(s.length)) val process = Process("echo", List("Hello\nworld!")) >? stream val program = process.run().map(_.output) @@ -82,7 +82,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { }, test("can redirect output to stream and ignore it's result") { - val process = Process("echo", List("Hello\nworld!")).drainOutput(ZTransducer.utf8Decode >>> ZTransducer.splitLines) + val process = Process("echo", List("Hello\nworld!")).drainOutput(ZPipeline.utf8Decode >>> ZPipeline.splitLines) val program = process.run().map(_.output) assertM(program)(equalTo(())) @@ -90,7 +90,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("can redirect output to stream and fold it") { val process = Process("echo", List("Hello\nworld!")).foldOutput( - ZTransducer.utf8Decode >>> ZTransducer.splitLines, + ZPipeline.utf8Decode >>> ZPipeline.splitLines, Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption ) @@ -115,7 +115,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val process = Process("perl", List("-e", "print STDERR 'Hello world!'")) !> tempFile.toPath val program = for { _ <- process.run() - contents <- ZStream.fromFile(tempFile.toPath, 1024).transduce(ZTransducer.utf8Decode).fold("")(_ + _).mapError(UnknownProxError.apply) + contents <- ZStream.fromFile(tempFile, 1024).via(ZPipeline.utf8Decode).runFold("")(_ + _).mapError(UnknownProxError.apply) } yield contents assertM(program)(equalTo("Hello world!")) @@ -129,7 +129,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val program = for { _ <- process1.run() _ <- process2.run() - contents <- ZStream.fromFile(tempFile.toPath, 1024).transduce(ZTransducer.utf8Decode).fold("")(_ + _).mapError(UnknownProxError.apply) + contents <- ZStream.fromFile(tempFile, 1024).via(ZPipeline.utf8Decode).runFold("")(_ + _).mapError(UnknownProxError.apply) } yield contents assertM(program)(equalTo("Helloworld")) @@ -137,14 +137,14 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { }, test("can redirect error to stream") { - val process = Process("perl", List("-e", """print STDERR "Hello"""")) !># ZTransducer.utf8Decode + val process = Process("perl", List("-e", """print STDERR "Hello"""")) !># ZPipeline.utf8Decode val program = process.run().map(_.error) assertM(program)(equalTo("Hello")) }, test("can redirect error to stream folding monoid") { - val process = Process("perl", List("-e", "print STDERR 'Hello\nworld!'")) !># (ZTransducer.utf8Decode >>> ZTransducer.splitLines) + val process = Process("perl", List("-e", "print STDERR 'Hello\nworld!'")) !># (ZPipeline.utf8Decode >>> ZPipeline.splitLines) val program = process.run().map(_.error) assertM(program)(equalTo("Helloworld!")) @@ -154,8 +154,9 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { case class StringLength(value: Int) val stream = - ZTransducer.utf8Decode >>> - ZTransducer.splitLines.map(s => StringLength(s.length)) + ZPipeline.utf8Decode >>> + ZPipeline.splitLines >>> + ZPipeline.map(s => StringLength(s.length)) val process = Process("perl", List("-e", "print STDERR 'Hello\nworld!'")) !>? stream val program = process.run().map(_.error) @@ -164,7 +165,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { }, test("can redirect error to stream and ignore it's result") { - val process = Process("perl", List("-e", "print STDERR 'Hello\nworld!'")).drainError(ZTransducer.utf8Decode >>> ZTransducer.splitLines) + val process = Process("perl", List("-e", "print STDERR 'Hello\nworld!'")).drainError(ZPipeline.utf8Decode >>> ZPipeline.splitLines) val program = process.run().map(_.error) assertM(program)(equalTo(())) @@ -172,7 +173,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("can redirect error to stream and fold it") { val process = Process("perl", List("-e", "print STDERR 'Hello\nworld!'")).foldError( - ZTransducer.utf8Decode >>> ZTransducer.splitLines, + ZPipeline.utf8Decode >>> ZPipeline.splitLines, Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption ) @@ -195,21 +196,21 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { suite("Redirection ordering")( test("can redirect first input and then error to stream") { val source = ZStream("This is a test string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val process = Process("perl", List("-e", """my $str = <>; print STDERR "$str"""".stripMargin)) < source !># ZTransducer.utf8Decode + val process = Process("perl", List("-e", """my $str = <>; print STDERR "$str"""".stripMargin)) < source !># ZPipeline.utf8Decode val program = process.run().map(_.error) assertM(program)(equalTo("This is a test string")) }, test("can redirect error first then output to stream") { - val process = (Process("perl", List("-e", """print STDOUT Hello; print STDERR World""".stripMargin)) !># ZTransducer.utf8Decode) ># ZTransducer.utf8Decode + val process = (Process("perl", List("-e", """print STDOUT Hello; print STDERR World""".stripMargin)) !># ZPipeline.utf8Decode) ># ZPipeline.utf8Decode val program = process.run().map(r => r.output + r.error) assertM(program)(equalTo("HelloWorld")) }, test("can redirect output first then error to stream") { - val process = (Process("perl", List("-e", """print STDOUT Hello; print STDERR World""".stripMargin)) ># ZTransducer.utf8Decode) !># ZTransducer.utf8Decode + val process = (Process("perl", List("-e", """print STDOUT Hello; print STDERR World""".stripMargin)) ># ZPipeline.utf8Decode) !># ZPipeline.utf8Decode val program = process.run().map(r => r.output + r.error) assertM(program)(equalTo("HelloWorld")) @@ -218,8 +219,8 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("can redirect output first then error finally input to stream") { val source = ZStream("Hello").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) val process = ((Process("perl", List("-e", """my $str = <>; print STDOUT "$str"; print STDERR World""".stripMargin)) - ># ZTransducer.utf8Decode) - !># ZTransducer.utf8Decode) < source + ># ZPipeline.utf8Decode) + !># ZPipeline.utf8Decode) < source val program = process.run().map(r => r.output + r.error) assertM(program)(equalTo("HelloWorld")) @@ -228,8 +229,8 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("can redirect output first then input finally error to stream") { val source = ZStream("Hello").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) val process = ((Process("perl", List("-e", """my $str = <>; print STDOUT "$str"; print STDERR World""".stripMargin)) - ># ZTransducer.utf8Decode) - < source) !># ZTransducer.utf8Decode + ># ZPipeline.utf8Decode) + < source) !># ZPipeline.utf8Decode val program = process.run().map(r => r.output + r.error) assertM(program)(equalTo("HelloWorld")) @@ -239,7 +240,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val source = ZStream("Hello").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) val process = ((Process("perl", List("-e", """my $str = <>; print STDOUT "$str"; print STDERR World""".stripMargin)) < source) - !># ZTransducer.utf8Decode) ># ZTransducer.utf8Decode + !># ZPipeline.utf8Decode) ># ZPipeline.utf8Decode val program = process.run().map(r => r.output + r.error) assertM(program)(equalTo("HelloWorld")) @@ -249,7 +250,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { suite("Input redirection")( test("can use stream as input") { val source = ZStream("This is a test string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val process = Process("wc", List("-w")) < source ># ZTransducer.utf8Decode + val process = Process("wc", List("-w")) < source ># ZPipeline.utf8Decode val program = process.run().map(_.output.trim) assertM(program)(equalTo("5")) @@ -257,7 +258,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("can use stream as input flushing after each chunk") { val source = ZStream("This ", "is a test", " string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val process = (Process("wc", List("-w")) !< source) ># ZTransducer.utf8Decode + val process = (Process("wc", List("-w")) !< source) ># ZPipeline.utf8Decode val program = process.run().map(_.output.trim) assertM(program)(equalTo("5")) @@ -309,7 +310,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { suite("Customization")( test("can change the command") { - val p1 = Process("something", List("Hello", "world")) ># ZTransducer.utf8Decode + val p1 = Process("something", List("Hello", "world")) ># ZPipeline.utf8Decode val p2 = p1.withCommand("echo") val program = p2.run().map(_.output) @@ -317,7 +318,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { }, test("can change the arguments") { - val p1 = Process("echo") ># ZTransducer.utf8Decode + val p1 = Process("echo") ># ZPipeline.utf8Decode val p2 = p1.withArguments(List("Hello", "world")) val program = p2.run().map(_.output) @@ -326,7 +327,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("respects the working directory") { ZIO.attempt(Files.createTempDirectory("prox")).flatMap { tempDirectory => - val process = (Process("pwd") in tempDirectory) ># ZTransducer.utf8Decode + val process = (Process("pwd") in tempDirectory) ># ZPipeline.utf8Decode val program = process.run().map(_.output.trim) assertM(program)(equalTo(tempDirectory.toString) || equalTo(s"/private${tempDirectory}")) @@ -336,7 +337,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("is customizable with environment variables") { val process = (Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) `with` ("TEST1" -> "world") - `with` ("TEST2" -> "prox")) ># ZTransducer.utf8Decode + `with` ("TEST2" -> "prox")) ># ZPipeline.utf8Decode val program = process.run().map(_.output) assertM(program)(equalTo("Hello world! I am prox!\n")) @@ -346,14 +347,14 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val process = (Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) `with` ("TEST1" -> "world") `with` ("TEST2" -> "prox") - `without` "TEST1") ># ZTransducer.utf8Decode + `without` "TEST1") ># ZPipeline.utf8Decode val program = process.run().map(_.output) assertM(program)(equalTo("Hello ! I am prox!\n")) }, test("is customizable with environment variables output is bound") { - val process = (Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) ># ZTransducer.utf8Decode + val process = (Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) ># ZPipeline.utf8Decode `with` ("TEST1" -> "world") `with` ("TEST2" -> "prox")) val program = process.run().map(_.output) @@ -365,16 +366,16 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { val source = ZStream("This is a test string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) val process = ((Process("sh", List("-c", "cat > /dev/null; echo \"Hello $TEST1! I am $TEST2!\"")) < source) `with` ("TEST1" -> "world") - `with` ("TEST2" -> "prox")) ># ZTransducer.utf8Decode + `with` ("TEST2" -> "prox")) ># ZPipeline.utf8Decode val program = process.run().map(_.output) assertM(program)(equalTo("Hello world! I am prox!\n")) }, test("is customizable with environment variables if error is bound") { - val process = ((Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) !># ZTransducer.utf8Decode) + val process = ((Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) !># ZPipeline.utf8Decode) `with` ("TEST1" -> "world") - `with` ("TEST2" -> "prox")) ># ZTransducer.utf8Decode + `with` ("TEST2" -> "prox")) ># ZPipeline.utf8Decode val program = process.run().map(_.output) assertM(program)(equalTo("Hello world! I am prox!\n")) @@ -382,7 +383,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("is customizable with environment variables if input and output are bound") { val source = ZStream("This is a test string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val process = (((Process("sh", List("-c", "cat > /dev/null; echo \"Hello $TEST1! I am $TEST2!\"")) < source) ># ZTransducer.utf8Decode) + val process = (((Process("sh", List("-c", "cat > /dev/null; echo \"Hello $TEST1! I am $TEST2!\"")) < source) ># ZPipeline.utf8Decode) `with` ("TEST1" -> "world") `with` ("TEST2" -> "prox")) val program = process.run().map(_.output) @@ -392,16 +393,16 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("is customizable with environment variables if input and error are bound") { val source = ZStream("This is a test string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val process = (((Process("sh", List("-c", "cat > /dev/null; echo \"Hello $TEST1! I am $TEST2!\"")) < source) !># ZTransducer.utf8Decode) + val process = (((Process("sh", List("-c", "cat > /dev/null; echo \"Hello $TEST1! I am $TEST2!\"")) < source) !># ZPipeline.utf8Decode) `with` ("TEST1" -> "world") - `with` ("TEST2" -> "prox")) ># ZTransducer.utf8Decode + `with` ("TEST2" -> "prox")) ># ZPipeline.utf8Decode val program = process.run().map(_.output) assertM(program)(equalTo("Hello world! I am prox!\n")) }, test("is customizable with environment variables if output and error are bound") { - val process = (((Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) !># ZTransducer.utf8Decode) ># ZTransducer.utf8Decode) + val process = (((Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) !># ZPipeline.utf8Decode) ># ZPipeline.utf8Decode) `with` ("TEST1" -> "world") `with` ("TEST2" -> "prox")) val program = process.run().map(_.output) @@ -411,7 +412,7 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { test("is customizable with environment variables if everything is bound") { val source = ZStream("This is a test string").flatMap(s => ZStream.fromIterable(s.getBytes(StandardCharsets.UTF_8))) - val process = ((((Process("sh", List("-c", "cat > /dev/null; echo \"Hello $TEST1! I am $TEST2!\"")) < source) !># ZTransducer.utf8Decode) ># ZTransducer.utf8Decode) + val process = ((((Process("sh", List("-c", "cat > /dev/null; echo \"Hello $TEST1! I am $TEST2!\"")) < source) !># ZPipeline.utf8Decode) ># ZPipeline.utf8Decode) `with` ("TEST1" -> "world") `with` ("TEST2" -> "prox")) val program = process.run().map(_.output)