From 707567cb264f3f05cd3fded6e9e7e730b19b14c8 Mon Sep 17 00:00:00 2001 From: Michael L Heuer Date: Wed, 25 Jan 2017 20:15:21 -0600 Subject: [PATCH] Add pipe API in and out formatters for Features --- .../adam/rdd/feature/BEDInFormatter.scala | 65 +++++++++++++++++++ .../adam/rdd/feature/BEDOutFormatter.scala | 63 ++++++++++++++++++ .../adam/rdd/feature/GFF3InFormatter.scala | 65 +++++++++++++++++++ .../adam/rdd/feature/GFF3OutFormatter.scala | 63 ++++++++++++++++++ .../adam/rdd/feature/GTFInFormatter.scala | 65 +++++++++++++++++++ .../adam/rdd/feature/GTFOutFormatter.scala | 63 ++++++++++++++++++ .../rdd/feature/NarrowPeakInFormatter.scala | 65 +++++++++++++++++++ .../rdd/feature/NarrowPeakOutFormatter.scala | 63 ++++++++++++++++++ adam-core/src/test/resources/dvl1.200.gtf | 1 - .../adam/rdd/feature/FeatureRDDSuite.scala | 48 ++++++++++++++ docs/source/60_building_apps.md | 9 ++- 11 files changed, 568 insertions(+), 2 deletions(-) create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDInFormatter.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDOutFormatter.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3InFormatter.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3OutFormatter.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFInFormatter.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFOutFormatter.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakInFormatter.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakOutFormatter.scala diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDInFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDInFormatter.scala new file mode 100644 index 0000000000..a3ce74740f --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDInFormatter.scala @@ -0,0 +1,65 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.feature + +import java.io.{ + BufferedWriter, + OutputStream, + OutputStreamWriter +} +import org.bdgenomics.adam.rdd.{ InFormatter, InFormatterCompanion } +import org.bdgenomics.formats.avro.Feature +import org.bdgenomics.utils.misc.Logging + +/** + * InFormatter companion that builds a BEDInFormatter to write features in BED format to a pipe. + */ +object BEDInFormatter extends InFormatterCompanion[Feature, FeatureRDD, BEDInFormatter] { + + /** + * Apply method for building the BEDInFormatter from a FeatureRDD. + * + * @param fRdd FeatureRDD to build from. 
+ */ + def apply(fRdd: FeatureRDD): BEDInFormatter = { + BEDInFormatter() + } +} + +case class BEDInFormatter private () extends InFormatter[Feature, FeatureRDD, BEDInFormatter] { + protected val companion = BEDInFormatter + + /** + * Writes features to an output stream in BED format. + * + * @param os An OutputStream connected to a process we are piping to. + * @param iter An iterator of features to write. + */ + def write(os: OutputStream, iter: Iterator[Feature]) { + val writer = new BufferedWriter(new OutputStreamWriter(os)) + + // write the features + iter.foreach(f => { + writer.write(FeatureRDD.toBed(f)) + writer.newLine() + }) + + // close the writer, else stream may be defective + writer.close() // os is flushed and closed in InFormatterRunner + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDOutFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDOutFormatter.scala new file mode 100644 index 0000000000..763e224c77 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/BEDOutFormatter.scala @@ -0,0 +1,63 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.feature + +import htsjdk.samtools.ValidationStringency +import htsjdk.tribble.readers.{ + AsciiLineReader, + AsciiLineReaderIterator +} +import java.io.InputStream +import org.bdgenomics.adam.rdd.OutFormatter +import org.bdgenomics.formats.avro.Feature +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer + +/** + * OutFormatter that reads streaming BED format. + */ +case class BEDOutFormatter() extends OutFormatter[Feature] { + val bedParser = new BEDParser + val stringency = ValidationStringency.STRICT + + /** + * Reads features from an input stream in BED format. + * + * @param is An InputStream connected to the process we are piping from. + * @return Returns an iterator of Features read from the stream. 
+ */ + def read(is: InputStream): Iterator[Feature] = { + + // make line reader iterator + val lri = new AsciiLineReaderIterator(new AsciiLineReader(is)) + + @tailrec def convertIterator(iter: AsciiLineReaderIterator, + features: ListBuffer[Feature] = ListBuffer.empty): Iterator[Feature] = { + if (!iter.hasNext) { + iter.close() + features.toIterator + } else { + val nextFeatures = bedParser.parse(iter.next, stringency).fold(features)(features += _) + convertIterator(iter, nextFeatures) + } + } + + // convert the iterator + convertIterator(lri) + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3InFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3InFormatter.scala new file mode 100644 index 0000000000..2202fb28b4 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3InFormatter.scala @@ -0,0 +1,65 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.feature + +import java.io.{ + BufferedWriter, + OutputStream, + OutputStreamWriter +} +import org.bdgenomics.adam.rdd.{ InFormatter, InFormatterCompanion } +import org.bdgenomics.formats.avro.Feature +import org.bdgenomics.utils.misc.Logging + +/** + * InFormatter companion that builds a GFF3InFormatter to write features in GFF3 format to a pipe. + */ +object GFF3InFormatter extends InFormatterCompanion[Feature, FeatureRDD, GFF3InFormatter] { + + /** + * Apply method for building the GFF3InFormatter from a FeatureRDD. + * + * @param fRdd FeatureRDD to build from. + */ + def apply(fRdd: FeatureRDD): GFF3InFormatter = { + GFF3InFormatter() + } +} + +case class GFF3InFormatter private () extends InFormatter[Feature, FeatureRDD, GFF3InFormatter] { + protected val companion = GFF3InFormatter + + /** + * Writes features to an output stream in GFF3 format. + * + * @param os An OutputStream connected to a process we are piping to. + * @param iter An iterator of features to write. + */ + def write(os: OutputStream, iter: Iterator[Feature]) { + val writer = new BufferedWriter(new OutputStreamWriter(os)) + + // write the features + iter.foreach(f => { + writer.write(FeatureRDD.toGff3(f)) + writer.newLine() + }) + + // close the writer, else stream may be defective + writer.close() // os is flushed and closed in InFormatterRunner + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3OutFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3OutFormatter.scala new file mode 100644 index 0000000000..654a5e825f --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GFF3OutFormatter.scala @@ -0,0 +1,63 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.feature + +import htsjdk.samtools.ValidationStringency +import htsjdk.tribble.readers.{ + AsciiLineReader, + AsciiLineReaderIterator +} +import java.io.InputStream +import org.bdgenomics.adam.rdd.OutFormatter +import org.bdgenomics.formats.avro.Feature +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer + +/** + * OutFormatter that reads streaming GFF3 format. + */ +case class GFF3OutFormatter() extends OutFormatter[Feature] { + val gff3Parser = new GFF3Parser + val stringency = ValidationStringency.STRICT + + /** + * Reads features from an input stream in GFF3 format. + * + * @param is An InputStream connected to the process we are piping from. + * @return Returns an iterator of Features read from the stream. + */ + def read(is: InputStream): Iterator[Feature] = { + + // make line reader iterator + val lri = new AsciiLineReaderIterator(new AsciiLineReader(is)) + + @tailrec def convertIterator(iter: AsciiLineReaderIterator, + features: ListBuffer[Feature] = ListBuffer.empty): Iterator[Feature] = { + if (!iter.hasNext) { + iter.close() + features.toIterator + } else { + val nextFeatures = gff3Parser.parse(iter.next, stringency).fold(features)(features += _) + convertIterator(iter, nextFeatures) + } + } + + // convert the iterator + convertIterator(lri) + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFInFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFInFormatter.scala new file mode 100644 index 0000000000..30d4f163c5 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFInFormatter.scala @@ -0,0 +1,65 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.bdgenomics.adam.rdd.feature + +import java.io.{ + BufferedWriter, + OutputStream, + OutputStreamWriter +} +import org.bdgenomics.adam.rdd.{ InFormatter, InFormatterCompanion } +import org.bdgenomics.formats.avro.Feature +import org.bdgenomics.utils.misc.Logging + +/** + * InFormatter companion that builds a GTFInFormatter to write features in GTF format to a pipe. + */ +object GTFInFormatter extends InFormatterCompanion[Feature, FeatureRDD, GTFInFormatter] { + + /** + * Apply method for building the GTFInFormatter from a FeatureRDD. + * + * @param fRdd FeatureRDD to build from. + */ + def apply(fRdd: FeatureRDD): GTFInFormatter = { + GTFInFormatter() + } +} + +case class GTFInFormatter private () extends InFormatter[Feature, FeatureRDD, GTFInFormatter] { + protected val companion = GTFInFormatter + + /** + * Writes features to an output stream in GTF format. + * + * @param os An OutputStream connected to a process we are piping to. + * @param iter An iterator of features to write. + */ + def write(os: OutputStream, iter: Iterator[Feature]) { + val writer = new BufferedWriter(new OutputStreamWriter(os)) + + // write the features + iter.foreach(f => { + writer.write(FeatureRDD.toGtf(f)) + writer.newLine() + }) + + // close the writer, else stream may be defective + writer.close() // os is flushed and closed in InFormatterRunner + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFOutFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFOutFormatter.scala new file mode 100644 index 0000000000..fd504879d6 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/GTFOutFormatter.scala @@ -0,0 +1,63 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.feature + +import htsjdk.samtools.ValidationStringency +import htsjdk.tribble.readers.{ + AsciiLineReader, + AsciiLineReaderIterator +} +import java.io.InputStream +import org.bdgenomics.adam.rdd.OutFormatter +import org.bdgenomics.formats.avro.Feature +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer + +/** + * OutFormatter that reads streaming GTF format. + */ +case class GTFOutFormatter() extends OutFormatter[Feature] { + val gtfParser = new GTFParser + val stringency = ValidationStringency.STRICT + + /** + * Reads features from an input stream in GTF format. + * + * @param is An InputStream connected to the process we are piping from. + * @return Returns an iterator of Features read from the stream. 
+ */ + def read(is: InputStream): Iterator[Feature] = { + + // make line reader iterator + val lri = new AsciiLineReaderIterator(new AsciiLineReader(is)) + + @tailrec def convertIterator(iter: AsciiLineReaderIterator, + features: ListBuffer[Feature] = ListBuffer.empty): Iterator[Feature] = { + if (!iter.hasNext) { + iter.close() + features.toIterator + } else { + val nextFeatures = gtfParser.parse(iter.next, stringency).fold(features)(features += _) + convertIterator(iter, nextFeatures) + } + } + + // convert the iterator + convertIterator(lri) + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakInFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakInFormatter.scala new file mode 100644 index 0000000000..46ebb0153c --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakInFormatter.scala @@ -0,0 +1,65 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.feature + +import java.io.{ + BufferedWriter, + OutputStream, + OutputStreamWriter +} +import org.bdgenomics.adam.rdd.{ InFormatter, InFormatterCompanion } +import org.bdgenomics.formats.avro.Feature +import org.bdgenomics.utils.misc.Logging + +/** + * InFormatter companion that builds a NarrowPeakInFormatter to write features in NarrowPeak format to a pipe. + */ +object NarrowPeakInFormatter extends InFormatterCompanion[Feature, FeatureRDD, NarrowPeakInFormatter] { + + /** + * Apply method for building the NarrowPeakInFormatter from a FeatureRDD. + * + * @param fRdd FeatureRDD to build from. + */ + def apply(fRdd: FeatureRDD): NarrowPeakInFormatter = { + NarrowPeakInFormatter() + } +} + +case class NarrowPeakInFormatter private () extends InFormatter[Feature, FeatureRDD, NarrowPeakInFormatter] { + protected val companion = NarrowPeakInFormatter + + /** + * Writes features to an output stream in NarrowPeak format. + * + * @param os An OutputStream connected to a process we are piping to. + * @param iter An iterator of features to write. 
+ */ + def write(os: OutputStream, iter: Iterator[Feature]) { + val writer = new BufferedWriter(new OutputStreamWriter(os)) + + // write the features + iter.foreach(f => { + writer.write(FeatureRDD.toNarrowPeak(f)) + writer.newLine() + }) + + // close the writer, else stream may be defective + writer.close() // os is flushed and closed in InFormatterRunner + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakOutFormatter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakOutFormatter.scala new file mode 100644 index 0000000000..37328a436d --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/NarrowPeakOutFormatter.scala @@ -0,0 +1,63 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.feature + +import htsjdk.samtools.ValidationStringency +import htsjdk.tribble.readers.{ + AsciiLineReader, + AsciiLineReaderIterator +} +import java.io.InputStream +import org.bdgenomics.adam.rdd.OutFormatter +import org.bdgenomics.formats.avro.Feature +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer + +/** + * OutFormatter that reads streaming NarrowPeak format. + */ +case class NarrowPeakOutFormatter() extends OutFormatter[Feature] { + val narrowPeakParser = new NarrowPeakParser + val stringency = ValidationStringency.STRICT + + /** + * Reads features from an input stream in NarrowPeak format. + * + * @param is An InputStream connected to the process we are piping from. + * @return Returns an iterator of Features read from the stream. + */ + def read(is: InputStream): Iterator[Feature] = { + + // make line reader iterator + val lri = new AsciiLineReaderIterator(new AsciiLineReader(is)) + + @tailrec def convertIterator(iter: AsciiLineReaderIterator, + features: ListBuffer[Feature] = ListBuffer.empty): Iterator[Feature] = { + if (!iter.hasNext) { + iter.close() + features.toIterator + } else { + val nextFeatures = narrowPeakParser.parse(iter.next, stringency).fold(features)(features += _) + convertIterator(iter, nextFeatures) + } + } + + // convert the iterator + convertIterator(lri) + } +} diff --git a/adam-core/src/test/resources/dvl1.200.gtf b/adam-core/src/test/resources/dvl1.200.gtf index bfd419b3a1..44450831c6 100644 --- a/adam-core/src/test/resources/dvl1.200.gtf +++ b/adam-core/src/test/resources/dvl1.200.gtf @@ -1,4 +1,3 @@ -seqname source feature start end score strand frame gene_id transcript_id 1 Ensembl similarity 1331346 1331534 100 + . 1 Ensembl similarity 1331346 1331540 199 + . 1 Ensembl similarity 1331346 1331558 344 + . 
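All eight formatters follow the same shape: the `InFormatter` serializes each `Feature` to the child process with the matching `FeatureRDD.toBed`/`toGff3`/`toGtf`/`toNarrowPeak` converter, and the `OutFormatter` parses the child's output back into `Feature` records. The tests below round-trip features through a no-op shell command; as a rough sketch (not part of this patch), application code wiring the BED pair into `FeatureRDD.pipe` might look like the following, assuming `sc` picks up the `ADAMContext` implicits, the input path is a placeholder, and the piped command is installed on every executor node:

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.feature.{ BEDInFormatter, BEDOutFormatter, FeatureRDD }

// hypothetical path and command; mirrors the piping tests added below
def roundTripBed(sc: SparkContext): FeatureRDD = {
  // load features from BED (sc is converted to an ADAMContext via ADAMContext._)
  val features: FeatureRDD = sc.loadBed("features.bed")

  // pipe picks these up implicitly: the companion writes BED to the child
  // process, and the OutFormatter parses the BED lines coming back
  implicit val tFormatter = BEDInFormatter
  implicit val uFormatter = new BEDOutFormatter

  // "tee /dev/null" is a stand-in for a real BED-to-BED command
  features.pipe("tee /dev/null")
}
```

Because the implicits select the format, switching the same pipeline to GTF, GFF3, or NarrowPeak only means swapping in the corresponding formatter pair.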
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala
index 4c25996d5e..5ebd3dd28e 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala
@@ -784,4 +784,52 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
     assert(features.sequences.apply("chr1").isDefined)
     assert(features.sequences.apply("chr1").get.length >= 794336L)
   }
+
+  sparkTest("don't lose any features when piping as BED format") {
+    val inputPath = testFile("dvl1.200.bed")
+    val frdd = sc.loadBed(inputPath)
+
+    implicit val tFormatter = BEDInFormatter
+    implicit val uFormatter = new BEDOutFormatter
+
+    val pipedRdd: FeatureRDD = frdd.pipe("tee /dev/null")
+    assert(pipedRdd.rdd.count >= frdd.rdd.count)
+    assert(pipedRdd.rdd.distinct.count === frdd.rdd.distinct.count)
+  }
+
+  sparkTest("don't lose any features when piping as GTF format") {
+    val inputPath = testFile("Homo_sapiens.GRCh37.75.trun100.gtf")
+    val frdd = sc.loadGtf(inputPath)
+
+    implicit val tFormatter = GTFInFormatter
+    implicit val uFormatter = new GTFOutFormatter
+
+    val pipedRdd: FeatureRDD = frdd.pipe("tee /dev/null")
+    assert(pipedRdd.rdd.count >= frdd.rdd.count)
+    assert(pipedRdd.rdd.distinct.count === frdd.rdd.distinct.count)
+  }
+
+  sparkTest("don't lose any features when piping as GFF3 format") {
+    val inputPath = testFile("dvl1.200.gff3")
+    val frdd = sc.loadGff3(inputPath)
+
+    implicit val tFormatter = GFF3InFormatter
+    implicit val uFormatter = new GFF3OutFormatter
+
+    val pipedRdd: FeatureRDD = frdd.pipe("tee /dev/null")
+    assert(pipedRdd.rdd.count >= frdd.rdd.count)
+    assert(pipedRdd.rdd.distinct.count === frdd.rdd.distinct.count)
+  }
+
+  sparkTest("don't lose any features when piping as NarrowPeak format") {
+    val inputPath = testFile("wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
+    val frdd = sc.loadNarrowPeak(inputPath)
+
+    implicit val tFormatter = NarrowPeakInFormatter
+    implicit val uFormatter = new NarrowPeakOutFormatter
+
+    val pipedRdd: FeatureRDD = frdd.pipe("tee /dev/null")
+    assert(pipedRdd.rdd.count >= frdd.rdd.count)
+    assert(pipedRdd.rdd.distinct.count === frdd.rdd.distinct.count)
+  }
 }
diff --git a/docs/source/60_building_apps.md b/docs/source/60_building_apps.md
index 187807de0f..febb690cef 100644
--- a/docs/source/60_building_apps.md
+++ b/docs/source/60_building_apps.md
@@ -602,6 +602,13 @@ support the following:
   pipe, with the exact format autodetected from the stream.
   * We do not support piping CRAM due to complexities around the
   reference-based compression.
+* `FeatureRDD`:
+  * `InFormatter`s: `BEDInFormatter`, `GFF3InFormatter`, `GTFInFormatter`, and
+  `NarrowPeakInFormatter` for writing features out to a pipe in BED, GFF3, GTF/GFF2,
+  or NarrowPeak format, respectively.
+  * `OutFormatter`s: `BEDOutFormatter`, `GFF3OutFormatter`, `GTFOutFormatter`, and
+  `NarrowPeakOutFormatter` for reading features in BED, GFF3, GTF/GFF2, or NarrowPeak
+  format from a pipe, respectively.
 * `FragmentRDD`:
   * `InFormatter`: `InterleavedFASTQInFormatter` writes FASTQ with the reads
   from a paired sequencing protocol interleaved in the FASTQ stream to a pipe.
@@ -672,4 +679,4 @@ machine in our cluster. We suggest several different approaches:
   (e.g., python, dynamically linked libraries) are installed across each node
   on your cluster.
* Run the command using a container system such as [Docker](https://docker.io) - or [Singularity](http://singularity.lbl.gov/). \ No newline at end of file + or [Singularity](http://singularity.lbl.gov/).
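
For the container-based approach suggested above, the external command in `pipe` simply becomes a `docker run` invocation. The sketch below is illustrative only: the image name, tool name, and flag are placeholders rather than part of ADAM or this patch, and Docker must be installed on every executor node:

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.feature.{ FeatureRDD, GFF3InFormatter, GFF3OutFormatter }

// hypothetical: filter GFF3 features through a containerized tool
def filterWithContainer(sc: SparkContext): FeatureRDD = {
  val features: FeatureRDD = sc.loadGff3("annotations.gff3")

  implicit val tFormatter = GFF3InFormatter
  implicit val uFormatter = new GFF3OutFormatter

  // each Spark task launches the container and streams GFF3 through its
  // stdin/stdout; "example/gff3-tools", "filter-features", and "--min-score"
  // are made-up names used only for illustration
  features.pipe("docker run --rm -i example/gff3-tools filter-features --min-score 10")
}
```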