-
Notifications
You must be signed in to change notification settings - Fork 597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow for hdfs and gcs URI's to be passed to GenomicsDB #5017
Changes from 18 commits
fd1e170
61cd1d9
6fcacc2
3c87e98
35bea3c
46ef8aa
2e0a66f
855cff7
9d1ff22
a11d7a2
b8650de
924cfea
edd5cbe
3d35059
d430bbd
cddf276
5f4f6c6
3576665
f3cacea
ace4d51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
package org.broadinstitute.hellbender.engine; | ||
|
||
import com.intel.genomicsdb.GenomicsDBUtils; | ||
import com.intel.genomicsdb.model.GenomicsDBExportConfiguration; | ||
import com.intel.genomicsdb.reader.GenomicsDBFeatureReader; | ||
import htsjdk.samtools.SAMSequenceDictionary; | ||
|
@@ -27,7 +28,6 @@ | |
import java.nio.channels.SeekableByteChannel; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
@@ -62,11 +62,6 @@ | |
public final class FeatureDataSource<T extends Feature> implements GATKDataSource<T>, AutoCloseable { | ||
private static final Logger logger = LogManager.getLogger(FeatureDataSource.class); | ||
|
||
/** | ||
* identifies a path as a GenomicsDB URI | ||
*/ | ||
public static final String GENOMIC_DB_URI_SCHEME = "gendb://"; | ||
|
||
/** | ||
* Feature reader used to retrieve records from our file | ||
*/ | ||
|
@@ -260,7 +255,7 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook | |
// a query by interval is attempted. | ||
this.featureReader = getFeatureReader(featureInput, targetFeatureType, cloudWrapper, cloudIndexWrapper, reference); | ||
|
||
if (isGenomicsDBPath(featureInput.getFeaturePath())) { | ||
if (IOUtils.isGenomicsDBPath(featureInput.getFeaturePath())) { | ||
//genomics db uri's have no associated index file to read from, but they do support random access | ||
this.hasIndex = false; | ||
this.supportsRandomAccess = true; | ||
|
@@ -282,20 +277,13 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook | |
this.queryLookaheadBases = queryLookaheadBases; | ||
} | ||
|
||
/** | ||
* @param path String containing the path to test | ||
* @return true if path represent a GenomicsDB URI, otherwise false | ||
*/ | ||
public static boolean isGenomicsDBPath(final String path) { | ||
return path != null && path.startsWith(GENOMIC_DB_URI_SCHEME); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private static <T extends Feature> FeatureReader<T> getFeatureReader(final FeatureInput<T> featureInput, final Class<? extends Feature> targetFeatureType, | ||
final Function<SeekableByteChannel, SeekableByteChannel> cloudWrapper, | ||
final Function<SeekableByteChannel, SeekableByteChannel> cloudIndexWrapper, | ||
final Path reference) { | ||
if (isGenomicsDBPath(featureInput.getFeaturePath())) { | ||
if (IOUtils.isGenomicsDBPath(featureInput.getFeaturePath())) { | ||
try { | ||
if (reference == null) { | ||
throw new UserException.MissingReference("You must provide a reference if you want to load from GenomicsDB"); | ||
|
@@ -364,57 +352,50 @@ private static <T extends Feature> FeatureReader<T> getFeatureReader(final Featu | |
} | ||
} | ||
|
||
private static FeatureReader<VariantContext> getGenomicsDBFeatureReader(final String path, final File reference) { | ||
if( !isGenomicsDBPath(path) ) { | ||
throw new IllegalArgumentException("Trying to create a GenomicsDBReader from a non-GenomicsDB input"); | ||
private static void verifyPathsAreReadable(final String ... paths) { | ||
for (String path : paths) { | ||
IOUtils.assertFileIsReadable(IOUtils.getPath(path)); | ||
} | ||
} | ||
|
||
final String noheader = path.replace(GENOMIC_DB_URI_SCHEME, ""); | ||
final File workspace = new File(noheader); | ||
final File callsetJson = new File(noheader, GenomicsDBConstants.DEFAULT_CALLSETMAP_FILE_NAME); | ||
final File vidmapJson = new File(noheader, GenomicsDBConstants.DEFAULT_VIDMAP_FILE_NAME); | ||
final File vcfHeader = new File(noheader, GenomicsDBConstants.DEFAULT_VCFHEADER_FILE_NAME); | ||
|
||
if ( ! workspace.exists() || ! workspace.canRead() || ! workspace.isDirectory() ) { | ||
throw new UserException("GenomicsDB workspace " + workspace.getAbsolutePath() + " does not exist, " + | ||
" is not readable, or is not a directory"); | ||
private static FeatureReader<VariantContext> getGenomicsDBFeatureReader(final String path, final File reference) { | ||
final String workspace = IOUtils.getGenomicsDBAbsolutePath(path); | ||
if (workspace == null) { | ||
throw new IllegalArgumentException("Trying to create a GenomicsDBReader from non-GenomicsDB input path " + path); | ||
} else if (Files.notExists(IOUtils.getPath(workspace))) { | ||
throw new UserException("GenomicsDB workspace " + path + " does not exist"); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of This way, the URIs will at least be legal... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the URI schemes, would you consider gendb://, gendb.gs:// gendb.hdfs://, etc.? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a period in the scheme is legal, then these would be fine. I'd check to make sure that Java's URI, URL, and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, URI and Path recognize period in their schemes. |
||
try { | ||
IOUtils.canReadFile(callsetJson); | ||
IOUtils.canReadFile(vidmapJson); | ||
IOUtils.canReadFile(vcfHeader); | ||
} catch ( UserException.CouldNotReadInputFile e ) { | ||
throw new UserException("Couldn't connect to GenomicsDB because the vidmap, callset JSON files, or gVCF Header (" + | ||
GenomicsDBConstants.DEFAULT_VIDMAP_FILE_NAME + "," + GenomicsDBConstants.DEFAULT_CALLSETMAP_FILE_NAME + "," + | ||
GenomicsDBConstants.DEFAULT_VCFHEADER_FILE_NAME + ") could not be read from GenomicsDB workspace " + workspace.getAbsolutePath(), e); | ||
} | ||
final String callsetJson = IOUtils.appendPathToDir(workspace, GenomicsDBConstants.DEFAULT_CALLSETMAP_FILE_NAME); | ||
final String vidmapJson = IOUtils.appendPathToDir(workspace, GenomicsDBConstants.DEFAULT_VIDMAP_FILE_NAME); | ||
final String vcfHeader = IOUtils.appendPathToDir(workspace, GenomicsDBConstants.DEFAULT_VCFHEADER_FILE_NAME); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are the |
||
|
||
final GenomicsDBExportConfiguration.ExportConfiguration exportConfigurationBuilder = | ||
createExportConfiguration(reference, workspace, callsetJson, vidmapJson, vcfHeader); | ||
verifyPathsAreReadable(callsetJson, vidmapJson, vcfHeader); | ||
|
||
try { | ||
final GenomicsDBExportConfiguration.ExportConfiguration exportConfigurationBuilder = | ||
createExportConfiguration(reference, workspace, callsetJson, vidmapJson, vcfHeader); | ||
return new GenomicsDBFeatureReader<>(exportConfigurationBuilder, new BCF2Codec(), Optional.empty()); | ||
} catch (final IOException e) { | ||
throw new UserException("Couldn't create GenomicsDBFeatureReader", e); | ||
} | ||
} | ||
|
||
private static GenomicsDBExportConfiguration.ExportConfiguration createExportConfiguration(final File reference, final File workspace, | ||
final File callsetJson, final File vidmapJson, | ||
final File vcfHeader) { | ||
private static GenomicsDBExportConfiguration.ExportConfiguration createExportConfiguration(final File reference, final String workspace, | ||
final String callsetJson, final String vidmapJson, | ||
final String vcfHeader) { | ||
GenomicsDBExportConfiguration.ExportConfiguration.Builder exportConfigurationBuilder = | ||
GenomicsDBExportConfiguration.ExportConfiguration.newBuilder() | ||
.setWorkspace(workspace.getAbsolutePath()) | ||
.setWorkspace(workspace) | ||
.setReferenceGenome(reference.getAbsolutePath()) | ||
.setVidMappingFile(vidmapJson.getAbsolutePath()) | ||
.setCallsetMappingFile(callsetJson.getAbsolutePath()) | ||
.setVcfHeaderFilename(vcfHeader.getAbsolutePath()) | ||
.setVidMappingFile(vidmapJson) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these guaranteed to be absolute at this point? (the previous code enforced this) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have made sure that all these paths are absolute in the changed code where we are allowing for gendb://, gendb.hdfs:// and gendb.gs:// schemes. |
||
.setCallsetMappingFile(callsetJson) | ||
.setVcfHeaderFilename(vcfHeader) | ||
.setProduceGTField(false) | ||
.setProduceGTWithMinPLValueForSpanningDeletions(false) | ||
.setSitesOnlyQuery(false) | ||
.setMaxDiploidAltAllelesThatCanBeGenotyped(GenotypeLikelihoods.MAX_DIPLOID_ALT_ALLELES_THAT_CAN_BE_GENOTYPED); | ||
Path arrayFolder = Paths.get(workspace.getAbsolutePath(), GenomicsDBConstants.DEFAULT_ARRAY_NAME).toAbsolutePath(); | ||
|
||
// For the multi-interval support, we create multiple arrays (directories) in a single workspace - | ||
// one per interval. So, if you wish to import intervals ("chr1", [ 1, 100M ]) and ("chr2", [ 1, 100M ]), | ||
|
@@ -429,7 +410,7 @@ private static GenomicsDBExportConfiguration.ExportConfiguration createExportCon | |
// will be backward compatible with respect to reads. Hence, if a directory named genomicsdb_array is found, | ||
// the array name is passed to the GenomicsDBFeatureReader otherwise the array names are generated from the | ||
// directory entries. | ||
if (Files.exists(arrayFolder)) { | ||
if (GenomicsDBUtils.isGenomicsDBArray(workspace.toString(), GenomicsDBConstants.DEFAULT_ARRAY_NAME)) { | ||
exportConfigurationBuilder.setArrayName(GenomicsDBConstants.DEFAULT_ARRAY_NAME); | ||
} else { | ||
exportConfigurationBuilder.setGenerateArrayNameFromPartitionBounds(true); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package org.broadinstitute.hellbender.tools.genomicsdb; | ||
|
||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import com.intel.genomicsdb.GenomicsDBUtils; | ||
import com.intel.genomicsdb.importer.GenomicsDBImporter; | ||
import com.intel.genomicsdb.importer.model.ChromosomeInterval; | ||
import com.intel.genomicsdb.model.Coordinates; | ||
|
@@ -32,10 +33,10 @@ | |
import org.broadinstitute.hellbender.exceptions.UserException; | ||
import org.broadinstitute.hellbender.utils.SimpleInterval; | ||
import org.broadinstitute.hellbender.utils.Utils; | ||
import org.broadinstitute.hellbender.utils.gcs.BucketUtils; | ||
import org.broadinstitute.hellbender.utils.io.IOUtils; | ||
import org.broadinstitute.hellbender.utils.nio.SeekableByteChannelPrefetcher; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.nio.channels.SeekableByteChannel; | ||
import java.nio.file.Files; | ||
|
@@ -294,13 +295,13 @@ public int getDefaultCloudIndexPrefetchBufferSize() { | |
private SAMSequenceDictionary mergedHeaderSequenceDictionary; | ||
|
||
// Path to vidmap file to be written by GenomicsDBImporter | ||
private File vidMapJSONFile; | ||
private String vidMapJSONFile; | ||
|
||
// Path to callsetmap file to be written by GenomicsDBImporter | ||
private File callsetMapJSONFile; | ||
private String callsetMapJSONFile; | ||
|
||
// Path to combined VCF header file to be written by GenomicsDBImporter | ||
private File vcfHeaderFile; | ||
private String vcfHeaderFile; | ||
|
||
// GenomicsDB callset map protobuf structure containing all callset names | ||
// used to write the callset json file on traversal success | ||
|
@@ -461,17 +462,15 @@ public static SortedMap<String, Path> loadSampleNameMapFileInSortedOrder(final P | |
*/ | ||
@Override | ||
public void onTraversalStart() { | ||
|
||
final File workspaceDir = overwriteOrCreateWorkspace(); | ||
|
||
vidMapJSONFile = new File(workspaceDir + "/" + GenomicsDBConstants.DEFAULT_VIDMAP_FILE_NAME); | ||
callsetMapJSONFile = new File(workspaceDir + "/" + GenomicsDBConstants.DEFAULT_CALLSETMAP_FILE_NAME); | ||
vcfHeaderFile = new File(workspaceDir + "/" + GenomicsDBConstants.DEFAULT_VCFHEADER_FILE_NAME); | ||
String workspaceDir = BucketUtils.makeFilePathAbsolute(overwriteOrCreateWorkspace()); | ||
vidMapJSONFile = IOUtils.appendPathToDir(workspaceDir, GenomicsDBConstants.DEFAULT_VIDMAP_FILE_NAME); | ||
callsetMapJSONFile = IOUtils.appendPathToDir(workspaceDir, GenomicsDBConstants.DEFAULT_CALLSETMAP_FILE_NAME); | ||
vcfHeaderFile = IOUtils.appendPathToDir(workspaceDir, GenomicsDBConstants.DEFAULT_VCFHEADER_FILE_NAME); | ||
|
||
logger.info("Vid Map JSON file will be written to " + vidMapJSONFile); | ||
logger.info("Callset Map JSON file will be written to " + callsetMapJSONFile); | ||
logger.info("Complete VCF Header will be written to " + vcfHeaderFile); | ||
logger.info("Importing to array - " + workspace + "/" + GenomicsDBConstants.DEFAULT_ARRAY_NAME); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why remove this logger message? Isn't it still informative? |
||
logger.info("Importing to array - " + workspaceDir + "/" + GenomicsDBConstants.DEFAULT_ARRAY_NAME); | ||
|
||
initializeInputPreloadExecutorService(); | ||
} | ||
|
@@ -533,9 +532,9 @@ private ImportConfig createImportConfig(final int batchSize) { | |
importConfigurationBuilder.setConsolidateTiledbArrayAfterLoad(doConsolidation); | ||
ImportConfig importConfig = new ImportConfig(importConfigurationBuilder.build(), validateSampleToReaderMap, true, | ||
batchSize, mergedHeaderLines, sampleNameToVcfPath, this::createSampleToReaderMap); | ||
importConfig.setOutputCallsetmapJsonFile(callsetMapJSONFile.getAbsolutePath()); | ||
importConfig.setOutputVidmapJsonFile(vidMapJSONFile.getAbsolutePath()); | ||
importConfig.setOutputVcfHeaderFile(vcfHeaderFile.getAbsolutePath()); | ||
importConfig.setOutputCallsetmapJsonFile(callsetMapJSONFile); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these guaranteed to be absolute at this point? (the previous implementation guaranteed this) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. These are generated with respect to the workspace directory that is made absolute - see line 655 : String workspaceDir = BucketUtils.makeFilePathAbsolute(workspace); |
||
importConfig.setOutputVidmapJsonFile(vidMapJSONFile); | ||
importConfig.setOutputVcfHeaderFile(vcfHeaderFile); | ||
importConfig.setUseSamplesInOrder(true); | ||
importConfig.setFunctionToCallOnBatchCompletion(this::logMessageOnBatchCompletion); | ||
return importConfig; | ||
|
@@ -653,26 +652,20 @@ private AbstractFeatureReader<VariantContext, LineIterator> getReaderFromPath(fi | |
* | ||
* @return The workspace directory | ||
*/ | ||
private File overwriteOrCreateWorkspace() { | ||
final File workspaceDir = new File(workspace); | ||
|
||
if (overwriteExistingWorkspace) { | ||
IOUtils.tryDelete(workspaceDir); | ||
} | ||
|
||
if (!workspaceDir.exists()) { | ||
final int ret = GenomicsDBImporter.createTileDBWorkspace(workspaceDir.getAbsolutePath()); | ||
if (ret > 0) { | ||
checkIfValidWorkspace(workspaceDir); | ||
logger.info("Importing data to GenomicsDB workspace: " + workspaceDir); | ||
} else if (ret < 0) { | ||
throw new UnableToCreateGenomicsDBWorkspace("Error creating GenomicsDB workspace: " + workspaceDir); | ||
} | ||
return workspaceDir; | ||
private String overwriteOrCreateWorkspace() { | ||
String workspaceDir = BucketUtils.makeFilePathAbsolute(workspace); | ||
// From JavaDoc for GenomicsDBUtils.createTileDBWorkspace | ||
// returnCode = 0 : OK. If overwriteExistingWorkspace is true and the workspace exists, it is deleted first. | ||
// returnCode = -1 : path was not a directory | ||
// returnCode = -2 : failed to create workspace | ||
// returnCode = 1 : if overwriteExistingWorkspace is false, return 1 if directory already exists | ||
int returnCode = GenomicsDBUtils.createTileDBWorkspace(workspaceDir, overwriteExistingWorkspace); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The value of |
||
if (returnCode < 0) { | ||
throw new UnableToCreateGenomicsDBWorkspace("Error creating GenomicsDB workspace: " + workspace); | ||
} else if (!overwriteExistingWorkspace && returnCode == 1) { | ||
throw new UnableToCreateGenomicsDBWorkspace("Error creating GenomicsDB workspace: " + workspace + " already exists"); | ||
} else { | ||
throw new UnableToCreateGenomicsDBWorkspace("The workspace you're trying to create already exists. ( " + workspaceDir.getAbsolutePath() + " ) " + | ||
"Writing into an existing workspace can cause data corruption. " + | ||
"Please choose an output path that doesn't already exist. "); | ||
return workspaceDir; | ||
} | ||
} | ||
|
||
|
@@ -684,13 +677,6 @@ static class UnableToCreateGenomicsDBWorkspace extends UserException { | |
} | ||
} | ||
|
||
private static void checkIfValidWorkspace(final File workspaceDir) { | ||
final File tempFile = new File(workspaceDir.getAbsolutePath() + "/__tiledb_workspace.tdb"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this check performed internally now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this check is performed internally in GenomicsDB now. gatk should not have to worry about internal data structures. |
||
if (!tempFile.exists()) { | ||
throw new UserException(workspaceDir.getAbsolutePath() + " is not a valid GenomicsDB workspace"); | ||
} | ||
} | ||
|
||
/** | ||
* Loads our intervals using the best available sequence | ||
* dictionary (as returned by {@link #getBestAvailableSequenceDictionary}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this method to
IOUtils
as well.