Merge pull request bigdatagenomics#7 from ryan-williams/sbt
Merge in upstream, upgrade parent plugin
ryan-williams committed Dec 12, 2016
2 parents 9853909 + 115a03c commit 8c53066
Showing 77 changed files with 2,033 additions and 583 deletions.
60 changes: 22 additions & 38 deletions build.sbt
@@ -1,57 +1,41 @@
organization := "org.hammerlab.adam"

name := ParentPlugin.sparkName("adam-core")
name := sparkName("adam-core")

version := "0.20.4-SNAPSHOT"

sonatypeProfileName := "org.hammerlab"

val utilsVersion = "0.2.9"

hadoopVersion := "2.7.3"

scalatestVersion := "2.2.1"

testDeps ++= Seq(
"org.bdgenomics.utils" %% "utils-misc" % utilsVersion classifier("tests") exclude("org.apache.spark", "*"),
addSparkDeps
publishTestJar
enableScalariform

// Using ":=" here to clobber the usual default hammerlab-test-libs that are added by parent-plugin, which use
// Scalatest 3.0.0.
testDeps := Seq(
"org.mockito" % "mockito-core" % "1.10.19"
)

providedDeps ++= Seq(
libraries.value('hadoop),
libraries.value('spark)
)
testJarTestDeps += (libraries.value('bdg_utils_misc) exclude("org.apache.spark", "*"))

libraryDependencies ++= Seq(
"org.bdgenomics.utils" %% "utils-metrics" % utilsVersion,
"org.bdgenomics.utils" %% "utils-misc" % utilsVersion,
"org.bdgenomics.utils" %% "utils-io" % utilsVersion exclude("com.fasterxml.jackson.core", "*"),
"org.bdgenomics.utils" %% "utils-cli" % utilsVersion,
"org.bdgenomics.utils" %% "utils-intervalrdd" % utilsVersion,
"com.esotericsoftware.kryo" % "kryo" % "2.24.0",
"org.bdgenomics.bdg-formats" % "bdg-formats" % "0.10.0",
"commons-io" % "commons-io" % "2.4",
libraries.value('bdg_formats),
libraries.value('bdg_utils_cli),
libraries.value('bdg_utils_intervalrdd),
libraries.value('bdg_utils_io),
libraries.value('bdg_utils_metrics),
libraries.value('bdg_utils_misc),
libraries.value('commons_io),
libraries.value('hadoop_bam) exclude("com.github.samtools", "htsjdk"),
libraries.value('log4j),
"com.github.samtools" % "htsjdk" % "2.5.0",
"com.netflix.servo" % "servo-core" % "0.10.0",
"it.unimi.dsi" % "fastutil" % "6.6.5",
"org.apache.avro" % "avro" % "1.8.0",
"org.slf4j" % "slf4j-log4j12" % "1.7.21",
"org.apache.httpcomponents" % "httpclient" % "4.5.2",
"org.apache.parquet" % "parquet-avro" % "1.8.1",
"org.apache.parquet" % "parquet-scala_2.10" % "1.8.1" exclude("org.scala-lang", "scala-library"),
libraries.value('hadoop_bam),
"com.github.samtools" % "htsjdk" % "2.5.0",
"org.apache.httpcomponents" % "httpclient" % "4.5.2",
"com.netflix.servo" % "servo-core" % "0.10.0",
"org.hammerlab" %% "genomic-loci" % "1.4.2" exclude("com.github.samtools", "htsjdk")
"org.hammerlab" %% "genomic-loci" % "1.4.4" exclude("com.github.samtools", "htsjdk")
)

import scalariform.formatter.preferences._
import com.typesafe.sbt.SbtScalariform
import com.typesafe.sbt.SbtScalariform.ScalariformKeys

SbtScalariform.defaultScalariformSettings

ScalariformKeys.preferences := ScalariformKeys.preferences.value
.setPreference(AlignParameters, true)
.setPreference(CompactStringConcatenation, false)
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DoubleIndentClassDeclaration, true)
.setPreference(PreserveDanglingCloseParenthesis, true)
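
The dependency list above swaps hard-coded org.bdgenomics.utils coordinates for libraries.value(...) lookups contributed by the upgraded sbt-parent plugin, so common versions are pinned in one place. As a minimal sketch of that pattern — assuming the plugin exposes libraries as a SettingKey[Map[Symbol, ModuleID]], an inference from the usages above rather than the plugin source — the lookup could be modeled like this:

// Hypothetical plugin-side definition, for illustration only; the real
// sbt-parent implementation may differ.
import sbt._

val libraries = settingKey[Map[Symbol, ModuleID]]("dependency coordinates pinned by the parent plugin")

libraries := Map(
  'commons_io     -> ("commons-io" % "commons-io" % "2.4"),
  'bdg_formats    -> ("org.bdgenomics.bdg-formats" % "bdg-formats" % "0.10.0"),
  'bdg_utils_misc -> ("org.bdgenomics.utils" %% "utils-misc" % "0.2.9")
)

// A downstream build.sbt then picks entries out of the map, as in the diff above:
libraryDependencies += libraries.value('bdg_formats)

The other bare settings introduced above (hadoopVersion, scalatestVersion, addSparkDeps, publishTestJar, enableScalariform) are not defined in this build.sbt, so they presumably come from the same plugin.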
3 changes: 1 addition & 2 deletions project/plugins.sbt
@@ -1,2 +1 @@
addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.2.3")
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.4.0")
109 changes: 54 additions & 55 deletions src/main/java/org/bdgenomics/adam/io/FastqRecordReader.java
@@ -40,7 +40,7 @@
* a single Text output. This is then fed into the FastqConverter, which
* converts the single Text instance into two AlignmentRecords.
*/
abstract class FastqRecordReader extends RecordReader<Void,Text> {
abstract class FastqRecordReader extends RecordReader<Void, Text> {
/*
* fastq format:
* <fastq> := <block>+
@@ -57,53 +57,53 @@ abstract class FastqRecordReader extends RecordReader<Void,Text> {
* For now I'm going to assume single-line sequences. This works for our sequencing
* application. We'll see if someone complains in other applications.
*/

/**
* First valid data index in the stream.
*/
private long start;

/**
* First index value beyond the slice, i.e. slice is in range [start,end).
*/
protected long end;

/**
* Current position in file.
*/
protected long pos;

/**
* Path of the file being parsed.
*/
private Path file;

/**
* The line reader we are using to read the file.
*/
private LineReader lineReader;

/**
* The input stream we are using to read the file.
*/
private InputStream inputStream;

/**
* The text for a single record pair we have parsed out.
* Hadoop's RecordReader contract requires us to save this as state.
*/
private Text currentValue;

/**
* Newline string for matching on.
*/
private static final byte[] newline = "\n".getBytes();

/**
* Maximum length for a read string.
*/
private static final int MAX_LINE_LENGTH = 10000;

/**
* Builds a new record reader given a config file and an input split.
*
@@ -115,13 +115,13 @@ protected FastqRecordReader(final Configuration conf, final FileSplit split) thr
file = split.getPath();
start = split.getStart();
end = start + split.getLength();

FileSystem fs = file.getFileSystem(conf);
FSDataInputStream fileIn = fs.open(file);

CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(file);

if (codec == null) { // no codec. Uncompressed file.
positionAtFirstRecord(fileIn);
inputStream = fileIn;
@@ -130,14 +130,12 @@ protected FastqRecordReader(final Configuration conf, final FileSplit split) thr
if (start != 0) {
throw new RuntimeException("Start position for compressed file is not 0! (found " + start + ")");
}

inputStream = codec.createInputStream(fileIn);
end = Long.MAX_VALUE; // read until the end of the file
}

lineReader = new LineReader(inputStream);
}

/**
* Checks to see whether the buffer is positioned at a valid record.
*
@@ -154,19 +152,19 @@ protected FastqRecordReader(final Configuration conf, final FileSplit split) thr
*
* @param stream The stream to reposition.
*/
protected void positionAtFirstRecord(FSDataInputStream stream) throws IOException {
protected final void positionAtFirstRecord(final FSDataInputStream stream) throws IOException {
Text buffer = new Text();

if (true) { // (start > 0) // use start>0 to assume that files start with valid data
// Advance to the start of the first record that ends with /1
// We use a temporary LineReader to read lines until we find the
// position of the right one. We then seek the file to that position.
stream.seek(start);
LineReader reader = new LineReader(stream);

int bytesRead = 0;
do {
bytesRead = reader.readLine(buffer, (int)Math.min(MAX_LINE_LENGTH, end - start));
bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
int bufferLength = buffer.getLength();
if (bytesRead > 0 && !checkBuffer(bufferLength, buffer)) {
start += bytesRead;
@@ -175,9 +173,9 @@ protected void positionAtFirstRecord(FSDataInputStream stream) throws IOExceptio
//
// If this isn't the start of a record, we want to backtrack to its end
long backtrackPosition = start + bytesRead;
bytesRead = reader.readLine(buffer, (int)Math.min(MAX_LINE_LENGTH, end - start));
bytesRead = reader.readLine(buffer, (int)Math.min(MAX_LINE_LENGTH, end - start));

bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
if (bytesRead > 0 && buffer.getLength() > 0 && buffer.getBytes()[0] == '+') {
break; // all good!
} else {
@@ -188,39 +186,39 @@ protected void positionAtFirstRecord(FSDataInputStream stream) throws IOExceptio
}
}
} while (bytesRead > 0);

stream.seek(start);
}

pos = start;
}

/**
* Method is a no-op.
*
* @param split The input split that we will parse.
* @param context The Hadoop task context.
*/
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}

public final void initialize(final InputSplit split, final TaskAttemptContext context)
throws IOException, InterruptedException {}

/**
* FASTQ has no keys, so we return null.
*
* @return Always returns null.
*/
public Void getCurrentKey() {
public final Void getCurrentKey() {
return null;
}

/**
* Returns the last interleaved FASTQ record.
*
* @return The text corresponding to the last read pair.
*/
public Text getCurrentValue() {
public final Text getCurrentValue() {
return currentValue;
}

/**
* Seeks ahead in our split to the next key-value pair.
*
@@ -229,42 +227,41 @@ public Text getCurrentValue() {
*
* @return True if reading the next read pair succeeded.
*/
public boolean nextKeyValue() throws IOException, InterruptedException {
public final boolean nextKeyValue() throws IOException, InterruptedException {
currentValue = new Text();

return next(currentValue);
}

/**
* Close this RecordReader to future operations.
*/
public void close() throws IOException {
public final void close() throws IOException {
inputStream.close();
}

/**
* How much of the input has the RecordReader consumed?
*
* @return Returns a value on [0.0, 1.0] that notes how many bytes we
* have read so far out of the total bytes to read.
*/
public float getProgress() {
public final float getProgress() {
if (start == end) {
return 1.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}

/**
* Produces a debugging message with the file position.
*
* @return Returns a string containing {filename}:{index}.
*/
protected String makePositionMessage() {
protected final String makePositionMessage() {
return file.toString() + ":" + pos;
}

/**
* Parses a read from an interleaved FASTQ file.
*
@@ -277,7 +274,9 @@ protected String makePositionMessage() {
* @throws RuntimeException Throws exception if FASTQ record doesn't
* have proper formatting (e.g., record doesn't start with @).
*/
protected boolean lowLevelFastqRead(Text readName, Text value) throws IOException {
protected final boolean lowLevelFastqRead(final Text readName, final Text value)
throws IOException {

// ID line
readName.clear();
long skipped = appendLineInto(readName, true);
@@ -292,31 +291,30 @@ protected boolean lowLevelFastqRead(Text readName, Text value) throws IOExceptio
". Line: " +
readName + ". \n");
}

value.append(readName.getBytes(), 0, readName.getLength());

// sequence
appendLineInto(value, false);

// separator line
appendLineInto(value, false);

// quality
appendLineInto(value, false);

return true;
}

/**
* Reads from the input split.
*
* @param value Text record to write input value into.
* @return Returns whether this read was successful or not.
*
* @see lowLevelFastqRead
* @see #lowLevelFastqRead(Text, Text)
*/
abstract protected boolean next(Text value) throws IOException;

/**
* Reads a newline into a text record from the underlying line reader.
*
@@ -330,14 +328,15 @@ protected boolean lowLevelFastqRead(Text readName, Text value) throws IOExceptio
private int appendLineInto(final Text dest, final boolean eofOk) throws EOFException, IOException {
Text buf = new Text();
int bytesRead = lineReader.readLine(buf, MAX_LINE_LENGTH);
if (bytesRead < 0 || (bytesRead == 0 && !eofOk))

if (bytesRead < 0 || (bytesRead == 0 && !eofOk)) {
throw new EOFException();

}

dest.append(buf.getBytes(), 0, buf.getLength());
dest.append(newline, 0, 1);
pos += bytesRead;

return bytesRead;
}
}
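
For context on the boundary-finding logic in positionAtFirstRecord above: FASTQ stores each read as four lines — name, bases, a separator line starting with '+', and base qualities — so once a candidate start line passes checkBuffer, the reader confirms it by checking that the line two below begins with '+'. A made-up interleaved read pair in that layout (illustrative only, not from this repository):

@read1/1
GATTACAGATTACA
+
IIIIIIIIIIIIII
@read1/2
TGTAATCTGTAATC
+
IIIIIIIIIIIIII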