diff --git a/oskar-analysis/src/main/java/org/opencb/oskar/analysis/variant/stats/VariantStatsAnalysis.java b/oskar-analysis/src/main/java/org/opencb/oskar/analysis/variant/stats/VariantStatsAnalysis.java index e33cb8f..e9582ae 100644 --- a/oskar-analysis/src/main/java/org/opencb/oskar/analysis/variant/stats/VariantStatsAnalysis.java +++ b/oskar-analysis/src/main/java/org/opencb/oskar/analysis/variant/stats/VariantStatsAnalysis.java @@ -1,18 +1,28 @@ package org.opencb.oskar.analysis.variant.stats; +import org.apache.commons.lang.StringUtils; +import org.opencb.biodata.models.variant.StudyEntry; import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.datastore.core.Query; +import org.opencb.commons.utils.CollectionUtils; import org.opencb.oskar.analysis.OskarAnalysis; import org.opencb.oskar.analysis.exceptions.AnalysisException; +import org.opencb.oskar.analysis.result.FileResult; import org.opencb.oskar.core.annotations.Analysis; import java.nio.file.Path; +import java.util.List; @Analysis(id = VariantStatsAnalysis.ID, data = Analysis.AnalysisData.VARIANT) public class VariantStatsAnalysis extends OskarAnalysis { public static final String ID = "VARIANT_STATS"; + private String study; private String cohort; + private List samples; + private Path outputFile; + private Query variantsQuery; public VariantStatsAnalysis(ObjectMap executorParams, Path outDir) { this(null, executorParams, outDir); @@ -23,7 +33,80 @@ public VariantStatsAnalysis(String cohort, ObjectMap executorParams, Path outDir this.cohort = cohort; } + @Override + protected void check() throws AnalysisException { + super.check(); + + if (StringUtils.isEmpty(cohort) && CollectionUtils.isEmpty(samples)) { + cohort = StudyEntry.DEFAULT_COHORT; + } + + if (StringUtils.isEmpty(cohort)) { + outputFile = outDir.resolve("variant_stats.tsv"); + } else { + outputFile = outDir.resolve("variant_stats_" + cohort + ".tsv"); + } + + if (variantsQuery == null) { + variantsQuery = new Query(); + } + } + @Override public void exec() throws AnalysisException { + VariantStatsAnalysisExecutor executor = getAnalysisExecutor(VariantStatsAnalysisExecutor.class); + + executor.setStudy(study) + .setCohort(cohort) + .setSamples(samples) + .setOutputFile(outputFile) + .setVariantsQuery(variantsQuery); + executor.setUp(executorParams, outDir); + + arm.startStep("variant-stats"); + executor.exec(); + arm.endStep(100); + + if (outputFile.toFile().exists()) { + arm.addFile(outputFile, FileResult.FileType.TAB_SEPARATED); + } else { + arm.addWarning("Output file not generated"); + } + } + + public String getStudy() { + return study; + } + + public VariantStatsAnalysis setStudy(String study) { + this.study = study; + return this; + } + + public String getCohort() { + return cohort; + } + + public VariantStatsAnalysis setCohort(String cohort) { + this.cohort = cohort; + return this; + } + + public List getSamples() { + return samples; + } + + public VariantStatsAnalysis setSamples(List samples) { + this.samples = samples; + return this; + } + + public Query getVariantsQuery() { + return variantsQuery; + } + + public VariantStatsAnalysis setVariantsQuery(Query variantsQuery) { + this.variantsQuery = variantsQuery; + return this; } } diff --git a/oskar-analysis/src/main/java/org/opencb/oskar/analysis/variant/stats/VariantStatsAnalysisExecutor.java b/oskar-analysis/src/main/java/org/opencb/oskar/analysis/variant/stats/VariantStatsAnalysisExecutor.java new file mode 100644 index 0000000..7e940a6 --- /dev/null +++ b/oskar-analysis/src/main/java/org/opencb/oskar/analysis/variant/stats/VariantStatsAnalysisExecutor.java @@ -0,0 +1,84 @@ +package org.opencb.oskar.analysis.variant.stats; + +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.datastore.core.Query; +import org.opencb.oskar.analysis.OskarAnalysisExecutor; + +import java.nio.file.Path; +import java.util.List; + +public abstract class VariantStatsAnalysisExecutor extends OskarAnalysisExecutor { + + private Path outputFile; + private String study; + private String cohort; + private List samples; + private Query variantsQuery; + + public VariantStatsAnalysisExecutor() { + } + + public VariantStatsAnalysisExecutor(ObjectMap executorParams, Path outDir) { + this(null, executorParams, outDir); + } + + public VariantStatsAnalysisExecutor(String cohort, ObjectMap executorParams, Path outDir) { + setUp(executorParams, outDir); + this.cohort = cohort; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("VariantStatsAnalysisExecutor{"); + sb.append("cohort='").append(cohort).append('\''); + sb.append(", executorParams=").append(executorParams); + sb.append(", outDir=").append(outDir); + sb.append('}'); + return sb.toString(); + } + + public String getStudy() { + return study; + } + + public VariantStatsAnalysisExecutor setStudy(String study) { + this.study = study; + return this; + } + + public String getCohort() { + return cohort; + } + + public VariantStatsAnalysisExecutor setCohort(String cohort) { + this.cohort = cohort; + return this; + } + + public List getSamples() { + return samples; + } + + public VariantStatsAnalysisExecutor setSamples(List samples) { + this.samples = samples; + return this; + } + + public Path getOutputFile() { + return outputFile; + } + + public VariantStatsAnalysisExecutor setOutputFile(Path outputFile) { + this.outputFile = outputFile; + return this; + } + + public VariantStatsAnalysisExecutor setVariantsQuery(Query variantsQuery) { + this.variantsQuery = variantsQuery; + return this; + } + + public Query getVariantsQuery() { + return variantsQuery; + } +} diff --git a/oskar-spark/src/main/java/org/opencb/oskar/spark/variant/analysis/executors/VariantStatsSparkParquetAnalysisExecutor.java b/oskar-spark/src/main/java/org/opencb/oskar/spark/variant/analysis/executors/VariantStatsSparkParquetAnalysisExecutor.java index 73a9d3c..a963c77 100644 --- a/oskar-spark/src/main/java/org/opencb/oskar/spark/variant/analysis/executors/VariantStatsSparkParquetAnalysisExecutor.java +++ b/oskar-spark/src/main/java/org/opencb/oskar/spark/variant/analysis/executors/VariantStatsSparkParquetAnalysisExecutor.java @@ -6,19 +6,16 @@ import org.apache.spark.sql.types.StructField; import org.opencb.commons.datastore.core.ObjectMap; import org.opencb.oskar.analysis.exceptions.AnalysisException; -import org.opencb.oskar.analysis.result.FileResult; import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysis; -import org.opencb.oskar.analysis.variant.stats.VariantStatsExecutor; +import org.opencb.oskar.analysis.variant.stats.VariantStatsAnalysisExecutor; import org.opencb.oskar.core.annotations.AnalysisExecutor; import org.opencb.oskar.spark.commons.OskarException; import org.opencb.oskar.spark.variant.Oskar; import org.opencb.oskar.spark.variant.analysis.transformers.VariantStatsTransformer; -import java.io.File; import java.io.FileNotFoundException; import java.io.PrintWriter; import java.nio.file.Path; -import java.nio.file.Paths; import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.explode; @@ -28,7 +25,7 @@ analysis = VariantStatsAnalysis.ID, source= AnalysisExecutor.Source.PARQUET_FILE, framework = AnalysisExecutor.Framework.SPARK) -public class VariantStatsSparkParquetAnalysisExecutor extends VariantStatsExecutor { +public class VariantStatsSparkParquetAnalysisExecutor extends VariantStatsAnalysisExecutor { public VariantStatsSparkParquetAnalysisExecutor() { } @@ -40,7 +37,7 @@ public VariantStatsSparkParquetAnalysisExecutor(String cohort, ObjectMap executo @Override public void exec() throws AnalysisException { String parquetFilename = getExecutorParams().getString("FILE"); - String studyId = getExecutorParams().getString("STUDY_ID"); + String studyId = getStudy(); String master = getExecutorParams().getString("MASTER"); // Prepare input dataset from the input parquet file @@ -77,10 +74,10 @@ public void exec() throws AnalysisException { line.append(field.name()); } - String outFilename = getOutDir() + "/variant_stats.txt"; + Path outFilename = getOutputFile(); PrintWriter pw; try { - pw = new PrintWriter(outFilename); + pw = new PrintWriter(outFilename.toFile()); } catch (FileNotFoundException e) { throw new AnalysisException("Error creating output file: " + outFilename, e); } @@ -89,9 +86,6 @@ public void exec() throws AnalysisException { SparkAnalysisExecutorUtils.writeRows(outDf.toLocalIterator(), pw); pw.close(); - - if (new File(outFilename).exists()) { - arm.addFile(Paths.get(outFilename), FileResult.FileType.TAB_SEPARATED); - } } + }