Skip to content

Commit

Permalink
allow no filtering to be applied (#7004)
Browse files: browse the repository at this point in the history
  • Loading branch information
kcibul committed Feb 1, 2021
1 parent 8b5fba1 commit 0cb9260
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -42,7 +42,8 @@ public class ExtractCohort extends ExtractTool {

@Argument(
fullName = "filter-set-name",
doc = "Name in filter_set_name column of filtering table to use. Which training set should be applied in extract."
doc = "Name in filter_set_name column of filtering table to use. Which training set should be applied in extract.",
optional = true
)
private String filterSetName = null;

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -116,30 +116,37 @@ public ExtractCohortEngine(final String projectID,
int getTotalNumberOfSites() { return totalNumberOfSites; }

public void traverse() {
//First allele here is the ref, followed by the alts associated with that ref. We need this because at this point the alleles haven't been joined and remapped to one reference allele.
final HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap = new HashMap<>();
final HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap = new HashMap<>();

String rowRestriction = null;
if (minLocation != null && maxLocation != null) {
rowRestriction = "location >= " + minLocation + " AND location <= " + maxLocation;
}
final String rowRestrictionWithFilterSetName = rowRestriction + " AND " + SchemaUtils.FILTER_SET_NAME + " = '" + filterSetName + "'";

final StorageAPIAvroReader filteringTableAvroReader = new StorageAPIAvroReader(filteringTableRef, rowRestrictionWithFilterSetName, projectID);
//First allele here is the ref, followed by the alts associated with that ref. We need this because at this point the alleles haven't been joined and remapped to one reference allele.
final HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap = new HashMap<>();
final HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap = new HashMap<>();
boolean noFilteringRequested = (filteringTableRef == null);

if (!noFilteringRequested) {
final String rowRestrictionWithFilterSetName = rowRestriction + " AND " + SchemaUtils.FILTER_SET_NAME + " = '" + filterSetName + "'";

final StorageAPIAvroReader filteringTableAvroReader = new StorageAPIAvroReader(filteringTableRef, rowRestrictionWithFilterSetName, projectID);

for ( final GenericRecord queryRow : filteringTableAvroReader ) {
final long location = Long.parseLong(queryRow.get(SchemaUtils.LOCATION_FIELD_NAME).toString());
final Double vqslod = Double.parseDouble(queryRow.get("vqslod").toString());
final String yng = queryRow.get("yng_status").toString();
final Allele ref = Allele.create(queryRow.get("ref").toString(), true);
final Allele alt = Allele.create(queryRow.get("alt").toString(), false);
fullVqsLodMap.putIfAbsent(location, new HashMap<>());
fullVqsLodMap.get(location).putIfAbsent(ref, new HashMap<>());
fullVqsLodMap.get(location).get(ref).put(alt, vqslod);
fullYngMap.putIfAbsent(location, new HashMap<>());
fullYngMap.get(location).putIfAbsent(ref, new HashMap<>());
fullYngMap.get(location).get(ref).put(alt, yng);
}

for ( final GenericRecord queryRow : filteringTableAvroReader ) {
final long location = Long.parseLong(queryRow.get(SchemaUtils.LOCATION_FIELD_NAME).toString());
final Double vqslod = Double.parseDouble(queryRow.get("vqslod").toString());
final String yng = queryRow.get("yng_status").toString();
final Allele ref = Allele.create(queryRow.get("ref").toString(), true);
final Allele alt = Allele.create(queryRow.get("alt").toString(), false);
fullVqsLodMap.putIfAbsent(location, new HashMap<>());
fullVqsLodMap.get(location).putIfAbsent(ref, new HashMap<>());
fullVqsLodMap.get(location).get(ref).put(alt, vqslod);
fullYngMap.putIfAbsent(location, new HashMap<>());
fullYngMap.get(location).putIfAbsent(ref, new HashMap<>());
fullYngMap.get(location).get(ref).put(alt, yng);
filteringTableAvroReader.close();
}

switch (queryMode) {
Expand All @@ -149,7 +156,7 @@ public void traverse() {
}

final StorageAPIAvroReader storageAPIAvroReader = new StorageAPIAvroReader(cohortTableRef, rowRestriction, projectID);
createVariantsFromUngroupedTableResult(storageAPIAvroReader, fullVqsLodMap, fullYngMap);
createVariantsFromUngroupedTableResult(storageAPIAvroReader, fullVqsLodMap, fullYngMap, noFilteringRequested);
break;
case QUERY:
if (printDebugInformation) {
Expand All @@ -158,12 +165,12 @@ public void traverse() {
// create the query string
String q = "SELECT " + StringUtils.join(SchemaUtils.COHORT_FIELDS,",") + " FROM " + cohortTableRef.getFQTableName() + " ORDER BY " + SchemaUtils.LOCATION_FIELD_NAME;
TableResult tr = BigQueryUtils.executeQuery(BigQueryUtils.getBigQueryEndPoint(), cohortTableRef.tableProject, cohortTableRef.tableDataset, q);
createVariantsFromSortedTableResults(tr, fullVqsLodMap, fullYngMap);
createVariantsFromSortedTableResults(tr, fullVqsLodMap, fullYngMap, noFilteringRequested);
break;
}
}

private void createVariantsFromSortedTableResults(final TableResult tr, HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap, HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap) {
private void createVariantsFromSortedTableResults(final TableResult tr, HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap, HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap, boolean noFilteringRequested) {

// final Set<String> columnNames = new HashSet<>();
// if ( schema.getField(POSITION_FIELD_NAME) == null || schema.getField(VALUES_ARRAY_FIELD_NAME) == null ) {
Expand Down Expand Up @@ -191,15 +198,15 @@ private void createVariantsFromSortedTableResults(final TableResult tr, HashMap<
logger.info(currentLocation + ": processing records");
}
// TODO this should start a thread or something - i.e. scatter
processSampleRecordsForLocation(currentLocation, sampleRecords, new HashSet<>(SchemaUtils.ARRAY_COHORT_FIELDS), fullVqsLodMap, fullYngMap);
processSampleRecordsForLocation(currentLocation, sampleRecords, new HashSet<>(SchemaUtils.ARRAY_COHORT_FIELDS), fullVqsLodMap, fullYngMap, noFilteringRequested);
currentLocation = location;
sampleRecords = new ArrayList<>();
}

sampleRecords.add(new QueryRecord(row));

}
processSampleRecordsForLocation(currentLocation, sampleRecords, new HashSet<>(SchemaUtils.ARRAY_COHORT_FIELDS), fullVqsLodMap, fullYngMap);
processSampleRecordsForLocation(currentLocation, sampleRecords, new HashSet<>(SchemaUtils.ARRAY_COHORT_FIELDS), fullVqsLodMap, fullYngMap, noFilteringRequested);

}

Expand All @@ -219,7 +226,7 @@ public int compare( GenericRecord o1, GenericRecord o2 ) {
}


private void createVariantsFromUngroupedTableResult(final GATKAvroReader avroReader, HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap, HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap) {
private void createVariantsFromUngroupedTableResult(final GATKAvroReader avroReader, HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap, HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap, boolean noFilteringRequested) {

final org.apache.avro.Schema schema = avroReader.getSchema();

Expand All @@ -246,7 +253,7 @@ private void createVariantsFromUngroupedTableResult(final GATKAvroReader avroRea

if ( location != currentLocation && currentLocation != -1 ) {
++totalNumberOfSites;
processSampleRecordsForLocation(currentLocation, currentPositionRecords, columnNames, fullVqsLodMap, fullYngMap);
processSampleRecordsForLocation(currentLocation, currentPositionRecords, columnNames, fullVqsLodMap, fullYngMap, noFilteringRequested);

currentPositionRecords.clear();
}
Expand All @@ -257,11 +264,11 @@ private void createVariantsFromUngroupedTableResult(final GATKAvroReader avroRea

if ( ! currentPositionRecords.isEmpty() ) {
++totalNumberOfSites;
processSampleRecordsForLocation(currentLocation, currentPositionRecords, columnNames, fullVqsLodMap, fullYngMap);
processSampleRecordsForLocation(currentLocation, currentPositionRecords, columnNames, fullVqsLodMap, fullYngMap, noFilteringRequested);
}
}

private void processSampleRecordsForLocation(final long location, final Iterable<GenericRecord> sampleRecordsAtPosition, final Set<String> columnNames, HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap, HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap) {
private void processSampleRecordsForLocation(final long location, final Iterable<GenericRecord> sampleRecordsAtPosition, final Set<String> columnNames, HashMap<Long, HashMap<Allele, HashMap<Allele, Double>>> fullVqsLodMap, HashMap<Long, HashMap<Allele, HashMap<Allele, String>>> fullYngMap, boolean noFilteringRequested) {
final List<VariantContext> unmergedCalls = new ArrayList<>();
final Set<String> currentPositionSamplesSeen = new HashSet<>();
boolean currentPositionHasVariant = false;
Expand All @@ -272,6 +279,9 @@ private void processSampleRecordsForLocation(final long location, final Iterable

final HashMap<Allele, HashMap<Allele, Double>> vqsLodMap;
final HashMap<Allele, HashMap<Allele, String>> yngMap;

// TODO: optimize in the case where noFilteringRequested == true, no need to populate this

// If there's no yng/vqslod for this site, then we'll treat these as NAYs because VQSR dropped them (they have no alt reads).
if (fullVqsLodMap.get(SchemaUtils.encodeLocation(contig, currentPosition)) == null) {
vqsLodMap = new HashMap<>();
Expand Down Expand Up @@ -337,10 +347,10 @@ private void processSampleRecordsForLocation(final long location, final Iterable
logger.info(contig + ":" + currentPosition + ": processed " + numRecordsAtPosition + " total sample records");
}

finalizeCurrentVariant(unmergedCalls, currentPositionSamplesSeen, currentPositionHasVariant, contig, currentPosition, refAllele, vqsLodMap, yngMap);
finalizeCurrentVariant(unmergedCalls, currentPositionSamplesSeen, currentPositionHasVariant, contig, currentPosition, refAllele, vqsLodMap, yngMap, noFilteringRequested);
}

private void finalizeCurrentVariant(final List<VariantContext> unmergedCalls, final Set<String> currentVariantSamplesSeen, final boolean currentPositionHasVariant, final String contig, final long start, final Allele refAllele, HashMap<Allele, HashMap<Allele, Double>> vqsLodMap, HashMap<Allele, HashMap<Allele, String>> yngMap) {
private void finalizeCurrentVariant(final List<VariantContext> unmergedCalls, final Set<String> currentVariantSamplesSeen, final boolean currentPositionHasVariant, final String contig, final long start, final Allele refAllele, HashMap<Allele, HashMap<Allele, Double>> vqsLodMap, HashMap<Allele, HashMap<Allele, String>> yngMap, boolean noFilteringRequested) {
// If there were no variants at this site, we don't emit a record and there's nothing to do here
if ( ! currentPositionHasVariant ) {
return;
Expand All @@ -366,7 +376,9 @@ private void finalizeCurrentVariant(final List<VariantContext> unmergedCalls, fi
true);


final VariantContext finalVC = mode.equals(CommonCode.ModeEnum.ARRAYS) ? mergedVC : filterVariants(mergedVC, vqsLodMap, yngMap);
final VariantContext finalVC = noFilteringRequested || mode.equals(CommonCode.ModeEnum.ARRAYS) ? mergedVC : filterVariants(mergedVC, vqsLodMap, yngMap);


// final VariantContext annotatedVC = enableVariantAnnotator ?
// variantAnnotator.annotateContext(finalizedVC, new FeatureContext(), null, null, a -> true): finalVC;

Expand Down

0 comments on commit 0cb9260

Please sign in to comment.