Skip to content

Commit

Permalink
app: implement analysis command line for PLINK #128
Browse files Browse the repository at this point in the history
  • Loading branch information
jtarraga committed Sep 19, 2017
1 parent b9d862a commit b1d7412
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,97 +1,194 @@
package org.opencb.hpg.bigdata.analysis.variant.wrappers;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.opencb.biodata.models.metadata.Individual;
import org.opencb.biodata.models.metadata.Sample;
import org.opencb.biodata.models.variant.Variant;
import org.opencb.biodata.models.variant.metadata.VariantStudyMetadata;
import org.opencb.biodata.tools.variant.converters.VCFExporter;
import org.opencb.biodata.tools.variant.metadata.VariantMetadataManager;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.biodata.tools.variant.metadata.VariantMetadataUtils;
import org.opencb.hpg.bigdata.analysis.AnalysisExecutorException;
import org.opencb.hpg.bigdata.analysis.exceptions.AnalysisToolException;
import org.opencb.hpg.bigdata.analysis.tools.Executor;
import org.opencb.hpg.bigdata.analysis.tools.Status;
import org.opencb.hpg.bigdata.analysis.tools.ToolManager;
import org.opencb.hpg.bigdata.analysis.variant.FilterParameters;
import org.opencb.hpg.bigdata.core.lib.SparkConfCreator;
import org.opencb.hpg.bigdata.core.lib.VariantDataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PlinkWrapper extends VariantAnalysisWrapper {

private Query query;
private QueryOptions queryOptions;
private String inFilename;
private String metaFilename;
private FilterParameters filterParams;
private Map<String, String> plinkParams;

private Logger logger;

public PlinkWrapper(String studyId) {
public PlinkWrapper(String studyId, String inFilename, String metaFilename,
FilterParameters filterParams, Map<String, String> plinkParams) {
super(studyId);
this.inFilename = inFilename;
this.metaFilename = metaFilename;
this.filterParams = filterParams;
this.plinkParams = plinkParams;

this.logger = LoggerFactory.getLogger(PlinkWrapper.class);
}


public void execute() {
public void execute() throws AnalysisExecutorException {
// Sanity check
if (binPath == null || !binPath.toFile().exists()) {
String msg = "PLINK binary path is missing or does not exist: '" + binPath + "'.";
logger.error(msg);
throw new AnalysisExecutorException(msg);
}

String inputAvroFilename = query.getString("input");
String tmpVcfFilename = query.get("outdir") + "/tmp.vcf";
String metaFilename = inputAvroFilename + ".meta.json";
// Get output dir
Path outDir = Paths.get("/tmp");
if (plinkParams.get("out") != null) {
outDir = Paths.get(plinkParams.get("out")).getParent();
}

// Generate VCF file by calling VCF exporter from query and query options
VariantMetadataManager manager = new VariantMetadataManager();
try {
manager.load(Paths.get(metaFilename));

SparkConf sparkConf = SparkConfCreator.getConf("tool plink", "local", 1, true);
SparkConf sparkConf = SparkConfCreator.getConf("PLINK", "local", 1, true);
SparkSession sparkSession = new SparkSession(new SparkContext(sparkConf));

VariantDataset vd = new VariantDataset(sparkSession);
vd.load(inputAvroFilename);
vd.load(inFilename);
vd.createOrReplaceTempView("vcf");

//vd.regionFilter(new Region("22:16050114-16050214"));
//vd.sampleFilter("GT", "5:0|1");

// out filename
VariantStudyMetadata studyMetadata = manager.getVariantMetadata().getStudies().get(0);
VCFExporter vcfExporter = new VCFExporter(studyMetadata);
vcfExporter.open(Paths.get(tmpVcfFilename));
exportPedMapFile(vd, studyMetadata, outDir + "/plink");

vcfExporter.export(vd.iterator());

// close everything
vcfExporter.close();
// close
sparkSession.stop();
} catch (Exception e) {
logger.error("Error executing PLINK tool when retrieving variants to VCF file: {}", e.getMessage());
e.printStackTrace();
logger.error("Error executing PLINK tool when retrieving variants to PED and MAP files: {}", e.getMessage());
return;
}

// Execute PLINK
StringBuilder sb = new StringBuilder();
sb.append(binPath);
sb.append(" --file ").append(outDir).append("/plink");
for (String key : plinkParams.keySet()) {
sb.append(" --").append(key);
String value = plinkParams.get(key);
if (!StringUtils.isEmpty(value)) {
sb.append(" ").append(value);
}
}
try {
Path tmp = Paths.get("/tmp");
Path plinkPath = Paths.get("/tmp/plink");
ToolManager toolManager = new ToolManager(plinkPath);
Executor.execute(sb.toString(), outDir, true);
} catch (AnalysisToolException e) {
logger.error(e.getMessage());
throw new AnalysisExecutorException(e);
}
}

Map<String, String> params = new HashMap<>();
params.put("input", tmpVcfFilename);
//params.put("output", "/tmp/test.bam.bai");
public void exportPedMapFile(VariantDataset variantDataset, VariantStudyMetadata studyMetadata,
String prefix) throws FileNotFoundException {
Path pedPath = Paths.get(prefix + ".ped");
Path mapPath = Paths.get(prefix + ".map");

StringBuilder sb = new StringBuilder();
PrintWriter pedWriter = new PrintWriter(pedPath.toFile());
PrintWriter mapWriter = new PrintWriter(mapPath.toFile());
Iterator<Variant> iterator = variantDataset.iterator();

List<String> sampleNames = VariantMetadataUtils.getSampleNames(studyMetadata);
//List<String> ms = new ArrayList<>(sampleNames.size());
StringBuilder[] markers = new StringBuilder[sampleNames.size()];
while (iterator.hasNext()) {
Variant variant = iterator.next();
// genotypes
List<List<String>> sampleData = variant.getStudiesMap().get(studyMetadata.getId()).getSamplesData();
assert(sampleData.size() == sampleNames.size());
for (int i = 0; i < sampleData.size(); i++) {
String[] gt = sampleData.get(i).get(0).split("[|/]");
if (markers[i] == null) {
markers[i] = new StringBuilder();
}
//if (markers.get(i) == null) {
// markers.set(i, "");
//}
//markers[i].set(i, markers.get(i) + "\t" + gt[0] + "\t" + gt[1]);
markers[i].append("\t"
+ (gt[0].equals("1") ? variant.getAlternate() : variant.getReference())
+ "\t"
+ (gt[1].equals("1") ? variant.getAlternate() : variant.getReference()));
}

// map file line
mapWriter.println(variant.getChromosome() + "\t" + variant.getId() + "\t0\t" + variant.getStart());
}

String commandLine = toolManager.createCommandLine("plink", "index", params);
System.out.println(commandLine);
// ped file line
for (int i = 0; i < sampleNames.size(); i++) {
sb.setLength(0);
String sampleName = sampleNames.get(i);
Individual individual = getIndividualBySampleName(sampleName, studyMetadata);
if (individual == null) {
// sample not found, what to do??
sb.append(0).append("\t");
sb.append(sampleName).append("\t");
sb.append(0).append("\t");
sb.append(0).append("\t");
sb.append(0).append("\t");
sb.append(0);
} else {
int sex = org.opencb.biodata.models.core.pedigree.Individual.Sex
.getEnum(individual.getSex()).getValue();
int phenotype = org.opencb.biodata.models.core.pedigree.Individual.AffectionStatus
.getEnum(individual.getPhenotype()).getValue();
sb.append(individual.getFamily() == null ? 0 : individual.getFamily()).append("\t");
sb.append(sampleName).append("\t");
sb.append(individual.getFather() == null ? 0 : individual.getFather()).append("\t");
sb.append(individual.getMother() == null ? 0 : individual.getMother()).append("\t");
sb.append(sex).append("\t");
sb.append(phenotype);
}
sb.append(markers[i]);
pedWriter.println(sb.toString());
}

Executor.execute(commandLine, tmp, true);
// close
pedWriter.close();
mapWriter.close();
}

ObjectReader reader = new ObjectMapper().reader(Status.class);
Status status = reader.readValue(tmp.resolve("status.json").toFile());
} catch (Exception e) {
logger.error("Error executing PLINK command line: {}", e.getMessage());
return;

private Individual getIndividualBySampleName(String sampleName, VariantStudyMetadata studyMetadata) {
for (Individual individual: studyMetadata.getIndividuals()) {
for (Sample sample: individual.getSamples()) {
if (sampleName.equals(sample.getId())) {
return individual;
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public LocalCliOptionsParser() {
variantSubCommands.addCommand("query", variantCommandOptions.queryVariantCommandOptions);
variantSubCommands.addCommand("metadata", variantCommandOptions.metadataVariantCommandOptions);
variantSubCommands.addCommand("rvtests", variantCommandOptions.rvtestsVariantCommandOptions);
variantSubCommands.addCommand("plink", variantCommandOptions.plinkVariantCommandOptions);

toolCommandOptions = new ToolCommandOptions(commonCommandOptions, jcommander);
jcommander.addCommand("tool", toolCommandOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opencb.biodata.models.variant.avro.VariantAvro;
import org.opencb.biodata.tools.variant.metadata.VariantMetadataManager;
import org.opencb.commons.utils.FileUtils;
import org.opencb.hpg.bigdata.analysis.variant.wrappers.PlinkWrapper;
import org.opencb.hpg.bigdata.analysis.variant.wrappers.RvTestsWrapper;
import org.opencb.hpg.bigdata.app.cli.CommandExecutor;
import org.opencb.hpg.bigdata.app.cli.local.CliUtils;
Expand Down Expand Up @@ -96,6 +97,9 @@ public void execute() throws Exception {
case "rvtests":
rvtests();
break;
case "plink":
plink();
break;
default:
logger.error("Variant subcommand '" + subCommandString + "' not valid");
break;
Expand Down Expand Up @@ -404,7 +408,6 @@ public void metadata() throws Exception {
}
}


public void rvtests() throws Exception {
RvTestsWrapper rvtests = new RvTestsWrapper(variantCommandOptions.rvtestsVariantCommandOptions.datasetId,
variantCommandOptions.rvtestsVariantCommandOptions.inFilename,
Expand All @@ -420,4 +423,20 @@ public void rvtests() throws Exception {
rvtests.setBinPath(Paths.get(binPath));
rvtests.execute();
}

/**
 * Builds and runs the PLINK wrapper from the parsed 'plink' sub-command options.
 *
 * @throws Exception if the PLINK execution fails
 */
public void plink() throws Exception {
    // Alias to keep the wrapper construction readable
    VariantCommandOptions.PlinkVariantCommandOptions opts = variantCommandOptions.plinkVariantCommandOptions;

    PlinkWrapper plink = new PlinkWrapper(opts.datasetId,
            opts.inFilename,
            opts.inFilename + ".meta.json",
            opts.filterParameters,
            opts.plinkParams);

    // Binary path from the CLI, falling back to "plink" on the PATH
    String binPath = StringUtils.isEmpty(opts.binPath) ? "plink" : opts.binPath;
    plink.setBinPath(Paths.get(binPath));
    plink.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class VariantCommandOptions {
public QueryVariantCommandOptions queryVariantCommandOptions;
public MetadataVariantCommandOptions metadataVariantCommandOptions;
public RvTestsVariantCommandOptions rvtestsVariantCommandOptions;
public PlinkVariantCommandOptions plinkVariantCommandOptions;

public LocalCliOptionsParser.CommonCommandOptions commonCommandOptions;
public FilterParameters commonFilterOptions;
Expand All @@ -38,6 +39,7 @@ public VariantCommandOptions(LocalCliOptionsParser.CommonCommandOptions commonCo
this.queryVariantCommandOptions = new QueryVariantCommandOptions();
this.metadataVariantCommandOptions = new MetadataVariantCommandOptions();
this.rvtestsVariantCommandOptions = new RvTestsVariantCommandOptions();
this.plinkVariantCommandOptions = new PlinkVariantCommandOptions();
}

public VariantCommandOptions(LocalCliOptionsParser.CommonCommandOptions commonCommandOptions,
Expand All @@ -53,6 +55,7 @@ public VariantCommandOptions(LocalCliOptionsParser.CommonCommandOptions commonCo
this.queryVariantCommandOptions = new QueryVariantCommandOptions();
this.metadataVariantCommandOptions = new MetadataVariantCommandOptions();
this.rvtestsVariantCommandOptions = new RvTestsVariantCommandOptions();
this.plinkVariantCommandOptions = new PlinkVariantCommandOptions();
}

@Parameters(commandNames = {"convert"}, commandDescription = "Convert gVCF/VCF files to different big data"
Expand Down Expand Up @@ -361,7 +364,30 @@ public class RvTestsVariantCommandOptions {
@DynamicParameter(names = "-D", description = "RvTests parameters")
public Map<String, String> rvtestsParams = new HashMap<>();

@Parameter(names = {"--rvtest-path"}, description = "Path to the RvTest executable.", arity = 1)
@Parameter(names = {"--rvtests-path"}, description = "Path to the RvTests executable.", arity = 1)
public String binPath = null;
}

// JCommander options holder for the 'variant plink' sub-command.
@Parameters(commandNames = {"plink"}, commandDescription = "Execute the 'plink' program.")
public class PlinkVariantCommandOptions {

    // Shared options (log level, config, ...) delegated to the common parser
    @ParametersDelegate
    public LocalCliOptionsParser.CommonCommandOptions commonOptions = commonCommandOptions;

    // Shared variant filter options delegated to the common filter parser
    @ParametersDelegate
    public FilterParameters filterParameters = commonFilterOptions;

    @Parameter(names = {"--dataset"}, description = "Target dataset.", arity = 1)
    public String datasetId = null;

    @Parameter(names = {"-i", "--input"}, description = "Input file name (in Avro/Parquet file format).",
            required = true, arity = 1)
    public String inFilename;

    // Arbitrary PLINK parameters passed as -Dname=value; an empty value means a flag option
    @DynamicParameter(names = "-D", description = "PLINK parameters")
    public Map<String, String> plinkParams = new HashMap<>();

    @Parameter(names = {"--plink-path"}, description = "Path to the PLINK executable.", arity = 1)
    public String binPath = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.opencb.hpg.bigdata.app.cli.local;

import org.junit.Test;

/**
 * CLI-level test for the 'variant plink' sub-command.
 *
 * <p>Reuses the RvTests CLI test fixture for dataset initialization, then runs
 * a PLINK association analysis (-Dassoc=) over the example Avro dataset.
 */
public class VariantPlinkCLITest {

    @Test
    public void assoc() {
        try {
            VariantRvTestsCLITest cli = new VariantRvTestsCLITest();
            cli.init();

            StringBuilder commandLine = new StringBuilder();
            commandLine.append(" variant plink");
            commandLine.append(" --log-level ERROR");
            commandLine.append(" --dataset ").append(cli.datasetName);
            commandLine.append(" -i ").append(cli.avroPath);
            commandLine.append(" --plink-path ../../../soft/plink/plink");
            commandLine.append(" -Dassoc=");
            commandLine.append(" -Dout=/tmp/out.plink");
            VariantQueryCLITest.execute(commandLine.toString());
        } catch (Exception e) {
            // Rethrow instead of swallowing: a swallowed exception would make
            // this test pass even when the command fails
            throw new RuntimeException(e);
        }
    }

}
6 changes: 3 additions & 3 deletions hpg-bigdata-app/src/test/resources/example.vcf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
##source=1000GenomesPhase3Pipeline
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT P1 P2 P3 P4 P5 P6 P7 P8 P9
1 1 . A G 100 . . GT 0/1 0/0 0/0 1/1 0/1 0/0 0/1 0/1 0/0
1 2 . A G 100 . . GT 0/0 0/0 0/0 0/0 0/0 0/0 0/1 0/0 0/1
1 3 . A G 100 . . GT 0/0 0/0 0/0 0/0 0/1 0/0 0/0 0/0 0/0
1 1 rs1 A G 100 . . GT 0/1 0/0 0/0 1/1 0/1 0/0 0/1 0/1 0/0
1 2 rs2 A G 100 . . GT 0/0 0/0 0/0 0/0 0/0 0/0 0/1 0/0 0/1
1 3 rs3 A G 100 . . GT 0/0 0/0 0/0 0/0 0/1 0/0 0/0 0/0 0/0
8 changes: 4 additions & 4 deletions hpg-bigdata-app/src/test/resources/test.vcf
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total read depth">
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele. Format: AA|REF|ALT|IndelType. AA: Ancestral allele, REF:Reference Allele, ALT:Alternate Allele, IndelType:Type of Indel (REF, ALT and IndelType are only defined for indels)">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT 1 2 3 4 5 6
22 16050075 . A G 90 PASS AC=1;AF=0.000199681;AN=5008;NS=2504;DP=8012;EAS_AF=0;AMR_AF=0;AFR_AF=0;EUR_AF=0;SAS_AF=0.001;AA=.||| GT 0|0 1|0 0|1 0|0 0|0 1|0
22 16050115 . G A 100 PASS AC=32;AF=0.00638978;AN=5008;NS=2504;DP=11468;EAS_AF=0;AMR_AF=0.0014;AFR_AF=0.0234;EUR_AF=0;SAS_AF=0;AA=.||| GT 0|0 1|0 0|0 0|0 1|1 0|1
22 16050213 . C T 80 PASS AC=38;AF=0.00758786;AN=5008;NS=2504;DP=15092;EAS_AF=0;AMR_AF=0.0014;AFR_AF=0.0272;EUR_AF=0.001;SAS_AF=0;AA=.||| GT 0|0 0|0 1|1 0|0 0|0 0|1
22 16050568 . C A 100 PASS AC=2;AF=0.000399361;AN=5008;NS=2504;DP=21258;EAS_AF=0.002;AMR_AF=0;AFR_AF=0;EUR_AF=0;SAS_AF=0;AA=.||| GT 0|1 0|1 0|0 0|0 0|1 1|1
22 16050075 rs1 A G 90 PASS AC=1;AF=0.000199681;AN=5008;NS=2504;DP=8012;EAS_AF=0;AMR_AF=0;AFR_AF=0;EUR_AF=0;SAS_AF=0.001;AA=.||| GT 0|0 1|0 0|1 0|0 0|0 1|0
22 16050115 rs2 G A 100 PASS AC=32;AF=0.00638978;AN=5008;NS=2504;DP=11468;EAS_AF=0;AMR_AF=0.0014;AFR_AF=0.0234;EUR_AF=0;SAS_AF=0;AA=.||| GT 0|0 1|0 0|0 0|0 1|1 0|1
22 16050213 rs3 C T 80 PASS AC=38;AF=0.00758786;AN=5008;NS=2504;DP=15092;EAS_AF=0;AMR_AF=0.0014;AFR_AF=0.0272;EUR_AF=0.001;SAS_AF=0;AA=.||| GT 0|0 0|0 1|1 0|0 0|0 0|1
22 16050568 rs4 C A 100 PASS AC=2;AF=0.000399361;AN=5008;NS=2504;DP=21258;EAS_AF=0.002;AMR_AF=0;AFR_AF=0;EUR_AF=0;SAS_AF=0;AA=.||| GT 0|1 0|1 0|0 0|0 0|1 1|1

0 comments on commit b1d7412

Please sign in to comment.