Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added proper SIGTERM/SIGKILL handling for sub-processes #286

Merged
merged 10 commits into from
Aug 5, 2024
4 changes: 3 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ trait SafeDeps extends ScalaModule {
trait MiMaChecks extends Mima {
def mimaPreviousVersions = Seq("0.9.0", "0.9.1", "0.9.2", "0.9.3", "0.10.0")
override def mimaBinaryIssueFilters: T[Seq[ProblemFilter]] = Seq(
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs")
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs"),
// this is fine, because ProcessLike is sealed (and its subclasses should be final)
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.join")
)
}

Expand Down
130 changes: 97 additions & 33 deletions os/src/ProcessOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,22 @@ case class proc(command: Shellable*) {
* `call` provides a number of parameters that let you configure how the subprocess
* is run:
*
* @param cwd the working directory of the subprocess
* @param env any additional environment variables you wish to set in the subprocess
* @param stdin any data you wish to pass to the subprocess's standard input
* @param stdout How the process's output stream is configured.
* @param stderr How the process's error stream is configured.
* @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout
* @param timeout how long to wait in milliseconds for the subprocess to complete
* @param check disable this to avoid throwing an exception if the subprocess
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param cwd the working directory of the subprocess
* @param env any additional environment variables you wish to set in the subprocess
* @param stdin any data you wish to pass to the subprocess's standard input
* @param stdout How the process's output stream is configured.
* @param stderr How the process's error stream is configured.
* @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout
* @param timeout how long to wait in milliseconds for the subprocess to complete
* (-1 for no timeout)
* @param check disable this to avoid throwing an exception if the subprocess
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
*/
def call(
cwd: Path = null,
Expand All @@ -66,7 +71,9 @@ case class proc(command: Shellable*) {
mergeErrIntoOut: Boolean = false,
timeout: Long = -1,
check: Boolean = true,
propagateEnv: Boolean = true
propagateEnv: Boolean = true,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 1000
): CommandResult = {

val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
Expand All @@ -87,14 +94,38 @@ case class proc(command: Shellable*) {
propagateEnv
)

sub.join(timeout)
sub.join(timeout, timeoutGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq)
if (res.exitCode == 0 || !check) res
else throw SubprocessException(res)
}

// forwarder for the new timeoutGracePeriod flag
private[os] def call(
cwd: Path,
env: Map[String, String],
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
timeout: Long,
check: Boolean,
propagateEnv: Boolean
): CommandResult = call(
cwd,
env,
stdin,
stdout,
stderr,
mergeErrIntoOut,
timeout,
check,
propagateEnv,
timeoutGracePeriod = 1000
)

/**
* The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures
* and starts a subprocess, and returns it as a `java.lang.Process` for you to
Expand Down Expand Up @@ -181,24 +212,28 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
* `call` provides a number of parameters that let you configure how the pipeline
* is run:
*
* @param cwd the working directory of the pipeline
* @param env any additional environment variables you wish to set in the pipeline
* @param stdin any data you wish to pass to the pipelines's standard input (to the first process)
* @param stdout How the pipelines's output stream is configured (the last process stdout)
* @param stderr How the process's error stream is configured (set for all processes)
* @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
* @param timeout how long to wait in milliseconds for the pipeline to complete
* @param check disable this to avoid throwing an exception if the pipeline
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the pipeline
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
* failing process. If no process fails, the exit code will be 0.
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param cwd the working directory of the pipeline
* @param env any additional environment variables you wish to set in the pipeline
* @param stdin any data you wish to pass to the pipelines's standard input (to the first process)
* @param stdout How the pipelines's output stream is configured (the last process stdout)
* @param stderr How the process's error stream is configured (set for all processes)
* @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
* @param timeout how long to wait in milliseconds for the pipeline to complete
* @param check disable this to avoid throwing an exception if the pipeline
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the pipeline
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
* failing process. If no process fails, the exit code will be 0.
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
*/
def call(
cwd: Path = null,
Expand All @@ -211,7 +246,9 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
check: Boolean = true,
propagateEnv: Boolean = true,
pipefail: Boolean = true,
handleBrokenPipe: Boolean = !isWindows
handleBrokenPipe: Boolean = !isWindows,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 1000
): CommandResult = {
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]

Expand All @@ -232,7 +269,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
pipefail
)

sub.join(timeout)
sub.join(timeout, timeoutGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res =
Expand All @@ -241,6 +278,33 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
else throw SubprocessException(res)
}

private[os] def call(
cwd: Path,
env: Map[String, String],
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
timeout: Long,
check: Boolean,
propagateEnv: Boolean,
pipefail: Boolean,
handleBrokenPipe: Boolean
): CommandResult = call(
cwd,
env,
stdin,
stdout,
stderr,
mergeErrIntoOut,
timeout,
check,
propagateEnv,
pipefail,
handleBrokenPipe,
timeoutGracePeriod = 1000
)

/**
* The most flexible of the [[os.ProcGroup]] calls. It sets-up a pipeline of processes,
* and returns a [[ProcessPipeline]] for you to interact with however you like.
Expand Down
69 changes: 52 additions & 17 deletions os/src/SubProcess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,27 @@ sealed trait ProcessLike extends java.lang.AutoCloseable {
*/
def waitFor(timeout: Long = -1): Boolean

// FIXME: docs
/**
* Wait up to `millis` for the [[ProcessLike]] to terminate and all stdout and stderr
* from the subprocess to be handled. By default waits indefinitely; if a time
* limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by
* the time the timeout has occurred
*/
def join(timeout: Long = -1): Boolean
def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean

j-mie6 marked this conversation as resolved.
Show resolved Hide resolved
@deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4")
private[os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 1000)
}

/**
* Represents a spawn subprocess that has started and may or may not have
* completed.
*/
@deprecatedInheritance(
"this class will be made final: if you are using it be aware that `join` has a new overloading",
"0.10.4"
)
class SubProcess(
val wrapped: java.lang.Process,
val inputPumperThread: Option[Thread],
Expand Down Expand Up @@ -114,17 +122,28 @@ class SubProcess(
}
}

// FIXME: documentation
/**
* Wait up to `millis` for the subprocess to terminate and all stdout and stderr
* from the subprocess to be handled. By default waits indefinitely; if a time
* limit is given, explicitly destroys the subprocess if it has not completed by
* the time the timeout has occurred
*/
def join(timeout: Long = -1): Boolean = {
def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = {
val exitedCleanly = waitFor(timeout)
if (!exitedCleanly) {
destroy()
destroyForcibly()
assume(
timeout != -1,
"if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable"
)
if (timeoutGracePeriod == -1) destroy()
else if (timeoutGracePeriod == 0) destroyForcibly()
else {
destroy()
if (!waitFor(timeoutGracePeriod)) {
destroyForcibly()
}
}
waitFor(-1)
}
outputPumperThread.foreach(_.join())
Expand Down Expand Up @@ -222,6 +241,10 @@ object SubProcess {
}
}

@deprecatedInheritance(
"this class will be made final: if you are using it be aware that `join` has a new overloading",
"0.10.4"
)
class ProcessPipeline(
val processes: Seq[SubProcess],
pipefail: Boolean,
Expand Down Expand Up @@ -339,28 +362,40 @@ class ProcessPipeline(
}
}

// FIXME: documentation
/**
* Wait up to `millis` for the [[ProcessPipeline]] to terminate all the processes
* in pipeline. By default waits indefinitely; if a time limit is given, explicitly
* destroys each process if it has not completed by the time the timeout has occurred.
*/
override def join(timeout: Long = -1): Boolean = {
@tailrec
def joinRec(startedAt: Long, processesLeft: Seq[SubProcess], result: Boolean): Boolean =
processesLeft match {
case Nil => result
case head :: tail =>
val elapsed = System.currentTimeMillis() - startedAt
val timeoutLeft = Math.max(0, timeout - elapsed)
val exitedCleanly = head.join(timeoutLeft)
joinRec(startedAt, tail, result && exitedCleanly)
}
override def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = {
// previously, this was implemented in a similar way to the waitFor above, but with
// join logic. This is much harder to make work with the grace period, because we
// want to evenly give all threads a chance to terminate gracefully. As such, this
// implementation interleaves the single-process join implementation more fairly

// in this case, the grace period does not apply, so fine
if (timeout == -1) {
processes.forall(_.join())
} else {
val timeNow = System.currentTimeMillis()
joinRec(timeNow, processes, true)
// timeout is active, so the grace period must be accounted for
val exitedCleanly = waitFor(timeout)
if (!exitedCleanly) {
if (timeoutGracePeriod == -1) destroy()
else if (timeoutGracePeriod == 0) destroyForcibly()
else {
destroy()
if (!waitFor(timeoutGracePeriod)) {
destroyForcibly()
}
}
waitFor(-1)
// note that this is the only part that isn't shared with the other implementation of join... they could be unified if this is made an
// abstract method: then this implementation can move to the superclass (except for the default -1 case above, but that could be done via an override)
processes.flatMap(_.outputPumperThread).foreach(_.join())
j-mie6 marked this conversation as resolved.
Show resolved Hide resolved
processes.flatMap(_.errorPumperThread).foreach(_.join())
}
exitedCleanly
}
}
}
Expand Down