Skip to content

Commit

Permalink
analysis: Complete VariantStatsAnalysis implementation. #18
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Oct 15, 2019
1 parent febd042 commit f6938e4
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
package org.opencb.oskar.analysis.variant.stats;

import org.apache.commons.lang.StringUtils;
import org.opencb.biodata.models.variant.StudyEntry;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.utils.CollectionUtils;
import org.opencb.oskar.analysis.OskarAnalysis;
import org.opencb.oskar.analysis.exceptions.AnalysisException;
import org.opencb.oskar.analysis.result.FileResult;
import org.opencb.oskar.core.annotations.Analysis;

import java.nio.file.Path;
import java.util.List;

@Analysis(id = VariantStatsAnalysis.ID, data = Analysis.AnalysisData.VARIANT)
public class VariantStatsAnalysis extends OskarAnalysis {

public static final String ID = "VARIANT_STATS";

private String study;
private String cohort;
private List<String> samples;
private Path outputFile;
private Query variantsQuery;

public VariantStatsAnalysis(ObjectMap executorParams, Path outDir) {
this(null, executorParams, outDir);
Expand All @@ -23,7 +33,80 @@ public VariantStatsAnalysis(String cohort, ObjectMap executorParams, Path outDir
this.cohort = cohort;
}

@Override
protected void check() throws AnalysisException {
    super.check();

    // Evaluate both "nothing selected" conditions before mutating state.
    boolean noCohort = StringUtils.isEmpty(cohort);
    boolean noSamples = CollectionUtils.isEmpty(samples);
    if (noCohort && noSamples) {
        // Neither a cohort nor an explicit sample list: fall back to the study-wide default cohort.
        cohort = StudyEntry.DEFAULT_COHORT;
    }

    // Pick the output file name from the (possibly just-defaulted) cohort.
    String fileName = StringUtils.isEmpty(cohort)
            ? "variant_stats.tsv"
            : "variant_stats_" + cohort + ".tsv";
    outputFile = outDir.resolve(fileName);

    // Downstream executors expect a non-null query object.
    if (variantsQuery == null) {
        variantsQuery = new Query();
    }
}

@Override
public void exec() throws AnalysisException {
    // Resolve the concrete executor registered for this analysis.
    VariantStatsAnalysisExecutor executor = getAnalysisExecutor(VariantStatsAnalysisExecutor.class);

    // Propagate the analysis configuration into the executor.
    executor.setStudy(study);
    executor.setCohort(cohort);
    executor.setSamples(samples);
    executor.setOutputFile(outputFile);
    executor.setVariantsQuery(variantsQuery);
    executor.setUp(executorParams, outDir);

    // Single-step execution, reported through the analysis result manager.
    arm.startStep("variant-stats");
    executor.exec();
    arm.endStep(100);

    // Register the produced TSV, or surface a warning if nothing was written.
    boolean produced = outputFile.toFile().exists();
    if (produced) {
        arm.addFile(outputFile, FileResult.FileType.TAB_SEPARATED);
    } else {
        arm.addWarning("Output file not generated");
    }
}

/**
 * Returns the identifier of the study whose variants are analysed.
 *
 * @return the study id, or {@code null} if not set
 */
public String getStudy() {
    return this.study;
}

/**
 * Sets the identifier of the study whose variants are analysed.
 *
 * @param study study id
 * @return this instance, for fluent chaining
 */
public VariantStatsAnalysis setStudy(String study) {
    this.study = study;
    return this;
}

/**
 * Returns the cohort whose stats are computed (defaulted in {@code check()}
 * when neither a cohort nor samples were supplied).
 *
 * @return the cohort name, or {@code null} if not set
 */
public String getCohort() {
    return this.cohort;
}

/**
 * Sets the cohort whose stats are computed.
 *
 * @param cohort cohort name
 * @return this instance, for fluent chaining
 */
public VariantStatsAnalysis setCohort(String cohort) {
    this.cohort = cohort;
    return this;
}

/**
 * Returns the explicit sample list used instead of a named cohort, if any.
 *
 * @return the sample ids, or {@code null} if not set
 */
public List<String> getSamples() {
    return this.samples;
}

/**
 * Sets an explicit sample list to use instead of a named cohort.
 *
 * @param samples sample ids
 * @return this instance, for fluent chaining
 */
public VariantStatsAnalysis setSamples(List<String> samples) {
    this.samples = samples;
    return this;
}

/**
 * Returns the query restricting which variants are included in the stats
 * (defaulted to an empty query in {@code check()}).
 *
 * @return the variants query, or {@code null} before {@code check()} runs
 */
public Query getVariantsQuery() {
    return this.variantsQuery;
}

/**
 * Sets the query restricting which variants are included in the stats.
 *
 * @param variantsQuery variant filter query
 * @return this instance, for fluent chaining
 */
public VariantStatsAnalysis setVariantsQuery(Query variantsQuery) {
    this.variantsQuery = variantsQuery;
    return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.opencb.oskar.analysis.variant.stats;

import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.oskar.analysis.OskarAnalysisExecutor;

import java.nio.file.Path;
import java.util.List;

/**
 * Base class for {@link VariantStatsAnalysis} executors. Concrete subclasses
 * (e.g. the Spark/Parquet implementation) read the configured study, cohort or
 * sample list and variants query, compute the stats, and write them to
 * {@code outputFile}.
 */
public abstract class VariantStatsAnalysisExecutor extends OskarAnalysisExecutor {

    private Path outputFile;
    private String study;
    private String cohort;
    private List<String> samples;
    private Query variantsQuery;

    public VariantStatsAnalysisExecutor() {
    }

    public VariantStatsAnalysisExecutor(ObjectMap executorParams, Path outDir) {
        this(null, executorParams, outDir);
    }

    public VariantStatsAnalysisExecutor(String cohort, ObjectMap executorParams, Path outDir) {
        setUp(executorParams, outDir);
        this.cohort = cohort;
    }

    @Override
    public String toString() {
        // FIX: previously only 'cohort' plus the base params were printed,
        // hiding study/samples/outputFile/variantsQuery from debug output.
        final StringBuilder sb = new StringBuilder("VariantStatsAnalysisExecutor{");
        sb.append("study='").append(study).append('\'');
        sb.append(", cohort='").append(cohort).append('\'');
        sb.append(", samples=").append(samples);
        sb.append(", outputFile=").append(outputFile);
        sb.append(", variantsQuery=").append(variantsQuery);
        sb.append(", executorParams=").append(executorParams);
        sb.append(", outDir=").append(outDir);
        sb.append('}');
        return sb.toString();
    }

    /**
     * @return the identifier of the study whose variants are analysed
     */
    public String getStudy() {
        return study;
    }

    /**
     * @param study study id
     * @return this instance, for fluent chaining
     */
    public VariantStatsAnalysisExecutor setStudy(String study) {
        this.study = study;
        return this;
    }

    /**
     * @return the cohort whose stats are computed
     */
    public String getCohort() {
        return cohort;
    }

    /**
     * @param cohort cohort name
     * @return this instance, for fluent chaining
     */
    public VariantStatsAnalysisExecutor setCohort(String cohort) {
        this.cohort = cohort;
        return this;
    }

    /**
     * @return the explicit sample list used instead of a named cohort, if any
     */
    public List<String> getSamples() {
        return samples;
    }

    /**
     * @param samples sample ids
     * @return this instance, for fluent chaining
     */
    public VariantStatsAnalysisExecutor setSamples(List<String> samples) {
        this.samples = samples;
        return this;
    }

    /**
     * @return the file the computed stats are written to
     */
    public Path getOutputFile() {
        return outputFile;
    }

    /**
     * @param outputFile destination file for the stats
     * @return this instance, for fluent chaining
     */
    public VariantStatsAnalysisExecutor setOutputFile(Path outputFile) {
        this.outputFile = outputFile;
        return this;
    }

    /**
     * @param variantsQuery variant filter query
     * @return this instance, for fluent chaining
     */
    public VariantStatsAnalysisExecutor setVariantsQuery(Query variantsQuery) {
        this.variantsQuery = variantsQuery;
        return this;
    }

    /**
     * @return the query restricting which variants are included in the stats
     */
    public Query getVariantsQuery() {
        return variantsQuery;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,16 @@
import org.apache.spark.sql.types.StructField;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.oskar.analysis.exceptions.AnalysisException;
import org.opencb.oskar.analysis.result.FileResult;
import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysis;
import org.opencb.oskar.analysis.variant.stats.VariantStatsExecutor;
import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysisExecutor;
import org.opencb.oskar.core.annotations.AnalysisExecutor;
import org.opencb.oskar.spark.commons.OskarException;
import org.opencb.oskar.spark.variant.Oskar;
import org.opencb.oskar.spark.variant.analysis.transformers.VariantStatsTransformer;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.nio.file.Paths;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
Expand All @@ -28,7 +25,7 @@
analysis = VariantStatsAnalysis.ID,
source= AnalysisExecutor.Source.PARQUET_FILE,
framework = AnalysisExecutor.Framework.SPARK)
public class VariantStatsSparkParquetAnalysisExecutor extends VariantStatsExecutor {
public class VariantStatsSparkParquetAnalysisExecutor extends VariantStatsAnalysisExecutor {

// No-arg constructor — presumably required for reflective instantiation of
// @AnalysisExecutor-annotated classes; TODO confirm against the executor factory.
public VariantStatsSparkParquetAnalysisExecutor() {
}
Expand All @@ -40,7 +37,7 @@ public VariantStatsSparkParquetAnalysisExecutor(String cohort, ObjectMap executo
@Override
public void exec() throws AnalysisException {
String parquetFilename = getExecutorParams().getString("FILE");
String studyId = getExecutorParams().getString("STUDY_ID");
String studyId = getStudy();
String master = getExecutorParams().getString("MASTER");

// Prepare input dataset from the input parquet file
Expand Down Expand Up @@ -77,10 +74,10 @@ public void exec() throws AnalysisException {
line.append(field.name());
}

String outFilename = getOutDir() + "/variant_stats.txt";
Path outFilename = getOutputFile();
PrintWriter pw;
try {
pw = new PrintWriter(outFilename);
pw = new PrintWriter(outFilename.toFile());
} catch (FileNotFoundException e) {
throw new AnalysisException("Error creating output file: " + outFilename, e);
}
Expand All @@ -89,9 +86,6 @@ public void exec() throws AnalysisException {
SparkAnalysisExecutorUtils.writeRows(outDf.toLocalIterator(), pw);

pw.close();

if (new File(outFilename).exists()) {
arm.addFile(Paths.get(outFilename), FileResult.FileType.TAB_SEPARATED);
}
}

}

0 comments on commit f6938e4

Please sign in to comment.