diff --git a/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala b/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala index a719b0345f..659aeb3320 100644 --- a/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala +++ b/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala @@ -46,6 +46,7 @@ private[io] object child_process { var env: js.UndefOr[js.Dictionary[String]] = js.undefined + var stdio: js.UndefOr[js.Any] = js.undefined } @js.native diff --git a/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala index dc54026941..4c27a0b840 100644 --- a/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -36,17 +36,42 @@ private[process] trait ProcessesCompanionPlatform { def spawn(process: ProcessBuilder): Resource[F, Process[F]] = Resource { F.async_[(Process[F], F[Unit])] { cb => + val spawnOptions = new facade.child_process.SpawnOptions { + cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString) + env = + if (process.inheritEnv) + (facade.process.env ++ process.extraEnv).toJSDictionary + else + process.extraEnv.toJSDictionary + } + + val stdinOpt: js.Any = process.outputConfig.stdin match { + case StreamRedirect.Inherit => "inherit" + case StreamRedirect.Discard => "ignore" + case StreamRedirect.File(path) => "pipe" + case StreamRedirect.Pipe => + } + + val stdoutOpt: js.Any = process.outputConfig.stdout match { + case StreamRedirect.Inherit => "inherit" + case StreamRedirect.Discard => "ignore" + case StreamRedirect.File(path) => "pipe" + case StreamRedirect.Pipe => + } + + val stderrOpt: js.Any = process.outputConfig.stderr match { + case StreamRedirect.Inherit => "inherit" + case StreamRedirect.Discard => "ignore" + case StreamRedirect.File(path) => "pipe" + case StreamRedirect.Pipe => + } + + spawnOptions.stdio = js.Array(stdinOpt, stdoutOpt, stderrOpt) + val childProcess = facade.child_process.spawn( process.command, process.args.toJSArray, - new facade.child_process.SpawnOptions { - cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString) - env = - if (process.inheritEnv) - (facade.process.env ++ process.extraEnv).toJSDictionary - else - process.extraEnv.toJSDictionary - } + spawnOptions ) val fs2Process = new UnsealedProcess[F] { @@ -72,9 +97,21 @@ private[process] trait ProcessesCompanionPlatform { def stdin = writeWritable(F.delay(childProcess.stdin)) - def stdout = unsafeReadReadable(childProcess.stdout) + def stdout = + if (process.outputConfig.stdout == StreamRedirect.Pipe) + unsafeReadReadable(childProcess.stdout) + else + Stream.empty + + def stderr = + if (process.outputConfig.stderr == StreamRedirect.Pipe) + unsafeReadReadable(childProcess.stderr) + else + Stream.empty + + def mergedOutput: Stream[F, Byte] = + stdout.merge(stderr) - def stderr = unsafeReadReadable(childProcess.stderr) } val finalize = F.asyncCheckAttempt[Unit] { cb => diff --git a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 0c9f5452bf..23a8789f25 100644 --- a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -28,7 +28,7 @@ import cats.effect.kernel.Resource import cats.syntax.all.* import fs2.io.CollectionCompat.* -import java.lang +import java.lang.ProcessBuilder.Redirect private[process] trait ProcessesCompanionPlatform { def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { @@ -37,7 +37,7 @@ private[process] trait ProcessesCompanionPlatform { Resource .make { F.blocking { - val builder = new lang.ProcessBuilder((process.command :: process.args).asJava) + val builder = new java.lang.ProcessBuilder((process.command :: process.args).asJava) process.workingDirectory.foreach { path => builder.directory(path.toNioPath.toFile) @@ -49,6 +49,30 @@ private[process] trait ProcessesCompanionPlatform { env.put(k, v) } + process.outputConfig.stdin match { + case StreamRedirect.Inherit => builder.redirectInput(Redirect.INHERIT) + case StreamRedirect.Discard => builder.redirectInput(Redirect.DISCARD) + case StreamRedirect.File(path) => + builder.redirectInput(Redirect.from(path.toNioPath.toFile)) + case StreamRedirect.Pipe => + } + + process.outputConfig.stdout match { + case StreamRedirect.Inherit => builder.redirectOutput(Redirect.INHERIT) + case StreamRedirect.Discard => builder.redirectOutput(Redirect.DISCARD) + case StreamRedirect.File(path) => + builder.redirectOutput(Redirect.to(path.toNioPath.toFile)) + case StreamRedirect.Pipe => + } + + process.outputConfig.stderr match { + case StreamRedirect.Inherit => builder.redirectError(Redirect.INHERIT) + case StreamRedirect.Discard => builder.redirectError(Redirect.DISCARD) + case StreamRedirect.File(path) => + builder.redirectError(Redirect.to(path.toNioPath.toFile)) + case StreamRedirect.Pipe => + } + builder.start() } } { process => @@ -89,7 +113,6 @@ private[process] trait ProcessesCompanionPlatform { F.blocking(process.destroy()), 8192 ) - } } } diff --git a/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala b/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala index 7ab5a19eeb..041982ac64 100644 --- a/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala +++ b/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala @@ -49,6 +49,9 @@ sealed abstract class ProcessBuilder private { */ def workingDirectory: Option[Path] + /** Configures how stdout and stderr should be handled. */ + def outputConfig: ProcessOutputConfig + /** @see [[command]] */ def withCommand(command: String): ProcessBuilder @@ -67,6 +70,20 @@ sealed abstract class ProcessBuilder private { /** @see [[workingDirectory]] */ def withCurrentWorkingDirectory: ProcessBuilder + /** @see [[outputMode]] */ + def withOutputConfig(outputConfig: ProcessOutputConfig): ProcessBuilder + + /* @param mode The mode for handling stdin + */ + def redirectInput(mode: StreamRedirect): ProcessBuilder = + withOutputConfig(outputConfig.copy(stdin = mode)) + + def redirectOutput(mode: StreamRedirect): ProcessBuilder = + withOutputConfig(outputConfig.copy(stdout = mode)) + + def redirectError(mode: StreamRedirect): ProcessBuilder = + withOutputConfig(outputConfig.copy(stderr = mode)) + /** Starts the process and returns a handle for interacting with it. * Closing the resource will kill the process if it has not already terminated. */ @@ -74,10 +91,24 @@ sealed abstract class ProcessBuilder private { Processes[F].spawn(this) } +sealed abstract class StreamRedirect +object StreamRedirect { + case object Pipe extends StreamRedirect + case object Inherit extends StreamRedirect + case object Discard extends StreamRedirect + final case class File(path: Path) extends StreamRedirect +} + +final case class ProcessOutputConfig( + stdin: StreamRedirect = StreamRedirect.Pipe, + stdout: StreamRedirect = StreamRedirect.Pipe, + stderr: StreamRedirect = StreamRedirect.Pipe +) + object ProcessBuilder { def apply(command: String, args: List[String]): ProcessBuilder = - ProcessBuilderImpl(command, args, true, Map.empty, None) + ProcessBuilderImpl(command, args, true, Map.empty, None, ProcessOutputConfig()) def apply(command: String, args: String*): ProcessBuilder = apply(command, args.toList) @@ -87,7 +118,8 @@ object ProcessBuilder { args: List[String], inheritEnv: Boolean, extraEnv: Map[String, String], - workingDirectory: Option[Path] + workingDirectory: Option[Path], + outputConfig: ProcessOutputConfig ) extends ProcessBuilder { def withCommand(command: String): ProcessBuilder = copy(command = command) @@ -100,7 +132,10 @@ object ProcessBuilder { def withWorkingDirectory(workingDirectory: Path): ProcessBuilder = copy(workingDirectory = Some(workingDirectory)) + def withCurrentWorkingDirectory: ProcessBuilder = copy(workingDirectory = None) - } + def withOutputConfig(outputConfig: ProcessOutputConfig): ProcessBuilder = + copy(outputConfig = outputConfig) + } } diff --git a/io/shared/src/main/scala/fs2/io/process/Processes.scala b/io/shared/src/main/scala/fs2/io/process/Processes.scala index ddca581653..235174c2ad 100644 --- a/io/shared/src/main/scala/fs2/io/process/Processes.scala +++ b/io/shared/src/main/scala/fs2/io/process/Processes.scala @@ -25,7 +25,6 @@ import cats.effect.IO import cats.effect.LiftIO import cats.effect.kernel.Async import cats.effect.kernel.Resource - sealed trait Processes[F[_]] { def spawn(process: ProcessBuilder): Resource[F, Process[F]] diff --git a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala index 6bd25a2ba7..4274cb2e8b 100644 --- a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala +++ b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala @@ -52,7 +52,9 @@ class ProcessSuite extends Fs2IoSuite { "node", "-e", "console.log('good day stdout'); console.error('how do you do stderr')" - ).spawn[IO] + ) + .withOutputConfig(ProcessOutputConfig()) + .spawn[IO] .use { p => val testOut = p.stdout .through(fs2.text.utf8.decode) @@ -72,6 +74,57 @@ class ProcessSuite extends Fs2IoSuite { } } + test("merged stdout and stderr") { + ProcessBuilder("node", "-e", "console.log('merged stdout'); console.error('merged stderr')") + .withOutputConfig( + ProcessOutputConfig(stdout = StreamRedirect.Pipe, stderr = StreamRedirect.Pipe) + ) + .spawn[IO] + .use { p => + p.stdout + .through(fs2.text.utf8.decode) + .compile + .string + .assert(s => s.contains("merged stdout") && s.contains("merged stderr")) + } + } + + test("file output") { + Files[IO].tempFile.use { path => + ProcessBuilder("echo", "file output test") + .withOutputConfig(ProcessOutputConfig(stdout = StreamRedirect.File(path))) + .spawn[IO] + .use(_.exitValue) + .assertEquals(0) *> + Files[IO].readUtf8(path).compile.string.assertEquals("file output test\n") + } + } + + test("ignored output") { + ProcessBuilder("echo", "ignored output") + .withOutputConfig(ProcessOutputConfig(stdout = StreamRedirect.Discard)) + .spawn[IO] + .use(_.exitValue) + .assertEquals(0) + } + + test("stdin piping") { + ProcessBuilder("cat") + .withOutputConfig(ProcessOutputConfig(stdin = StreamRedirect.Pipe)) + .spawn[IO] + .use { p => + val input = Stream + .emit("piped input test") + .through(fs2.text.utf8.encode) + .through(p.stdin) + .compile + .drain + val output = + p.stdout.through(fs2.text.utf8.decode).compile.string.assertEquals("piped input test") + input *> output + } + } + if (!isNative) test("cat") { ProcessBuilder("cat").spawn[IO].use { p =>