[ADAM-1875] Add ability to timeout a piped command.
Resolves #1875. Adds an optional timeout parameter to the `GenomicRDD.pipe`
function. If provided, the piped command is killed (without failing the Spark
task) once it has run for longer than the specified number of seconds.
Frank Austin Nothaft authored and heuermh committed Jan 25, 2018
1 parent 2ec9f1c commit adff336
Showing 4 changed files with 116 additions and 13 deletions.
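
Before the diffs, a minimal usage sketch of the new parameter, modeled on the first test added in this commit. It assumes a `SparkContext` named `sc` with ADAM's implicits in scope, and the formatter import paths shown here are an assumption rather than part of the diff:

```scala
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter, SAMInFormatter }

// Implicit formatters tell pipe how to serialize reads to the command's stdin
// and how to parse the command's stdout back into records.
implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

// Kill the piped command after 5 seconds. The Spark task still succeeds, and
// any records the command emitted before the timeout are kept.
val pipedRdd: AlignmentRecordRDD = sc.loadBam("reads12.sam")
  .pipe("sleep 10", optTimeout = Some(5))
```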
25 changes: 16 additions & 9 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -454,21 +454,27 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* @param environment A map containing environment variable/value pairs to set
*   in the environment for the newly created process. Default is empty.
* @param flankSize Number of bases to flank each command invocation by.
* @param optTimeout An optional parameter specifying how long to let a single
*   partition run for, in seconds. If the partition times out, the partial
*   results are returned and no exception is thrown; the partition logs a
*   warning that the command timed out.
* @return Returns a new GenomicRDD of type Y.
*
* @tparam X The type of the record created by the piped command.
* @tparam Y A GenomicRDD containing X's.
* @tparam V The InFormatter to use for formatting the data being piped to the
* command.
*/
def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]](cmd: String,
files: Seq[String] = Seq.empty,
environment: Map[String, String] = Map.empty,
flankSize: Int = 0)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V],
xFormatter: OutFormatter[X],
convFn: (U, RDD[X]) => Y,
tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {
def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]](
cmd: String,
files: Seq[String] = Seq.empty,
environment: Map[String, String] = Map.empty,
flankSize: Int = 0,
optTimeout: Option[Int] = None)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V],
xFormatter: OutFormatter[X],
convFn: (U, RDD[X]) => Y,
tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {

// TODO: support broadcasting files
files.foreach(f => {
@@ -554,7 +560,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
new OutFormatterRunner[X, OutFormatter[X]](xFormatter,
is,
process,
finalCmd)
finalCmd,
optTimeout)
} else {
Iterator[X]()
}
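
The tests added at the bottom of this commit exercise the semantics the `optTimeout` scaladoc above describes. Condensed into a single sketch, assuming the same `ardd`, implicit SAM formatters, and `testFile` helper used in the test suite below:

```scala
val scriptPath = testFile("timeout.py")

// timeout.py echoes its stdin, sleeps 10 seconds, then echoes it again.
// Without a timeout, both copies come back.
val full: AlignmentRecordRDD = ardd.pipe("python $0", files = Seq(scriptPath))
assert(full.rdd.count === 2 * ardd.rdd.count)

// With a 5 second timeout, only the first copy is read back before the
// command is killed; no exception is thrown, only a warning is logged.
val partial: AlignmentRecordRDD = ardd.pipe("python $0",
  files = Seq(scriptPath),
  optTimeout = Some(5))
assert(partial.rdd.count === ardd.rdd.count)
```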
@@ -19,20 +19,50 @@ package org.bdgenomics.adam.rdd

import java.io.InputStream
import java.lang.Process
import java.util.concurrent.Callable
import java.util.concurrent.{ Callable, TimeUnit }
import org.bdgenomics.utils.misc.Logging

private[rdd] class OutFormatterRunner[T, U <: OutFormatter[T]](formatter: U,
is: InputStream,
process: Process,
finalCmd: List[String]) extends Iterator[T] {
finalCmd: List[String],
optTimeout: Option[Int]) extends Iterator[T] with Logging {

private val startTime = System.currentTimeMillis()
private val iter = formatter.read(is)

private def hasTimedOut(): Boolean = {
optTimeout.map(timeoutSec => {
val currTime = System.currentTimeMillis()
(currTime - startTime) >= (timeoutSec * 1000L)
}).getOrElse(false)
}

private def timeLeft(timeout: Int): Long = {
val currTime = System.currentTimeMillis()
(timeout * 1000L) - (currTime - startTime)
}

def hasNext: Boolean = {
if (iter.hasNext) {
if (hasTimedOut()) {
log.warn("Piped command %s timed out after %d seconds.".format(
finalCmd, optTimeout.get))
process.destroy()
false
} else if (iter.hasNext) {
true
} else {
val exitCode = process.waitFor()
val exitCode = optTimeout.fold(process.waitFor())(timeout => {
val exited = process.waitFor(timeLeft(timeout), TimeUnit.MILLISECONDS)
if (exited) {
process.exitValue()
} else {
log.warn("Piped command %s timed out after %d seconds.".format(
finalCmd, timeout))
process.destroy()
0
}
})
if (exitCode != 0) {
throw new RuntimeException("Piped command %s exited with error code %d.".format(
finalCmd, exitCode))
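
The timeout handling above has two pieces: an elapsed-time check while records are still being read, and a `waitFor` bounded by the time remaining once the output is exhausted. A standalone sketch of that second piece (the helper name `waitOrKill` is hypothetical, not ADAM code):

```scala
import java.util.concurrent.TimeUnit

// Wait for at most the remaining timeout budget; if the process is still
// alive afterwards, destroy it and report a clean exit so the caller (here,
// the Spark task) does not fail.
def waitOrKill(process: Process, startTimeMs: Long, timeoutSec: Int): Int = {
  val remainingMs = (timeoutSec * 1000L) - (System.currentTimeMillis() - startTimeMs)
  val exited = process.waitFor(math.max(0L, remainingMs), TimeUnit.MILLISECONDS)
  if (exited) {
    process.exitValue()
  } else {
    process.destroy()
    0
  }
}
```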
21 changes: 21 additions & 0 deletions adam-core/src/test/resources/timeout.py
@@ -0,0 +1,21 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import time

# read lines from stdin
lines = sys.stdin.readlines()

def print_lines(skip_header=False):
for line in lines:
if not (skip_header and line.startswith('@')):
print(line.strip())

print_lines()
sys.stdout.flush()

time.sleep(10)

print_lines(skip_header=True)
sys.stdout.flush()
@@ -830,6 +830,51 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
assert(records === newRecords)
}

sparkTest("lose all records when a command times out") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = ardd.pipe("sleep 10", optTimeout = Some(5))
val newRecords = pipedRdd.rdd.count
assert(newRecords === 0)
}

sparkTest("lose no records without a timeout") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

// timeout.py reads the reads from stdin, echoes them to stdout, sleeps
// for 10 sec, then echoes them to stdout again
val scriptPath = testFile("timeout.py")
val pipedRdd: AlignmentRecordRDD = ardd.pipe("python $0",
files = Seq(scriptPath))
val newRecords = pipedRdd.rdd.count
assert(newRecords === (2 * ardd.rdd.count))
}

sparkTest("lose some records when a command times out") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

// timeout.py reads the reads from stdin, echoes them to stdout, sleeps
// for 10 sec, then echoes them to stdout again
val scriptPath = testFile("timeout.py")
val pipedRdd: AlignmentRecordRDD = ardd.pipe("python $0",
optTimeout = Some(5),
files = Seq(scriptPath))
val newRecords = pipedRdd.rdd.count
assert(newRecords === ardd.rdd.count)
}

sparkTest("don't lose any reads when piping as SAM using java pipe") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)
