Skip to content

Commit

Permalink
Merge pull request #784 from fnothaft/sort-760
Browse files Browse the repository at this point in the history
[ADAM-783] Write @sq header lines in sorted order.
  • Loading branch information
ryan-williams committed Aug 21, 2015
2 parents ec525d4 + bf96804 commit de3fa55
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
} else {
sc.loadAlignments(args.inputPath)
}
}).adamSave(args)
}).adamSave(args, args.sortReads)
}

private def createKnownSnpsTable(sc: SparkContext): SnpTable = CreateKnownSnpsTable.time {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,22 @@ class SequenceDictionary(val records: Vector[SequenceRecord]) extends Serializab
* @return Returns a SAM formatted sequence dictionary.
*/
def toSAMSequenceDictionary: SAMSequenceDictionary = {
  // Bring the implicit SequenceRecord ordering into scope so `sorted` is
  // well-defined; @SQ lines must be emitted in a deterministic (name) order.
  import SequenceRecord._
  // NOTE(review): removed the stale pre-sort construction that the merge left
  // behind as a discarded expression; only the sorted dictionary is built.
  new SAMSequenceDictionary(records.sorted.map(_.toSAMSequenceRecord).toList)
}

override def toString: String = {
  // Render as "SequenceDictionary{" + one record per line + "}".
  // An empty dictionary renders as "SequenceDictionary{}" with no newline,
  // matching the fold-based original.
  if (records.isEmpty) "SequenceDictionary{}"
  else records.map(_.toString).mkString("SequenceDictionary{\n", "\n", "}")
}
}

/**
 * Orders sequence records lexicographically by sequence name, so that
 * dictionaries convert to SAM with a deterministic @SQ line order.
 */
object SequenceOrdering extends Ordering[SequenceRecord] {
  override def compare(a: SequenceRecord, b: SequenceRecord): Int =
    Ordering.String.compare(a.name, b.name)
}

/**
* Utility class within the SequenceDictionary; represents unique reference name-to-id correspondence
*
Expand Down Expand Up @@ -183,6 +191,8 @@ object SequenceRecord {
val REFSEQ_TAG = "REFSEQ"
val GENBANK_TAG = "GENBANK"

implicit def ordering = SequenceOrdering

def apply(name: String,
length: Long,
md5: String = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
rdd.filter(overlapsQuery)
}

def maybeSaveBam(args: ADAMSaveAnyArgs): Boolean = {
def maybeSaveBam(args: ADAMSaveAnyArgs,
isSorted: Boolean = false): Boolean = {
if (args.outputPath.endsWith(".sam")) {
log.info("Saving data in SAM format")
rdd.adamSAMSave(args.outputPath, asSingleFile = args.asSingleFile)
Expand All @@ -91,8 +92,9 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
maybeSaveBam(args) || { rdd.adamParquetSave(args); true }
}

/**
 * Saves the RDD, dispatching on the output path: SAM/BAM if the extension
 * matches, then FASTQ, and finally Parquet as the fallback.
 *
 * NOTE(review): removed the superseded zero-isSorted signature that the merge
 * left alongside the new one — two `adamSave` definitions with default
 * arguments cannot coexist.
 *
 * @param args Arguments describing where and how to save.
 * @param isSorted If true, the SAM/BAM header is marked as coordinate sorted.
 *                 Defaults to false, preserving the behavior of existing callers.
 * @return Returns true (one of the save branches always runs).
 */
def adamSave(args: ADAMSaveAnyArgs,
             isSorted: Boolean = false) = {
  maybeSaveBam(args, isSorted) || maybeSaveFastq(args) || { rdd.adamParquetSave(args); true }
}

def adamSAMString: String = {
Expand Down Expand Up @@ -120,11 +122,15 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
*
* @param filePath Path to save files to.
* @param asSam Selects whether to save as SAM or BAM. The default value is true (save in SAM format).
* @param isSorted If the output is sorted, this will modify the header.
*/
def adamSAMSave(filePath: String, asSam: Boolean = true, asSingleFile: Boolean = false) = SAMSave.time {
def adamSAMSave(filePath: String,
asSam: Boolean = true,
asSingleFile: Boolean = false,
isSorted: Boolean = false) = SAMSave.time {

// convert the records
val (convertRecords: RDD[SAMRecordWritable], header: SAMFileHeader) = rdd.adamConvertToSAM()
val (convertRecords: RDD[SAMRecordWritable], header: SAMFileHeader) = rdd.adamConvertToSAM(isSorted)

// add keys to our records
val withKey =
Expand Down Expand Up @@ -227,7 +233,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
*
* @return Returns a SAM/BAM formatted RDD of reads, as well as the file header.
*/
def adamConvertToSAM(): (RDD[SAMRecordWritable], SAMFileHeader) = ConvertToSAM.time {
def adamConvertToSAM(isSorted: Boolean = false): (RDD[SAMRecordWritable], SAMFileHeader) = ConvertToSAM.time {
// collect global summary data
val sd = rdd.adamGetSequenceDictionary()
val rgd = rdd.adamGetReadGroupDictionary()
Expand All @@ -238,6 +244,10 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
// create header
val header = adamRecordConverter.createSAMHeader(sd, rgd)

if (isSorted) {
header.setSortOrder(SAMFileHeader.SortOrder.coordinate)
}

// broadcast for efficiency
val hdrBcast = rdd.context.broadcast(SAMFileHeaderWritable(header))

Expand Down
10 changes: 10 additions & 0 deletions adam-core/src/test/resources/sorted.sam
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
@HD VN:1.4 SO:coordinate
@SQ SN:1 LN:1000
@SQ SN:2 LN:1000
@SQ SN:3 LN:1000
@SQ SN:4 LN:2000
A 0 1 1 50 10M * 0 0 ACACACACAC **********
E 0 2 101 45 10M * 0 0 ACACACACAC **********
D 0 2 501 55 10M2S * 0 0 ACACACACACAC ************
B 0 3 11 40 4M2I4M * 0 0 ACACACACAC **********
C 0 4 1001 25 8M * 0 0 ACACACAC ********
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,20 @@ class SequenceDictionarySuite extends FunSuite {
toSSD.assertSameDictionary(ssd)
}

test("conversion to sam sequence dictionary has correct sort order") {
  // Records are deliberately supplied out of order; the conversion is
  // expected to emit them sorted lexicographically by name.
  val names = Vector("MT", "4", "1", "3", "2", "X")
  val sd = new SequenceDictionary(names.map(n => SequenceRecord(n, 1000L)))
  val seq = sd.toSAMSequenceDictionary.getSequences
  val expectedOrder = Seq("1", "2", "3", "4", "MT", "X")
  expectedOrder.zipWithIndex.foreach {
    case (name, idx) => assert(seq.get(idx).getSequenceName === name)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.formats.avro._
import scala.io.Source
import scala.util.Random

class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
Expand Down Expand Up @@ -213,4 +214,89 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
assert(readA.getReadName === readB.getReadName)
}
}

sparkTest("writing a small sorted file as SAM should produce the expected result") {
  // Local builder: all five reads share the same construction boilerplate.
  def read(name: String,
           contigName: String,
           contigLength: Long,
           start: Long,
           end: Long,
           sequence: String,
           qual: String,
           mapq: Int,
           cigar: String): AlignmentRecord = {
    AlignmentRecord.newBuilder()
      .setContig(Contig.newBuilder()
        .setContigName(contigName)
        .setContigLength(contigLength)
        .build())
      .setStart(start)
      .setEnd(end)
      .setSequence(sequence)
      .setQual(qual)
      .setMapq(mapq)
      .setCigar(cigar)
      .setReadName(name)
      .build()
  }

  // Reads are deliberately unsorted; adamSortReadsByReferencePosition must
  // order them (and isSorted = true must mark the header SO:coordinate) for
  // the output to match the sorted.sam fixture.
  val reads = sc.parallelize(Seq(
    read("A", "1", 1000L, 0L, 10L, "ACACACACAC", "**********", 50, "10M"),
    read("B", "3", 1000L, 10L, 20L, "ACACACACAC", "**********", 40, "4M2I4M"),
    read("C", "4", 2000L, 1000L, 1008L, "ACACACAC", "********", 25, "8M"),
    read("D", "2", 1000L, 500L, 510L, "ACACACACACAC", "************", 55, "10M2S"),
    read("E", "2", 1000L, 100L, 110L, "ACACACACAC", "**********", 45, "10M")), 1)

  val tempFile = Files.createTempDirectory("sam")
  val tempPath1 = tempFile.toAbsolutePath.toString + "/reads.sam"

  reads.adamSortReadsByReferencePosition()
    .adamSAMSave(tempPath1, isSorted = true)

  val file1 = Source.fromFile(tempPath1 + "/part-r-00000")
  val file2 = Source.fromFile(ClassLoader.getSystemClassLoader
    .getResource("sorted.sam").getFile)
  try {
    // BUG FIX: calling .size on the Iterator returned by getLines consumes
    // it, so the original zip compared two exhausted iterators and the
    // per-line assertions never ran. Materialize to Lists first.
    val f1lines = file1.getLines.toList
    val f2lines = file2.getLines.toList

    assert(f1lines.size === f2lines.size)
    f1lines.zip(f2lines)
      .foreach(p => assert(p._1 === p._2))
  } finally {
    // Close both Sources; the original leaked the underlying file handles.
    file1.close()
    file2.close()
  }
}
}

0 comments on commit de3fa55

Please sign in to comment.