From 6a82ddabe2ce8a2b814321c67926696819fdb6f7 Mon Sep 17 00:00:00 2001 From: Louis Bergelson Date: Mon, 2 Mar 2020 17:07:23 -0500 Subject: [PATCH] Deleting NioBam and related classes * NioBam and ExampleNioCountReads was a useful experiment and demonstration but we have not moved it past that stage in multiple years. * We adopted Disq instead of persuing this route. --- .../tools/examples/ExampleNioCountReads.java | 62 ------ .../utils/nio/ChannelAsSeekableStream.java | 97 --------- .../hellbender/utils/nio/NioBam.java | 198 ------------------ .../hellbender/utils/nio/ReadsIterable.java | 143 ------------- 4 files changed, 500 deletions(-) delete mode 100644 src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleNioCountReads.java delete mode 100644 src/main/java/org/broadinstitute/hellbender/utils/nio/ChannelAsSeekableStream.java delete mode 100644 src/main/java/org/broadinstitute/hellbender/utils/nio/NioBam.java delete mode 100644 src/main/java/org/broadinstitute/hellbender/utils/nio/ReadsIterable.java diff --git a/src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleNioCountReads.java b/src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleNioCountReads.java deleted file mode 100644 index 0a2a067ccfe..00000000000 --- a/src/main/java/org/broadinstitute/hellbender/tools/examples/ExampleNioCountReads.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.broadinstitute.hellbender.tools.examples; - -import org.apache.spark.api.java.JavaSparkContext; -import org.broadinstitute.barclay.argparser.Argument; -import org.broadinstitute.barclay.argparser.CommandLineProgramProperties; -import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; -import org.broadinstitute.hellbender.cmdline.programgroups.ExampleProgramGroup; -import org.broadinstitute.hellbender.engine.spark.SparkCommandLineProgram; -import org.broadinstitute.hellbender.exceptions.UserException; -import org.broadinstitute.hellbender.utils.nio.NioBam; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.PrintStream; - -/** - * Example of how to use Spark on Google Cloud Storage directly, without using the GCS Hadoop Connector. - */ -@CommandLineProgramProperties( - summary = "Example of how to use Spark on Google Cloud Storage directly, without using the GCS Hadoop Connector", - oneLineSummary = "Example of how to use Spark on Google Cloud Storage directly, without using the GCS Hadoop Connector", - programGroup = ExampleProgramGroup.class, - omitFromCommandLine = true -) -public class ExampleNioCountReads extends SparkCommandLineProgram { - private static final long serialVersionUID = 1L; - - @Argument(fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME, shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME, doc = "Output file (if not provided, defaults to STDOUT)", common = false, optional = true) - private File OUTPUT_FILE = null; - - @Argument(fullName = "inputPath", shortName = "P", doc = "Input path (eg. gs://foo/bar.bam)", optional = false) - private String path = null; - - // Typically set to number of executors times number of cores per executor. - @Argument(fullName = "parts", doc = "number of partitions", optional = false) - private int parts = 3; - - private void countReads(JavaSparkContext ctx) { - PrintStream outputStream; - - try { - outputStream = OUTPUT_FILE != null ? new PrintStream(OUTPUT_FILE) : System.out; - } - catch ( FileNotFoundException e ) { - throw new UserException.CouldNotReadInputFile(OUTPUT_FILE, e); - } - - NioBam input = new NioBam(path, path + ".bai"); - long readCount = input.getReads(ctx, parts).count(); - outputStream.println("Number of reads: " + readCount); - } - - /** - * Runs the pipeline. - * - * @param ctx - */ - @Override - protected void runPipeline(JavaSparkContext ctx) { - countReads(ctx); - } -} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/nio/ChannelAsSeekableStream.java b/src/main/java/org/broadinstitute/hellbender/utils/nio/ChannelAsSeekableStream.java deleted file mode 100644 index 6cbfa368f8c..00000000000 --- a/src/main/java/org/broadinstitute/hellbender/utils/nio/ChannelAsSeekableStream.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.broadinstitute.hellbender.utils.nio; - -import htsjdk.samtools.seekablestream.SeekableStream; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; - -/** - * A SeekableByteChannel in SeekableStream clothes. - * This is key to using NIO Paths with htsjdk (enabling it to work with gcloud-java-nio). - */ -public class ChannelAsSeekableStream extends SeekableStream { - - private final SeekableByteChannel chan; - private final String source; - private ByteBuffer oneByte; - - public ChannelAsSeekableStream(SeekableByteChannel chan) { - this.chan = chan; - this.source = null; - } - - public ChannelAsSeekableStream(SeekableByteChannel chan, String source) { - this.chan = chan; - this.source = source; - } - - @Override - public long length() { - try { - return chan.size(); - } catch (IOException x) { - throw new RuntimeException(x); - } - } - - @Override - public long position() throws IOException { - return chan.position(); - } - - @Override - public void seek(long position) throws IOException { - chan.position(position); - } - - /** - * Reads the next byte of data from the input stream. The value byte is - * returned as an int in the range 0 to - * 255. If no byte is available because the end of the stream - * has been reached, the value -1 is returned. This method - * blocks until input data is available, the end of the stream is detected, - * or an exception is thrown. - * - * @return the next byte of data, or -1 if the end of the - * stream is reached. - * @throws IOException if an I/O error occurs. - */ - @Override - public int read() throws IOException { - ByteBuffer buf = oneByte; - if (null == buf) { - buf = ByteBuffer.allocate(1); - oneByte = buf; - } - chan.read(buf); - if (buf.remaining() == 0) { - return -1; - } - return buf.get(0); - } - - @Override - public int read(byte[] buffer, int offset, int length) throws IOException { - ByteBuffer buf = ByteBuffer.wrap(buffer, offset, length); - return chan.read(buf); - } - - @Override - public void close() throws IOException { - chan.close(); - } - - @Override - public boolean eof() throws IOException { - return chan.position() >= chan.size(); - } - - /** - * @return String representation of source (e.g. URL, file path, etc.), or null if not available. - */ - @Override - public String getSource() { - return source; - } -} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/nio/NioBam.java b/src/main/java/org/broadinstitute/hellbender/utils/nio/NioBam.java deleted file mode 100644 index c967b0f9512..00000000000 --- a/src/main/java/org/broadinstitute/hellbender/utils/nio/NioBam.java +++ /dev/null @@ -1,198 +0,0 @@ -package org.broadinstitute.hellbender.utils.nio; - -import htsjdk.samtools.BAMFileSpan; -import htsjdk.samtools.BAMIndex; -import htsjdk.samtools.Chunk; -import htsjdk.samtools.QueryInterval; -import htsjdk.samtools.SAMFileHeader; -import htsjdk.samtools.SAMRecord; -import htsjdk.samtools.SAMSequenceRecord; -import htsjdk.samtools.SamInputResource; -import htsjdk.samtools.SamReader; -import htsjdk.samtools.SamReaderFactory; -import htsjdk.samtools.ValidationStringency; -import htsjdk.samtools.seekablestream.ByteArraySeekableStream; -import htsjdk.samtools.seekablestream.SeekableStream; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.broadinstitute.hellbender.engine.ReadsDataSource; -import org.broadinstitute.hellbender.exceptions.GATKException; -import org.broadinstitute.hellbender.exceptions.UserException; -import org.broadinstitute.hellbender.utils.gcs.BucketUtils; -import org.broadinstitute.hellbender.utils.io.IOUtils; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.Serializable; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; - -/** - * NioBam holds a reference to a BAM file on Google Cloud Storage, and can give you - * an RDD with the reads inside of it. - * - * The constructor will open the file to make sure it exists and we have access to it. - * This is preferable to waiting until we're running on the cloud. - * - * Although we don't expect you to move these objects across computers, you can. - */ -public class NioBam implements Serializable { - protected static final Logger logger = LogManager.getLogger(NioBam.class); - - private static final long serialVersionUID = 1L; - private final String bam; - private final String index; - private transient byte[] indexCache; - - /** Checks the files exists, then stores them. **/ - public NioBam(String gcsFilename, String indexGcsFilename) { - try { - this.bam = gcsFilename; - this.index = indexGcsFilename; - init(); - } - catch ( FileNotFoundException e ) { - throw new UserException.CouldNotReadInputFile("Could not read file " + gcsFilename, e); - } - } - - /** Finds the index file, then calls NioBam(bam, index). **/ - public NioBam(String gcsFilename) { - try { - String indexFilename = gcsFilename + ".bai"; - if ( !Files.exists(IOUtils.getPath(indexFilename)) ) { - int i = gcsFilename.lastIndexOf('.'); - if ( i >= 0 ) { - indexFilename = gcsFilename.substring(0, i) + ".bai"; - } - } - this.bam = gcsFilename; - this.index = indexFilename; - init(); - } - catch ( FileNotFoundException e ) { - throw new UserException.CouldNotReadInputFile("Could not read file " + gcsFilename, e); - } - } - - private void init() throws FileNotFoundException { - Path bamPath = IOUtils.getPath(bam); - Path bamIndexPath = IOUtils.getPath(index); - if (!Files.exists(bamPath)) { - throw new FileNotFoundException(bamPath.toString()); - } - if (!Files.exists(bamIndexPath)) { - throw new FileNotFoundException(bamIndexPath.toString()); - } - } - - /** Parses the BAM file into SAMRecords. Will be distributed onto at least 'numPartitions' partitions. **/ - public JavaRDD getReads(JavaSparkContext ctx, int numPartitions) { - try { - Path bamPath = IOUtils.getPath(bam); - ChannelAsSeekableStream bamOverNIO = new ChannelAsSeekableStream(Files.newByteChannel(bamPath), bamPath.toString()); - final byte[] index = getIndex(); - SeekableStream indexInMemory = new ByteArraySeekableStream(index); - - SamReader bam3 = SamReaderFactory.makeDefault() - .validationStringency(ValidationStringency.LENIENT) - .enable(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES) - .open(SamInputResource.of(bamOverNIO).index(indexInMemory)); - List chunks = getAllChunksBalanced(bam3, numPartitions); - - // Ideally we'd get exactly the number of chunks the user is asking for, but until then... - logger.debug("We got: " + chunks.size() + " chunks."); - - return ctx.parallelize(chunks, chunks.size()).flatMap(qi -> new ReadsIterable(bam, index, qi).iterator()); - } - catch ( IOException e ) { - throw new GATKException("I/O error loading reads", e); - } - } - - private synchronized byte[] getIndex() throws IOException { - if (null!= indexCache) { - return indexCache; - } - indexCache = Files.readAllBytes(IOUtils.getPath(index)); - return indexCache; - } - - // this isn't very good yet, ideally we want just this number of query intervals, not per-contig. - private static List getAllChunksBalanced(SamReader bam, int countPerContig) { - List ret = new ArrayList<>(); - SAMFileHeader header = bam.getFileHeader(); - for (SAMSequenceRecord s : header.getSequenceDictionary().getSequences()) { - ret.addAll(getChunksBalanced(bam, s.getSequenceIndex(), countPerContig)); - } - return ret; - } - - private static List getChunksBalanced(SamReader bam, int sequenceIndex, int retCount) { - List ret = new ArrayList<>(); - BAMIndex index = bam.indexing().getIndex(); - SAMFileHeader header = bam.getFileHeader(); - SAMSequenceRecord s = header.getSequence(sequenceIndex); - long totalLength = chunksLength(getChunks(index, sequenceIndex, 1, s.getSequenceLength() + 1)); - if (totalLength == 0) { - return ret; - } - int sofar = 0; - long targetLength = totalLength / retCount; - int end = s.getSequenceLength(); - int step = s.getSequenceLength() / (100 * retCount); - if (step < 1) step = 1; - int start = 1; - for (int j = step; j < end; j += step) { - if (j > end) j = end; - List candidate = getChunks(index, sequenceIndex, start, j); - long size = chunksLength(candidate); - if (size < targetLength) { - // not big enough yet - continue; - } - if (size > targetLength * 2) { - // too large, search for a good separation point - // TODO - } - // good, emit. - ret.add(new QueryInterval(sequenceIndex, start, j + 1)); - start = j; - sofar += size; - if (ret.size() < retCount) { - targetLength = (totalLength - sofar) / (retCount - ret.size()); - } else { - targetLength = totalLength / retCount; - } - - } - return ret; - } - - - private static List getChunks(BAMIndex index, int sequenceIndex, int start, int endExcluded) { - if (endExcluded <= start) return new ArrayList<>(); - BAMFileSpan span = index.getSpanOverlapping(sequenceIndex, start, endExcluded - 1); - if (null == span) return new ArrayList<>(); - return span.getChunks(); - } - - private static long chunksLength(List chunks) { - long totalLength = 0; - for (Chunk c : chunks) { - totalLength += chunkSize(c); - } - return totalLength; - } - - private static long chunkSize(Chunk c) { - long start = c.getChunkStart() >> 16; - long end = (c.getChunkEnd() >> 16) + (c.getChunkEnd() & 0xffff); - return (end - start); - } - -} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/nio/ReadsIterable.java b/src/main/java/org/broadinstitute/hellbender/utils/nio/ReadsIterable.java deleted file mode 100644 index 32d3a60012a..00000000000 --- a/src/main/java/org/broadinstitute/hellbender/utils/nio/ReadsIterable.java +++ /dev/null @@ -1,143 +0,0 @@ -package org.broadinstitute.hellbender.utils.nio; - -import htsjdk.samtools.QueryInterval; -import htsjdk.samtools.SAMRecord; -import htsjdk.samtools.SAMRecordIterator; -import htsjdk.samtools.SamInputResource; -import htsjdk.samtools.SamReader; -import htsjdk.samtools.SamReaderFactory; -import htsjdk.samtools.ValidationStringency; -import htsjdk.samtools.seekablestream.ByteArraySeekableStream; -import htsjdk.samtools.seekablestream.SeekableStream; -import htsjdk.samtools.util.CloseableIterator; -import org.broadinstitute.hellbender.utils.gcs.BucketUtils; -import org.broadinstitute.hellbender.utils.io.IOUtils; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.channels.SeekableByteChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Iterator; -import java.util.NoSuchElementException; - -/** - * ReadsIterable gives you all the reads for a given genomic interval. - * - * QueryInterval + header --> iterable SAMRecords - */ -public class ReadsIterable implements Iterable, Serializable { - - private static final long serialVersionUID = 1L; - private final String path; - private final byte[] index; - private final QueryInterval interval; - private final boolean removeHeader = true; - - class ReadsIterator implements CloseableIterator { - private final static int BUFFER_SIZE_MB = 200; - private SamReader bam; - private SAMRecordIterator query; - private SAMRecord nextRecord = null; - private boolean done = false; - - public ReadsIterator() throws IOException { - Path fpath = IOUtils.getPath(path); - byte[] indexData = index; - SeekableStream indexInMemory = new ByteArraySeekableStream(indexData); - // set high-level retries to deal with servers that might be temporarily overloaded - // while we're reading a very long file from them. - SeekableByteChannel channel = BucketUtils.addPrefetcher(BUFFER_SIZE_MB, Files.newByteChannel(fpath)); - ChannelAsSeekableStream bamOverNIO = new ChannelAsSeekableStream(channel, path); - bam = SamReaderFactory.makeDefault() - .validationStringency(ValidationStringency.LENIENT) - .enable(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES) - .open(SamInputResource.of(bamOverNIO).index(indexInMemory)); - - QueryInterval[] array = new QueryInterval[1]; - array[0] = interval; - query = bam.query(array, false); - } - - /** - * Returns {@code true} if the iteration has more elements. - * (In other words, returns {@code true} if {@link #next} would - * return an element rather than throwing an exception.) - * - * @return {@code true} if the iteration has more elements - */ - @Override - public boolean hasNext() { - if (done) return false; - - if (nextRecord!=null) return true; - - nextRecord = fetchRecord(); - - boolean ret = (nextRecord != null); - if (!ret) { - done = true; - close(); - } - return ret; - } - - /** - * Returns the next element in the iteration. - * - * @return the next element in the iteration - * @throws NoSuchElementException if the iteration has no more elements - */ - @Override - public SAMRecord next() { - if (!hasNext()) throw new NoSuchElementException(); - SAMRecord ret = nextRecord; - nextRecord = null; - return ret; - } - - private SAMRecord fetchRecord() { - while (query.hasNext()) { - SAMRecord sr = query.next(); - int start = sr.getAlignmentStart(); - if (start >= interval.start && start <= interval.end) { - // read starts in the interval - if (removeHeader) { - sr.setHeader(null); - } - return sr; - } - } - return null; - } - - @Override - public void close() { - if (null==query) return; - try { - query.close(); - query = null; - bam.close(); - bam = null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - public ReadsIterable(String path, byte[] index, QueryInterval in) { - this.path = path; - this.index = index; - this.interval = in; - } - - @Override - public Iterator iterator() { - try { - return new ReadsIterator(); - } catch (IOException x) { - throw new RuntimeException(x); - } - } - -}