optionally provide sample-map-file instead of sample-map-table (#6872)
* optionally provide sample-map-file instead of sample-map-table

* fix variantstore test

* trying to fix test

* pass in read project id

* empty string -> null

* make read-project-id optional

Co-authored-by: Megan Shand <mshand@broadinstitute.org>
2 people authored and Marianie-Simeon committed Feb 16, 2021
1 parent 19dd991 commit 5453e6f
Showing 7 changed files with 58 additions and 25 deletions.
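Taken together, the change lets a caller supply the cohort sample map either as a BigQuery table or as a local CSV export, and separately name the Google Cloud project billed for BigQuery Storage API reads. A hypothetical pair of invocations (the values are illustrative; the flags are the ones declared in the first file below):

    --cohort-sample-table my-data-project.my_dataset.sample_list --read-project-id my-billing-project
    --cohort-sample-file cohort_samples.csv

where cohort_samples.csv maps integer sample ids to sample names, one per line, for example:

    1,NA12878
    2,NA12891
    3,NA12892

In the diffs below, lines the commit removes are prefixed with - and lines it adds with +.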
File 1 of 7: the extract-cohort tool (it constructs ArrayExtractCohortEngine; file names are not shown in this export)
@@ -2,6 +2,7 @@

import htsjdk.variant.variantcontext.writer.VariantContextWriter;
import htsjdk.variant.vcf.VCFHeader;
+import java.io.File;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.barclay.argparser.Argument;
@@ -51,11 +52,11 @@ public enum QueryMode {
private String outputVcfPathString = null;

@Argument(
fullName = "project-id",
doc = "ID of the Google Cloud project to use when executing queries",
optional = false
fullName = "read-project-id",
doc = "ID of the Google Cloud project to use (bill) when reading the microarray data tables",
optional = true
)
private String projectID = null;
private String readProjectID = null;

@Argument(
fullName = "cohort-sample-table",
@@ -64,6 +65,13 @@ public enum QueryMode {
)
private String sampleTableName = null;

+@Argument(
+fullName = "cohort-sample-file",
+doc = "CSV of sample_id,sample_name map in the cohort",
+optional = true
+)
+private File cohortSampleFile = null;
+
@Argument(
fullName = "probe-info-table",
doc = "Fully qualified name of a bigquery table containing probe information",
@@ -191,28 +199,34 @@ protected void onStartup() {

vcfWriter = createVCFWriter(IOUtils.getPath(outputVcfPathString));

-Map<Integer, String> sampleIdMap = SampleList.getSampleIdMap(new TableReference(sampleTableName, SampleList.SAMPLE_LIST_FIELDS), printDebugInformation);
+Map<Integer, String> sampleIdMap;
+if (sampleTableName != null) {
+    sampleIdMap = SampleList.getSampleIdMap(new TableReference(sampleTableName, SampleList.SAMPLE_LIST_FIELDS), printDebugInformation);
+} else if (cohortSampleFile != null) {
+    sampleIdMap = SampleList.getSampleIdMap(cohortSampleFile);
+} else {
+    throw new IllegalArgumentException("--cohort-sample-file or --cohort-sample-table must be provided.");
+}

-Collection<String> sampleNames = sampleIdMap.values();
-VCFHeader header = CommonCode.generateRawArrayVcfHeader(new HashSet<>(sampleNames), reference.getSequenceDictionary());
+VCFHeader header = CommonCode.generateRawArrayVcfHeader(new HashSet<>(sampleIdMap.values()), reference.getSequenceDictionary());

Map<Long, ProbeInfo> probeIdMap;
if (probeCsvExportFile == null) {
-probeIdMap = ProbeInfo.getProbeIdMapWithStorageAPI(probeTableName, printDebugInformation);
+probeIdMap = ProbeInfo.getProbeIdMapWithStorageAPI(probeTableName, printDebugInformation, readProjectID);
} else {
probeIdMap = ProbeInfo.getProbeIdMapFromExport(probeCsvExportFile);
}

// if we have a qcMetrics table, augment the probeInfo map with that information
Map<Long, ProbeQcMetrics> probeQcMetricsMap = null;
if (qcMetricsTableName != null) {
-probeQcMetricsMap = ProbeQcMetrics.getProbeQcMetricsWithStorageAPI(qcMetricsTableName);
+probeQcMetricsMap = ProbeQcMetrics.getProbeQcMetricsWithStorageAPI(qcMetricsTableName, readProjectID);
}

//ChromosomeEnum.setRefVersion(refVersion);

engine = new ArrayExtractCohortEngine(
-projectID,
+readProjectID,
vcfWriter,
header,
annotationEngine,
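Note the precedence in the new branch above: if both --cohort-sample-table and --cohort-sample-file are supplied, the table silently wins because it is checked first. A stricter variant, not part of this commit, would reject ambiguous input up front:

    if (sampleTableName != null && cohortSampleFile != null) {
        throw new IllegalArgumentException("Provide only one of --cohort-sample-table or --cohort-sample-file.");
    }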
File 2 of 7: ArrayExtractCohortEngine
@@ -18,7 +18,6 @@
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.bigquery.*;
import org.broadinstitute.hellbender.utils.localsort.SortingCollection;
-import org.broadinstitute.hellbender.utils.variant.GATKVCFConstants;

import java.text.DecimalFormat;
import java.util.*;
@@ -44,7 +43,7 @@ public class ArrayExtractCohortEngine {
private final ReferenceDataSource refSource;

private final ProgressMeter progressMeter;
-private final String projectID;
+private final String readProjectId;

/** List of sample names seen in the variant data from BigQuery. */
private final Map<Integer, String> sampleIdMap;
@@ -65,7 +64,7 @@ public class ArrayExtractCohortEngine {
final float callRateThreshold;
final boolean filterInvariants;

-public ArrayExtractCohortEngine(final String projectID,
+public ArrayExtractCohortEngine(final String readProjectId,
final VariantContextWriter vcfWriter,
final VCFHeader vcfHeader,
final VariantAnnotatorEngine annotationEngine,
@@ -92,7 +91,7 @@ public ArrayExtractCohortEngine(final String projectID,

this.localSortMaxRecordsInRam = localSortMaxRecordsInRam;

-this.projectID = projectID;
+this.readProjectId = readProjectId;
this.vcfWriter = vcfWriter;
this.refSource = refSource;
this.sampleIdMap = sampleIdMap;
@@ -133,7 +132,7 @@ public void traverse() {
rowRestriction = "probe_id >= " + minProbeId + " AND probe_id <= " + maxProbeId;
}

-final StorageAPIAvroReader storageAPIAvroReader = new StorageAPIAvroReader(cohortTableRef, rowRestriction);
+final StorageAPIAvroReader storageAPIAvroReader = new StorageAPIAvroReader(cohortTableRef, rowRestriction, readProjectId);
createVariantsFromUngroupedTableResult(storageAPIAvroReader);
}

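The engine change is mechanical: it stores readProjectId and forwards it to the StorageAPIAvroReader it constructs in traverse(). No validation happens here, so a null value falls through to the reader's default-project logic shown in the last file of this diff.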
File 3 of 7: a helper class (contains populateSampleNames and getProbeNameMap; class name not shown)
@@ -28,7 +28,7 @@ public static Set<String> populateSampleNames(TableReference sampleTableRef, boo

public static Map<String, ProbeInfo> getProbeNameMap(String fqProbeTableName, boolean printDebugInformation) {
Map<String, ProbeInfo> results = new HashMap<>();
-for (final ProbeInfo pi : ProbeInfo.getProbeIdMapWithStorageAPI(fqProbeTableName, printDebugInformation).values()) {
+for (final ProbeInfo pi : ProbeInfo.getProbeIdMapWithStorageAPI(fqProbeTableName, printDebugInformation, null).values()) {
results.put(pi.name, pi);
}
return results;
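The null passed as the new third argument here is safe: as the StorageAPIAvroReader change at the end of this diff shows, a null or empty parent project falls back to the table's own project, so this call site keeps its previous billing behavior.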
File 4 of 7: ProbeInfo
@@ -101,21 +101,22 @@ public static Map<Long, ProbeInfo> getProbeIdMapFromExport(final String probeCsv

probeIdMap.put(p.probeId, p);
}

return probeIdMap;
} catch (final Exception e) {
throw new GATKException("Error processing probe CSV file", e);
}
}
}

-public static Map<Long, ProbeInfo> getProbeIdMapWithStorageAPI(String fqProbeTableName, boolean printDebugInformation) {
+public static Map<Long, ProbeInfo> getProbeIdMapWithStorageAPI(String fqProbeTableName, boolean printDebugInformation, String readProjectId) {
Map<Long, ProbeInfo> results = new HashMap<>();

TableReference tableRef = new TableReference(fqProbeTableName, ProbeInfoSchema.PROBE_INFO_FIELDS);

System.out.println("Beginning probe retrieval...");
long start = System.currentTimeMillis();

-try (final StorageAPIAvroReader reader = new StorageAPIAvroReader(tableRef)) {
+try (final StorageAPIAvroReader reader = new StorageAPIAvroReader(tableRef, readProjectId)) {
for ( final GenericRecord row : reader ) {
ProbeInfo p = new ProbeInfo(
(Long) row.get(ProbeInfoSchema.PROBE_ID),
File 5 of 7: ProbeQcMetrics
@@ -20,15 +20,15 @@ public ProbeQcMetrics(final long probeId, final Double hwe_pval, final Double ca
this.invariant = invariant;
}

-public static Map<Long, ProbeQcMetrics> getProbeQcMetricsWithStorageAPI(String fqProbeTableName) {
+public static Map<Long, ProbeQcMetrics> getProbeQcMetricsWithStorageAPI(String fqProbeTableName, String readProjectId) {
Map<Long, ProbeQcMetrics> results = new HashMap<>();

TableReference tableRef = new TableReference(fqProbeTableName, ProbeQcMetricsSchema.PROBE_QC_METRIC_FIELDS);

System.out.println("Beginning probe QC metrics retrieval...");
long start = System.currentTimeMillis();

-try (final StorageAPIAvroReader reader = new StorageAPIAvroReader(tableRef)) {
+try (final StorageAPIAvroReader reader = new StorageAPIAvroReader(tableRef, readProjectId)) {
for ( final GenericRecord row : reader ) {
ProbeQcMetrics p = new ProbeQcMetrics(
(Long) row.get(ProbeQcMetricsSchema.PROBE_ID),
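ProbeInfo.getProbeIdMapWithStorageAPI and ProbeQcMetrics.getProbeQcMetricsWithStorageAPI get the same treatment: each now threads readProjectId straight through to the StorageAPIAvroReader constructor, so reads can be billed to a different project without moving the underlying tables.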
File 6 of 7: SampleList
@@ -2,6 +2,11 @@

import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -49,6 +54,16 @@ public static Map<Integer, String> getSampleIdMap(TableReference sampleTable, bo
return results;
}

+public static Map<Integer, String> getSampleIdMap(File cohortSampleFile) {
+    try {
+        return Files.readAllLines(cohortSampleFile.toPath(), StandardCharsets.US_ASCII).stream()
+            .map(s -> s.split(","))
+            .collect(Collectors.toMap(tokens -> Integer.parseInt(tokens[0]), tokens -> tokens[1]));
+    } catch (IOException e) {
+        throw new IllegalArgumentException("Could not parse --cohort-sample-file", e);
+    }
+}

private static TableResult querySampleTable(String fqSampleTableName, String whereClause, boolean printDebugInformation) {
// Get the query string:
final String sampleListQueryString =
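The new file-based getSampleIdMap is deliberately thin: it splits each line on every comma, does not trim whitespace or skip blank lines, and a malformed id escapes as a raw NumberFormatException rather than the IllegalArgumentException used for I/O failures. A more defensive sketch, reusing the imports the commit adds to this file (the method name is hypothetical; this is not part of the commit):

    public static Map<Integer, String> getSampleIdMapLenient(final File cohortSampleFile) {
        try {
            final Map<Integer, String> result = new HashMap<>();
            for (final String line : Files.readAllLines(cohortSampleFile.toPath(), StandardCharsets.US_ASCII)) {
                if (line.isEmpty()) {
                    continue; // tolerate trailing blank lines
                }
                // Limit of 2 keeps any commas inside the sample name intact.
                final String[] tokens = line.split(",", 2);
                result.put(Integer.parseInt(tokens[0].trim()), tokens[1].trim());
            }
            return result;
        } catch (IOException | RuntimeException e) {
            throw new IllegalArgumentException("Could not parse --cohort-sample-file", e);
        }
    }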
File 7 of 7: StorageAPIAvroReader
@@ -1,6 +1,7 @@
package org.broadinstitute.hellbender.utils.bigquery;

import com.google.api.gax.rpc.ServerStream;
+import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.storage.v1beta1.*;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions.Builder;
import com.google.common.base.Preconditions;
@@ -15,7 +16,6 @@

import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;

public class StorageAPIAvroReader implements GATKAvroReader {
@@ -42,15 +42,19 @@ public class StorageAPIAvroReader implements GATKAvroReader {
private GenericRecord nextRow = null;

public StorageAPIAvroReader(final TableReference tableRef) {
-this(tableRef, null);
+this(tableRef, null, null);
}

-public StorageAPIAvroReader(final TableReference tableRef, final String rowRestriction) {
+public StorageAPIAvroReader(final TableReference tableRef, String parentProjectId) {
+    this(tableRef, null, parentProjectId);
+}
+
+public StorageAPIAvroReader(final TableReference tableRef, final String rowRestriction, String parentProjectId) {

try {
this.client = BigQueryStorageClient.create();

-final String parent = String.format("projects/%s", tableRef.tableProject);
+final String parent = String.format("projects/%s", parentProjectId == null || parentProjectId.isEmpty() ? tableRef.tableProject : parentProjectId);

final TableReferenceProto.TableReference tableReference = TableReferenceProto.TableReference.newBuilder()
.setProjectId(tableRef.tableProject)
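With the three-argument constructor in place, a caller can bill a read session to one project while the table lives in another. A hypothetical usage, assuming the TableReference and ProbeInfoSchema constants shown earlier in this diff (the table and project names are illustrative):

    final TableReference probeTable = new TableReference("my-data-project.arrays.probe_info", ProbeInfoSchema.PROBE_INFO_FIELDS);
    try (final StorageAPIAvroReader reader = new StorageAPIAvroReader(probeTable, "my-billing-project")) {
        for (final GenericRecord row : reader) {
            System.out.println(row.get(ProbeInfoSchema.PROBE_ID)); // process each Avro row
        }
    }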
