diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index 6f34541458..f6bd60c275 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -454,6 +454,10 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging { * @param environment A map containing environment variable/value pairs to set * in the environment for the newly created process. Default is empty. * @param flankSize Number of bases to flank each command invocation by. + * @param optTimeout An optional parameter specifying how long to let a single + * partition run for, in seconds. If the partition times out, the partial + * results will be returned, and no exception will be logged. The partition + * will log that the command timed out. * @return Returns a new GenomicRDD of type Y. * * @tparam X The type of the record created by the piped command. @@ -461,14 +465,16 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging { * @tparam V The InFormatter to use for formatting the data being piped to the * command. */ - def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]](cmd: String, - files: Seq[String] = Seq.empty, - environment: Map[String, String] = Map.empty, - flankSize: Int = 0)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V], - xFormatter: OutFormatter[X], - convFn: (U, RDD[X]) => Y, - tManifest: ClassTag[T], - xManifest: ClassTag[X]): Y = { + def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]]( + cmd: String, + files: Seq[String] = Seq.empty, + environment: Map[String, String] = Map.empty, + flankSize: Int = 0, + optTimeout: Option[Int] = None)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V], + xFormatter: OutFormatter[X], + convFn: (U, RDD[X]) => Y, + tManifest: ClassTag[T], + xManifest: ClassTag[X]): Y = { // TODO: support broadcasting files files.foreach(f => { @@ -554,7 +560,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging { new OutFormatterRunner[X, OutFormatter[X]](xFormatter, is, process, - finalCmd) + finalCmd, + optTimeout) } else { Iterator[X]() } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/OutFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/OutFormatter.scala index 37c156f0e5..06a3d8d404 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/OutFormatter.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/OutFormatter.scala @@ -19,20 +19,50 @@ package org.bdgenomics.adam.rdd import java.io.InputStream import java.lang.Process -import java.util.concurrent.Callable +import java.util.concurrent.{ Callable, TimeUnit } +import org.bdgenomics.utils.misc.Logging private[rdd] class OutFormatterRunner[T, U <: OutFormatter[T]](formatter: U, is: InputStream, process: Process, - finalCmd: List[String]) extends Iterator[T] { + finalCmd: List[String], + optTimeout: Option[Int]) extends Iterator[T] with Logging { + private val startTime = System.currentTimeMillis() private val iter = formatter.read(is) + private def hasTimedOut(): Boolean = { + optTimeout.map(timeoutSec => { + val currTime = System.currentTimeMillis() + (currTime - startTime) >= (timeoutSec * 1000L) + }).getOrElse(false) + } + + private def timeLeft(timeout: Int): Long = { + val currTime = System.currentTimeMillis() + (timeout * 1000L) - currTime + } + def hasNext: Boolean = { - if (iter.hasNext) { + if (hasTimedOut()) { + log.warn("Piped command %s timed out after %d seconds.".format( + finalCmd, optTimeout.get)) + process.destroy() + false + } else if (iter.hasNext) { true } else { - val exitCode = process.waitFor() + val exitCode = optTimeout.fold(process.waitFor())(timeout => { + val exited = process.waitFor(timeLeft(timeout), TimeUnit.MILLISECONDS) + if (exited) { + process.exitValue() + } else { + log.warn("Piped command %s timed out after %d seconds.".format( + finalCmd, timeout)) + process.destroy() + 0 + } + }) if (exitCode != 0) { throw new RuntimeException("Piped command %s exited with error code %d.".format( finalCmd, exitCode)) diff --git a/adam-core/src/test/resources/timeout.py b/adam-core/src/test/resources/timeout.py new file mode 100644 index 0000000000..b4d5035945 --- /dev/null +++ b/adam-core/src/test/resources/timeout.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +from __future__ import print_function +import sys +import time + +# read lines from stdin +lines = sys.stdin.readlines() + +def print_lines(skip_header=False): + for line in lines: + if not (skip_header and line.startswith('@')): + print(line.strip().rstrip()) + +print_lines() +sys.stdout.flush() + +time.sleep(10) + +print_lines(skip_header=True) +sys.stdout.flush() diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala index a8e71ac77a..674186b7c0 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala @@ -830,6 +830,51 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { assert(records === newRecords) } + sparkTest("lose all records when a command times out") { + val reads12Path = testFile("reads12.sam") + val ardd = sc.loadBam(reads12Path) + + implicit val tFormatter = SAMInFormatter + implicit val uFormatter = new AnySAMOutFormatter + + val pipedRdd: AlignmentRecordRDD = ardd.pipe("sleep 10", optTimeout = Some(5)) + val newRecords = pipedRdd.rdd.count + assert(newRecords === 0) + } + + sparkTest("lose no records without a timeout") { + val reads12Path = testFile("reads12.sam") + val ardd = sc.loadBam(reads12Path) + + implicit val tFormatter = SAMInFormatter + implicit val uFormatter = new AnySAMOutFormatter + + // this script reads the reads into a temp file, which is then read to + // stdout, then we sleep for 10 sec, then we read to stdout again + val scriptPath = testFile("timeout.py") + val pipedRdd: AlignmentRecordRDD = ardd.pipe("python $0", + files = Seq(scriptPath)) + val newRecords = pipedRdd.rdd.count + assert(newRecords === (2 * ardd.rdd.count)) + } + + sparkTest("lose some records when a command times out") { + val reads12Path = testFile("reads12.sam") + val ardd = sc.loadBam(reads12Path) + + implicit val tFormatter = SAMInFormatter + implicit val uFormatter = new AnySAMOutFormatter + + // this script reads the reads into a temp file, which is then read to + // stdout, then we sleep for 10 sec, then we read to stdout again + val scriptPath = testFile("timeout.py") + val pipedRdd: AlignmentRecordRDD = ardd.pipe("python $0", + optTimeout = Some(5), + files = Seq(scriptPath)) + val newRecords = pipedRdd.rdd.count + assert(newRecords === ardd.rdd.count) + } + sparkTest("don't lose any reads when piping as SAM using java pipe") { val reads12Path = testFile("reads12.sam") val ardd = sc.loadBam(reads12Path)