From 3274f3ce6aad01af04c05065f4906a9e0dd84e7d Mon Sep 17 00:00:00 2001
From: seb
Date: Fri, 2 Jul 2021 13:16:31 +0200
Subject: [PATCH 1/5] Init commit to add meta-imputation files to zip archive

---
 files/minimac4.yaml                          |  9 +++++
 .../steps/CompressionEncryption.java         | 38 +++++++++++++++++++
 .../steps/imputation/ImputationMapper.java   | 12 ++++++
 .../steps/vcf/VcfChunkOutput.java            | 10 +++++
 .../imputationserver/util/ExportObject.java  | 20 ++++++++++
 5 files changed, 89 insertions(+)

diff --git a/files/minimac4.yaml b/files/minimac4.yaml
index ba92c8a2..8b994cae 100644
--- a/files/minimac4.yaml
+++ b/files/minimac4.yaml
@@ -118,6 +118,15 @@ workflow:
         true: yes
         false: no
       visible: true
+
+  - id: meta
+    description: Generate Meta-imputation file
+    type: checkbox
+    value: no
+    values:
+      true: yes
+      false: no
+    visible: true
 
   - id: myseparator
     type: separator

diff --git a/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java b/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java
index 24d4b782..680d93e3 100644
--- a/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java
+++ b/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java
@@ -55,6 +55,7 @@ public boolean run(WorkflowContext context) {
 		String outputScores = context.get("outputScores");
 		String localOutput = context.get("local");
 		String aesEncryption = context.get("aesEncryption");
+		String meta = context.get("meta");
 		String mode = context.get("mode");
 		String password = context.get("password");
@@ -112,14 +113,18 @@ public boolean run(WorkflowContext context) {
 				List<String> data = new Vector<String>();
 				List<String> header = new Vector<String>();
 				List<String> info = new Vector<String>();
+				List<String> dataMeta = new Vector<String>();
+				List<String> headerMeta = new Vector<String>();
 
 				header = findFiles(folder, ".header.dose.vcf.gz");
+				headerMeta = findFiles(folder, ".header.empiricalDose.vcf.gz");
 
 				if (phasingOnly) {
 					data = findFiles(folder, ".phased.vcf.gz");
 				} else {
 					data = findFiles(folder, ".data.dose.vcf.gz");
 					info = findFiles(folder, ".info");
+					dataMeta = findFiles(folder, ".data.empiricalDose.vcf.gz");
 				}
 
 				// combine all X.* to one folder
@@ -145,6 +150,14 @@ public boolean run(WorkflowContext context) {
 				currentInfoList.addAll(info);
 				export.setInfoFiles(currentInfoList);
 
+				ArrayList<String> currentHeaderMetaList = export.getHeaderMetaFiles();
+				currentHeaderMetaList.addAll(headerMeta);
+				export.setHeaderMetaFiles(currentHeaderMetaList);
+
+				ArrayList<String> currentDataMetaList = export.getDataMetaFiles();
+				currentDataMetaList.addAll(dataMeta);
+				export.setDataMetaFiles(currentDataMetaList);
+
 				chromosomes.put(name, export);
 
 			}
@@ -174,6 +187,7 @@ public boolean run(WorkflowContext context) {
 
 				// resort for chrX only
 				if (name.equals("X")) {
+					Collections.sort(entry.getDataMetaFiles(), new ChrXComparator());
 					Collections.sort(entry.getDataFiles(), new ChrXComparator());
 					Collections.sort(entry.getInfoFiles(), new ChrXComparator());
 				}
@@ -184,12 +198,16 @@ public boolean run(WorkflowContext context) {
 
 				// output files
 				String dosageOutput;
+				String dosageMetaOutput = null;
+				MergedVcfFile vcfFileMeta = null;
 				String infoOutput = "";
 
 				if (phasingOnly) {
 					dosageOutput = FileUtil.path(temp, "chr" + name + ".phased.vcf.gz");
 				} else {
 					dosageOutput = FileUtil.path(temp, "chr" + name + ".dose.vcf.gz");
+					dosageMetaOutput = FileUtil.path(temp, "chr" + name + ".empiricalDose.vcf.gz");
+					vcfFileMeta = new MergedVcfFile(dosageMetaOutput);
 					infoOutput = FileUtil.path(temp, "chr" + name + ".info.gz");
 					FileMerger.mergeAndGzInfo(entry.getInfoFiles(), infoOutput);
 				}
@@ -249,6 +267,22 @@ public boolean run(WorkflowContext context) {
 					HdfsUtil.delete(file);
 				}
 
+				// combine meta files
+				if (meta != null && meta.equals("yes")) {
+					String headerMetaFile = entry.getHeaderMetaFiles().get(0);
+					vcfFileMeta.addFile(HdfsUtil.open(headerMetaFile));
+
+					// add meta data files
+					if (vcfFileMeta != null) {
+						for (String file : entry.getDataMetaFiles()) {
+							context.println("Read file " + file);
+							vcfFileMeta.addFile(HdfsUtil.open(file));
+							HdfsUtil.delete(file);
+						}
+						vcfFileMeta.close();
+					}
+				}
+
 				vcfFile.close();
 
 				if (sanityCheck.equals("yes") && lastChromosome) {
@@ -273,6 +307,10 @@ public boolean run(WorkflowContext context) {
 				// create zip file
 				ArrayList<File> files = new ArrayList<File>();
 				files.add(new File(dosageOutput));
+
+				if (meta != null && meta.equals("yes")) {
+					files.add(new File(dosageMetaOutput));
+				}
 
 				if (!phasingOnly) {
 					files.add(new File(infoOutput));

diff --git a/src/main/java/genepi/imputationserver/steps/imputation/ImputationMapper.java b/src/main/java/genepi/imputationserver/steps/imputation/ImputationMapper.java
index e914b12c..cbfed7ff 100644
--- a/src/main/java/genepi/imputationserver/steps/imputation/ImputationMapper.java
+++ b/src/main/java/genepi/imputationserver/steps/imputation/ImputationMapper.java
@@ -303,6 +303,18 @@ public void map(LongWritable key, Text value, Context context) throws IOExceptio
 			FileMerger.splitIntoHeaderAndData(outputChunk.getImputedVcfFilename(), outHeader, outData,
 					imputationParameters);
 
+			// store vcf file (remove header)
+			BgzipSplitOutputStream outDataMeta = new BgzipSplitOutputStream(
+					HdfsUtil.create(HdfsUtil.path(output, chunk + ".data.empiricalDose.vcf.gz")));
+
+			BgzipSplitOutputStream outHeaderMeta = new BgzipSplitOutputStream(
+					HdfsUtil.create(HdfsUtil.path(output, chunk + ".header.empiricalDose.vcf.gz")));
+
+			FileMerger.splitIntoHeaderAndData(outputChunk.getMetaVcfFilename(), outHeaderMeta, outDataMeta,
+					imputationParameters);
+
+
 			long end = System.currentTimeMillis();
 
 			statistics.setImportTime((end - start) / 1000);

diff --git a/src/main/java/genepi/imputationserver/steps/vcf/VcfChunkOutput.java b/src/main/java/genepi/imputationserver/steps/vcf/VcfChunkOutput.java
index 1df3f44b..d8c32390 100644
--- a/src/main/java/genepi/imputationserver/steps/vcf/VcfChunkOutput.java
+++ b/src/main/java/genepi/imputationserver/steps/vcf/VcfChunkOutput.java
@@ -6,6 +6,7 @@ public class VcfChunkOutput extends VcfChunk {
 
 	private String prefix;
 	private String imputedVcfFilename;
+	private String metaVcfFilename;
 	private String phasedVcfFilename;
 	private String scoreFilename;
 
@@ -15,6 +16,7 @@ public VcfChunkOutput(VcfChunk chunk, String outputFolder) {
 		prefix = FileUtil.path(outputFolder, chunk.getId());
 		imputedVcfFilename = prefix + ".dose.vcf.gz";
+		metaVcfFilename = prefix + ".empiricalDose.vcf.gz";
 		infoFilename = prefix + ".info";
 		phasedVcfFilename = prefix + ".phased.vcf.gz";
 		scoreFilename = prefix + ".scores.csv";
@@ -26,6 +28,14 @@ public VcfChunkOutput(VcfChunk chunk, String outputFolder) {
 		setPhased(chunk.isPhased());
 	}
 
+	public String getMetaVcfFilename() {
+		return metaVcfFilename;
+	}
+
+	public void setMetaVcfFilename(String metaVcfFilename) {
+		this.metaVcfFilename = metaVcfFilename;
+	}
+
 	public String getPrefix() {
 		return prefix;
 	}

diff --git a/src/main/java/genepi/imputationserver/util/ExportObject.java b/src/main/java/genepi/imputationserver/util/ExportObject.java
index c089465e..0bc70bed 100644
--- a/src/main/java/genepi/imputationserver/util/ExportObject.java
+++ b/src/main/java/genepi/imputationserver/util/ExportObject.java
@@ -5,13 +5,33 @@ public class ExportObject {
 
 	private ArrayList<String> dataFiles;
+	private ArrayList<String> dataMetaFiles;
 	private ArrayList<String> headerFiles;
+	private ArrayList<String> headerMetaFiles;
 	private ArrayList<String> infoFiles;
 
 	public ExportObject() {
 		dataFiles = new ArrayList<String>();
 		headerFiles = new ArrayList<String>();
 		infoFiles = new ArrayList<String>();
+		headerMetaFiles = new ArrayList<String>();
+		dataMetaFiles = new ArrayList<String>();
+	}
+
+	public ArrayList<String> getDataMetaFiles() {
+		return dataMetaFiles;
+	}
+
+	public void setDataMetaFiles(ArrayList<String> dataMetaFiles) {
+		this.dataMetaFiles = dataMetaFiles;
+	}
+
+	public ArrayList<String> getHeaderMetaFiles() {
+		return headerMetaFiles;
+	}
+
+	public void setHeaderMetaFiles(ArrayList<String> headerMetaFiles) {
+		this.headerMetaFiles = headerMetaFiles;
 	}
 
 	public ArrayList<String> getDataFiles() {
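PATCH 1 writes each chunk's empiricalDose VCF as a separate compressed header part and data part, the same scheme already used for the dose files, so that whole chromosomes can later be merged by simple stream concatenation. The standalone sketch below illustrates that split; the class name, file names, and the use of plain java.util.zip streams instead of the pipeline's BgzipSplitOutputStream are simplifying assumptions for illustration, not the FileMerger implementation.

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class VcfHeaderDataSplit {

	// Split one gzipped VCF chunk into a header file ("#" lines) and a data
	// file (genotype records), mirroring the split applied above to the
	// *.empiricalDose.vcf.gz output of each chunk.
	public static void splitVcf(String vcfGz, String headerGz, String dataGz) throws IOException {
		try (BufferedReader in = new BufferedReader(
				new InputStreamReader(new GZIPInputStream(new FileInputStream(vcfGz))));
				PrintWriter header = new PrintWriter(new GZIPOutputStream(new FileOutputStream(headerGz)));
				PrintWriter data = new PrintWriter(new GZIPOutputStream(new FileOutputStream(dataGz)))) {

			String line;
			while ((line = in.readLine()) != null) {
				if (line.startsWith("#")) {
					header.println(line); // meta lines and the #CHROM sample line
				} else {
					data.println(line); // one imputed variant per line
				}
			}
		}
	}
}
```

The production code uses BGZF-splitting streams instead of plain GZIP so the resulting parts can be concatenated into a single valid block-compressed VCF.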
From bb5c2925dd38985d76e272471c1d26827785e314 Mon Sep 17 00:00:00 2001
From: seb
Date: Fri, 2 Jul 2021 16:06:53 +0200
Subject: [PATCH 2/5] Add meta-imputation testcase

---
 .../steps/ImputationTest.java | 51 +++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/src/test/java/genepi/imputationserver/steps/ImputationTest.java b/src/test/java/genepi/imputationserver/steps/ImputationTest.java
index 525471e6..5fd434aa 100644
--- a/src/test/java/genepi/imputationserver/steps/ImputationTest.java
+++ b/src/test/java/genepi/imputationserver/steps/ImputationTest.java
@@ -102,6 +102,57 @@ public void testPipelineWithPhased() throws IOException, ZipException {
 		// FileUtil.deleteDirectory("test-data/tmp");
 
 	}
 
+	@Test
+	public void testPipelineWithPhasedAndMetaOption() throws IOException, ZipException {
+
+		String configFolder = "test-data/configs/hapmap-chr20";
+		String inputFolder = "test-data/data/chr20-phased";
+
+		// create workflow context
+
+		WorkflowTestContext context = buildContext(inputFolder, "hapmap2");
+		context.setInput("meta", "yes");
+
+		// run qc to create chunkfile
+		QcStatisticsMock qcStats = new QcStatisticsMock(configFolder);
+		boolean result = run(context, qcStats);
+
+		assertTrue(result);
+		assertTrue(context.hasInMemory("Remaining sites in total: 7,735"));
+
+		// add panel to hdfs
+		importRefPanel(FileUtil.path(configFolder, "ref-panels"));
+		// importMinimacMap("test-data/B38_MAP_FILE.map");
+		importBinaries("files/bin");
+
+		// run imputation
+		ImputationMinimac3Mock imputation = new ImputationMinimac3Mock(configFolder);
+		result = run(context, imputation);
+		assertTrue(result);
+
+		// run export
+		CompressionEncryptionMock export = new CompressionEncryptionMock("files");
+		result = run(context, export);
+		assertTrue(result);
+
+		ZipFile zipFile = new ZipFile("test-data/tmp/local/chr_20.zip", PASSWORD.toCharArray());
+		zipFile.extractAll("test-data/tmp");
+
+		VcfFile file = VcfFileUtil.load("test-data/tmp/chr20.dose.vcf.gz", 100000000, false);
+
+		VcfFile fileMeta = VcfFileUtil.load("test-data/tmp/chr20.empiricalDose.vcf.gz", 100000000, false);
+
+		// count GENOTYPED from info file
+		assertEquals(7735, fileMeta.getNoSnps());
+		assertEquals("20", file.getChromosome());
+		assertEquals(51, file.getNoSamples());
+		assertEquals(true, file.isPhased());
+		assertEquals(TOTAL_REFPANEL_CHR20_B37 + ONLY_IN_INPUT, file.getNoSnps());
+
+		// FileUtil.deleteDirectory("test-data/tmp");
+
+	}
 
 	@Test
 	public void testPipelineWithPhasedAndEmptyPhasing() throws IOException, ZipException {

From b9488b61616f40d461e69294fb678e1b40fb4f74 Mon Sep 17 00:00:00 2001
From: seb
Date: Fri, 2 Jul 2021 16:07:41 +0200
Subject: [PATCH 3/5] Prepare release candidate for testing

---
 files/minimac4.yaml                                            | 2 +-
 pom.xml                                                        | 2 +-
 .../imputationserver/steps/imputation/ImputationPipeline.java  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/files/minimac4.yaml b/files/minimac4.yaml
index 8b994cae..c0a0f570 100644
--- a/files/minimac4.yaml
+++ b/files/minimac4.yaml
@@ -1,7 +1,7 @@
 id: imputationserver
 name: Genotype Imputation (Minimac4)
 description: This is the new Michigan Imputation Server Pipeline using Minimac4. Documentation can be found here. If your input data is GRCh37/hg19 please ensure chromosomes are encoded without prefix (e.g. 20). If your input data is GRCh38/hg38 please ensure chromosomes are encoded with prefix 'chr' (e.g. chr20).
-version: 1.5.8
+version: 1.6.0-rc1
 website: https://imputationserver.readthedocs.io
 category:
 
diff --git a/pom.xml b/pom.xml
index 30136ffb..8a873bb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
 
 	<groupId>genepi</groupId>
 	<artifactId>imputationserver</artifactId>
-	<version>1.5.8</version>
+	<version>1.6.0-rc1</version>
 	<packaging>jar</packaging>
 	<name>University of Michigan Imputation Server</name>

diff --git a/src/main/java/genepi/imputationserver/steps/imputation/ImputationPipeline.java b/src/main/java/genepi/imputationserver/steps/imputation/ImputationPipeline.java
index e43b6a89..1dbb6f30 100644
--- a/src/main/java/genepi/imputationserver/steps/imputation/ImputationPipeline.java
+++ b/src/main/java/genepi/imputationserver/steps/imputation/ImputationPipeline.java
@@ -23,7 +23,7 @@ public class ImputationPipeline {
 
-	public static final String PIPELINE_VERSION = "michigan-imputationserver-1.5.8";
+	public static final String PIPELINE_VERSION = "michigan-imputationserver-1.6.0-rc1";
 
 	public static final String IMPUTATION_VERSION = "minimac4-1.0.2";

From c04987504afdfb2a8e778ab8f8a8dfe630d048cf Mon Sep 17 00:00:00 2001
From: Lukas Forer
Date: Tue, 6 Jul 2021 12:33:10 +0200
Subject: [PATCH 4/5] Fix testcases with default phasing

---
 src/main/java/genepi/imputationserver/steps/Imputation.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/java/genepi/imputationserver/steps/Imputation.java b/src/main/java/genepi/imputationserver/steps/Imputation.java
index ad7b2649..173cbaa6 100644
--- a/src/main/java/genepi/imputationserver/steps/Imputation.java
+++ b/src/main/java/genepi/imputationserver/steps/Imputation.java
@@ -218,7 +218,9 @@ protected void readConfigFile() {
 				job.setPhasingOnly("false");
 			}
 
-			job.setPhasingEngine(phasing);
+			if (phasing != null) {
+				job.setPhasingEngine(phasing);
+			}
 
 			job.setInput(result.filename);
 			job.setOutput(HdfsUtil.path(output, chr));
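Both the meta-imputation testcase above and the chunk merging reworked in the next patch rest on one invariant: every chunk of a chromosome must carry an identical #CHROM header line, i.e. the same samples in the same order, otherwise concatenation would silently misalign genotypes. A quick way to compare the sample columns of two chunks with htsjdk (already a dependency of this project) might look like the sketch below; the chunk file names are placeholders.

```java
import java.io.File;
import java.util.List;

import htsjdk.variant.vcf.VCFFileReader;

public class SampleOrderCheck {

	public static void main(String[] args) {
		// requireIndex=false: the intermediate chunks carry no tabix index
		try (VCFFileReader a = new VCFFileReader(new File("chunk_20_0000000001_0020000000.dose.vcf.gz"), false);
				VCFFileReader b = new VCFFileReader(new File("chunk_20_0020000001_0040000000.dose.vcf.gz"), false)) {

			List<String> samplesA = a.getFileHeader().getGenotypeSamples();
			List<String> samplesB = b.getFileHeader().getGenotypeSamples();

			// merging by concatenation is only valid if order matches exactly
			System.out.println("Same sample order: " + samplesA.equals(samplesB));
		}
	}
}
```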
From e0e17b45e3948147afe9451fcd47a4ea476cfe18 Mon Sep 17 00:00:00 2001
From: Lukas Forer
Date: Tue, 6 Jul 2021 13:37:34 +0200
Subject: [PATCH 5/5] Refactor chunk merging

---
 .../steps/CompressionEncryption.java          | 297 +++++-------------
 .../steps/vcf/MergedVcfFile.java              |  60 +++-
 .../imputationserver/util/ExportObject.java   |  60 ----
 .../imputationserver/util/FileMerger.java     |   4 +-
 .../util/ImputationResults.java               | 119 +++++++
 .../util/ImputedChromosome.java               |  66 ++++
 6 files changed, 324 insertions(+), 282 deletions(-)
 delete mode 100644 src/main/java/genepi/imputationserver/util/ExportObject.java
 create mode 100644 src/main/java/genepi/imputationserver/util/ImputationResults.java
 create mode 100644 src/main/java/genepi/imputationserver/util/ImputedChromosome.java

diff --git a/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java b/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java
index 680d93e3..c9b12c11 100644
--- a/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java
+++ b/src/main/java/genepi/imputationserver/steps/CompressionEncryption.java
@@ -1,26 +1,13 @@
 package genepi.imputationserver.steps;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
 
-import genepi.imputationserver.util.*;
-import genepi.io.text.LineWriter;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import cloudgene.sdk.internal.IExternalWorkspace;
 import cloudgene.sdk.internal.WorkflowContext;
@@ -28,9 +15,15 @@
 import genepi.hadoop.HdfsUtil;
 import genepi.hadoop.command.Command;
 import genepi.imputationserver.steps.vcf.MergedVcfFile;
+import genepi.imputationserver.util.DefaultPreferenceStore;
+import genepi.imputationserver.util.FileChecksum;
+import genepi.imputationserver.util.FileMerger;
+import genepi.imputationserver.util.ImputationResults;
+import genepi.imputationserver.util.ImputedChromosome;
+import genepi.imputationserver.util.PasswordCreator;
+import genepi.imputationserver.util.PgsPanel;
 import genepi.io.FileUtil;
-import genepi.io.text.LineReader;
-import genepi.riskscore.App;
+import genepi.io.text.LineWriter;
 import genepi.riskscore.io.MetaFile;
 import genepi.riskscore.io.ReportFile;
 import genepi.riskscore.tasks.CreateHtmlReportTask;
@@ -38,6 +31,7 @@
 import genepi.riskscore.tasks.MergeScoreTask;
 import lukfor.progress.TaskService;
 import net.lingala.zip4j.ZipFile;
+import net.lingala.zip4j.exception.ZipException;
 import net.lingala.zip4j.model.ZipParameters;
 import net.lingala.zip4j.model.enums.AesKeyStrength;
 import net.lingala.zip4j.model.enums.CompressionLevel;
@@ -54,7 +48,7 @@ public boolean run(WorkflowContext context) {
 		String output = context.get("outputimputation");
 		String outputScores = context.get("outputScores");
 		String localOutput = context.get("local");
-		String aesEncryption = context.get("aesEncryption");
+		String aesEncryptionValue = context.get("aesEncryption");
 		String meta = context.get("meta");
 		String mode = context.get("mode");
 		String password = context.get("password");
@@ -66,6 +60,10 @@ public boolean run(WorkflowContext context) {
 			phasingOnly = true;
 		}
 
+		boolean mergeMetaFiles = !phasingOnly && (meta != null && meta.equals("yes"));
+
+		boolean aesEncryption = (aesEncryptionValue != null && aesEncryptionValue.equals("yes"));
+
 		// read config if mails should be sent
 		String folderConfig = getFolder(CompressionEncryption.class);
 		File jobConfig = new File(FileUtil.path(folderConfig, "job.config"));
@@ -102,188 +100,88 @@ public boolean run(WorkflowContext context) {
 			// get sorted directories
 			List<String> folders = HdfsUtil.getDirectories(output);
 
-			Map<String, ExportObject> chromosomes = new HashMap<String, ExportObject>();
-
-			for (String folder : folders) {
-
-				String name = FileUtil.getFilename(folder);
-
-				context.println("Prepare files for chromosome " + name);
-
-				List<String> data = new Vector<String>();
-				List<String> header = new Vector<String>();
-				List<String> info = new Vector<String>();
-				List<String> dataMeta = new Vector<String>();
-				List<String> headerMeta = new Vector<String>();
-
-				header = findFiles(folder, ".header.dose.vcf.gz");
-				headerMeta = findFiles(folder, ".header.empiricalDose.vcf.gz");
-
-				if (phasingOnly) {
-					data = findFiles(folder, ".phased.vcf.gz");
-				} else {
-					data = findFiles(folder, ".data.dose.vcf.gz");
-					info = findFiles(folder, ".info");
-					dataMeta = findFiles(folder, ".data.empiricalDose.vcf.gz");
-				}
-
-				// combine all X.* to one folder
-				if (name.startsWith("X.")) {
-					name = "X";
-				}
-
-				ExportObject export = chromosomes.get(name);
-
-				if (export == null) {
-					export = new ExportObject();
-				}
-
-				ArrayList<String> currentDataList = export.getDataFiles();
-				currentDataList.addAll(data);
-				export.setDataFiles(currentDataList);
-
-				ArrayList<String> currentHeaderList = export.getHeaderFiles();
-				currentHeaderList.addAll(header);
-				export.setHeaderFiles(currentHeaderList);
+			ImputationResults imputationResults = new ImputationResults(folders, phasingOnly);
+			Map<String, ImputedChromosome> imputedChromosomes = imputationResults.getChromosomes();
 
-				ArrayList<String> currentInfoList = export.getInfoFiles();
-				currentInfoList.addAll(info);
-				export.setInfoFiles(currentInfoList);
-
-				ArrayList<String> currentHeaderMetaList = export.getHeaderMetaFiles();
-				currentHeaderMetaList.addAll(headerMeta);
-				export.setHeaderMetaFiles(currentHeaderMetaList);
-
-				ArrayList<String> currentDataMetaList = export.getDataMetaFiles();
-				currentDataMetaList.addAll(dataMeta);
-				export.setDataMetaFiles(currentDataMetaList);
-
-				chromosomes.put(name, export);
-
-			}
-
-			Set<String> chromosomesSet = chromosomes.keySet();
+			Set<String> chromosomes = imputedChromosomes.keySet();
 
 			boolean lastChromosome = false;
 			int index = 0;
 
-			ZipParameters param = new ZipParameters();
-			param.setEncryptFiles(true);
-			param.setEncryptionMethod(EncryptionMethod.ZIP_STANDARD);
-
 			String checksumFilename = FileUtil.path(localOutput, "results.md5");
 			LineWriter writer = new LineWriter(checksumFilename);
 
-			for (String name : chromosomesSet) {
+			for (String name : chromosomes) {
 
 				index++;
-				if (index == chromosomesSet.size()) {
+				if (index == chromosomes.size()) {
 					lastChromosome = true;
 				}
 
-				ExportObject entry = chromosomes.get(name);
+				ImputedChromosome imputedChromosome = imputedChromosomes.get(name);
 
 				context.println("Export and merge chromosome " + name);
 
-				// resort for chrX only
-				if (name.equals("X")) {
-					Collections.sort(entry.getDataMetaFiles(), new ChrXComparator());
-					Collections.sort(entry.getDataFiles(), new ChrXComparator());
-					Collections.sort(entry.getInfoFiles(), new ChrXComparator());
-				}
-
-				// create temp fir
+				// create temp dir
 				String temp = FileUtil.path(localOutput, "temp");
 				FileUtil.createDirectory(temp);
 
 				// output files
-				String dosageOutput;
-				String dosageMetaOutput = null;
-				MergedVcfFile vcfFileMeta = null;
-				String infoOutput = "";
+				ArrayList<File> files = new ArrayList<File>();
+
+				// merge info files
+				if (!phasingOnly) {
+					String infoOutput = FileUtil.path(temp, "chr" + name + ".info.gz");
+					FileMerger.mergeAndGzInfo(imputedChromosome.getInfoFiles(), infoOutput);
+					files.add(new File(infoOutput));
+				}
+
+				// merge all dosage files
+
+				String dosageOutput;
 
 				if (phasingOnly) {
 					dosageOutput = FileUtil.path(temp, "chr" + name + ".phased.vcf.gz");
 				} else {
 					dosageOutput = FileUtil.path(temp, "chr" + name + ".dose.vcf.gz");
-					dosageMetaOutput = FileUtil.path(temp, "chr" + name + ".empiricalDose.vcf.gz");
-					vcfFileMeta = new MergedVcfFile(dosageMetaOutput);
-					infoOutput = FileUtil.path(temp, "chr" + name + ".info.gz");
-					FileMerger.mergeAndGzInfo(entry.getInfoFiles(), infoOutput);
 				}
+				files.add(new File(dosageOutput));
 
 				MergedVcfFile vcfFile = new MergedVcfFile(dosageOutput);
+				vcfFile.addHeader(context, imputedChromosome.getHeaderFiles());
 
-				// simple header check
-				String headerLine = null;
-				for (String file : entry.getHeaderFiles()) {
-
-					context.println("Read header file " + file);
-					LineReader reader = null;
-					try {
-						reader = new LineReader(HdfsUtil.open(file));
-						while (reader.next()) {
-							String line = reader.get();
-							if (line.startsWith("#CHROM")) {
-								if (headerLine != null) {
-									if (headerLine.equals(line)) {
-										context.println("  Header is the same as header of first file.");
-									} else {
-										context.println("  ERROR: Header is different as header of first file.");
-										context.println(headerLine);
-										context.println(line);
-										throw new Exception("Different sample order in chunks.");
-									}
-								} else {
-									headerLine = line;
-									vcfFile.addFile(HdfsUtil.open(file));
-									context.println("  Keep this header as first header.");
-								}
-							}
-
-						}
-						if (reader != null) {
-							reader.close();
-						}
-						HdfsUtil.delete(file);
-					} catch (Exception e) {
-						if (reader != null) {
-							reader.close();
-						}
-						StringWriter errors = new StringWriter();
-						e.printStackTrace(new PrintWriter(errors));
-						context.println("Error reading header file: " + errors.toString());
-					}
-				}
-
-				if (headerLine == null || headerLine.trim().isEmpty()) {
-					throw new Exception("No valid header file found");
-				}
-
-				// add data files
-				for (String file : entry.getDataFiles()) {
+				for (String file : imputedChromosome.getDataFiles()) {
 					context.println("Read file " + file);
 					vcfFile.addFile(HdfsUtil.open(file));
 					HdfsUtil.delete(file);
 				}
 
-				//combine meta files
-				if (meta != null && meta.equals("yes")) {
-					String headerMetaFile = entry.getHeaderMetaFiles().get(0);
-					vcfFileMeta.addFile(HdfsUtil.open(headerMetaFile));
+				vcfFile.close();
+
+				// merge all meta files
+				if (mergeMetaFiles) {
+
+					context.println("Merging meta files...");
+
+					String dosageMetaOutput = FileUtil.path(temp, "chr" + name + ".empiricalDose.vcf.gz");
+					MergedVcfFile vcfFileMeta = new MergedVcfFile(dosageMetaOutput);
+
+					String headerMetaFile = imputedChromosome.getHeaderMetaFiles().get(0);
+					context.println("Use header from file " + headerMetaFile);
 
-					// add meta data files
-					if (vcfFileMeta != null) {
-						for (String file : entry.getDataMetaFiles()) {
+					vcfFileMeta.addFile(HdfsUtil.open(headerMetaFile));
+
+					for (String file : imputedChromosome.getDataMetaFiles()) {
 						context.println("Read file " + file);
 						vcfFileMeta.addFile(HdfsUtil.open(file));
 						HdfsUtil.delete(file);
 					}
 					vcfFileMeta.close();
-					}
-				}
-
-				vcfFile.close();
+
+					context.println("Meta files merged.");
+
+					files.add(new File(dosageMetaOutput));
+				}
 
 				if (sanityCheck.equals("yes") && lastChromosome) {
 					context.log("Run tabix on chromosome " + name + "...");
@@ -297,30 +195,13 @@ public boolean run(WorkflowContext context) {
 					context.log("Tabix done.");
 				}
 
-				if (aesEncryption != null && aesEncryption.equals("yes")) {
-					param.setEncryptionMethod(EncryptionMethod.AES);
-					param.setAesKeyStrength(AesKeyStrength.KEY_STRENGTH_256);
-					param.setCompressionMethod(CompressionMethod.DEFLATE);
-					param.setCompressionLevel(CompressionLevel.NORMAL);
-				}
-
-				// create zip file
-				ArrayList<File> files = new ArrayList<File>();
-				files.add(new File(dosageOutput));
-
-				if (meta != null && meta.equals("yes")) {
-					files.add(new File(dosageMetaOutput));
-				}
-
-				if (!phasingOnly) {
-					files.add(new File(infoOutput));
-				}
-
+				// create zip file
 				String fileName = "chr_" + name + ".zip";
 				String filePath = FileUtil.path(localOutput, fileName);
 
-				ZipFile file = new ZipFile(new File(filePath), password.toCharArray());
-				file.addFiles(files, param);
+				File file = new File(filePath);
+				createEncryptedZipFile(file, files, password, aesEncryption);
 
+				// add checksum to hash file
 				context.log("Creating file checksum for " + filePath);
 				long checksumStart = System.currentTimeMillis();
 				String checksum = FileChecksum.HashFile(new File(filePath), FileChecksum.Algorithm.MD5);
@@ -341,7 +222,7 @@ public boolean run(WorkflowContext context) {
 
 					context.log("Start file upload: " + filePath);
 
-					String url = externalWorkspace.upload("local", file.getFile());
+					String url = externalWorkspace.upload("local", file);
 
 					long end = (System.currentTimeMillis() - start) / 1000;
 
@@ -349,7 +230,7 @@ public boolean run(WorkflowContext context) {
 
 					context.log("Add " + localOutput + " to custom download");
 
-					String size = FileUtils.byteCountToDisplaySize(file.getFile().length());
+					String size = FileUtils.byteCountToDisplaySize(file.length());
 
 					context.addDownload("local", fileName, size, url);
 
@@ -431,13 +312,10 @@ public boolean run(WorkflowContext context) {
 				String fileName = "scores.zip";
 				String filePath = FileUtil.path(localOutput, fileName);
 
-				ArrayList<File> files = new ArrayList<File>();
-				files.add(new File(outputFileScores));
-
-				ZipFile file = new ZipFile(new File(filePath), password.toCharArray());
-				file.addFiles(files, param);
+				File file = new File(filePath);
+				createEncryptedZipFile(file, new File(outputFileScores), password, aesEncryption);
 
-				context.println("Exported PGS scores to " + outputFileScores + ".");
+				context.println("Exported PGS scores to " + fileName + ".");
 
 				FileUtil.deleteDirectory(temp2);
 			}
@@ -497,40 +375,29 @@ public boolean run(WorkflowContext context) {
 
 	}
 
-	private List<String> findFiles(String folder, String pattern) throws IOException {
-
-		Configuration conf = HdfsUtil.getConfiguration();
-
-		FileSystem fileSystem = FileSystem.get(conf);
-		Path pathFolder = new Path(folder);
-		FileStatus[] files = fileSystem.listStatus(pathFolder);
-
-		List<String> dataFiles = new Vector<String>();
-		for (FileStatus file : files) {
-			if (!file.isDir() && !file.getPath().getName().startsWith("_")
-					&& file.getPath().getName().endsWith(pattern)) {
-				dataFiles.add(file.getPath().toString());
-			}
+	public void createEncryptedZipFile(File file, List<File> files, String password, boolean aesEncryption)
+			throws ZipException {
+
+		ZipParameters param = new ZipParameters();
+		param.setEncryptFiles(true);
+		param.setEncryptionMethod(EncryptionMethod.ZIP_STANDARD);
+
+		if (aesEncryption) {
+			param.setEncryptionMethod(EncryptionMethod.AES);
+			param.setAesKeyStrength(AesKeyStrength.KEY_STRENGTH_256);
+			param.setCompressionMethod(CompressionMethod.DEFLATE);
+			param.setCompressionLevel(CompressionLevel.NORMAL);
 		}
-		Collections.sort(dataFiles);
-		return dataFiles;
-	}
-
-	class ChrXComparator implements Comparator<String> {
-
-		List<String> definedOrder = Arrays.asList("X.PAR1", "X.nonPAR", "X.PAR2");
-
-		@Override
-		public int compare(String o1, String o2) {
-
-			String region = o1.substring(o1.lastIndexOf("/") + 1).split("_")[1];
-
-			String region2 = o2.substring(o2.lastIndexOf("/") + 1).split("_")[1];
-
-			return Integer.valueOf(definedOrder.indexOf(region))
-					.compareTo(Integer.valueOf(definedOrder.indexOf(region2)));
-		}
+
+		ZipFile zipFile = new ZipFile(file, password.toCharArray());
+		zipFile.addFiles(files, param);
+	}
 
+	public void createEncryptedZipFile(File file, File source, String password, boolean aesEncryption)
+			throws ZipException {
+		List<File> files = new Vector<File>();
+		files.add(source);
+		createEncryptedZipFile(file, files, password, aesEncryption);
 	}
 }
diff --git a/src/main/java/genepi/imputationserver/steps/vcf/MergedVcfFile.java b/src/main/java/genepi/imputationserver/steps/vcf/MergedVcfFile.java
index 8e1ff3c5..c8201ebb 100644
--- a/src/main/java/genepi/imputationserver/steps/vcf/MergedVcfFile.java
+++ b/src/main/java/genepi/imputationserver/steps/vcf/MergedVcfFile.java
@@ -1,18 +1,20 @@
 package genepi.imputationserver.steps.vcf;
 
-import htsjdk.samtools.util.BlockCompressedStreamConstants;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
 
 import org.apache.commons.compress.utils.IOUtils;
 
+import cloudgene.sdk.internal.WorkflowContext;
+import genepi.hadoop.HdfsUtil;
+import genepi.io.text.LineReader;
+import htsjdk.samtools.util.BlockCompressedStreamConstants;
+
 public class MergedVcfFile {
 
 	private FileOutputStream output;
@@ -31,4 +33,52 @@ public void close() throws IOException {
 		output.close();
 	}
 
+	public void addHeader(WorkflowContext context, List<String> files) throws Exception {
+
+		// simple header check
+		String headerLine = null;
+		for (String file : files) {
+
+			context.println("Read header file " + file);
+			LineReader reader = null;
+			try {
+				reader = new LineReader(HdfsUtil.open(file));
+				while (reader.next()) {
+					String line = reader.get();
+					if (line.startsWith("#CHROM")) {
+						if (headerLine != null) {
+							if (headerLine.equals(line)) {
+								context.println("  Header is the same as header of first file.");
+							} else {
+								context.println("  ERROR: Header is different as header of first file.");
+								context.println(headerLine);
+								context.println(line);
+								throw new Exception("Different sample order in chunks.");
+							}
+						} else {
+							headerLine = line;
+							addFile(HdfsUtil.open(file));
+							context.println("  Keep this header as first header.");
+						}
+					}
+
+				}
+				if (reader != null) {
+					reader.close();
+				}
+				HdfsUtil.delete(file);
+			} catch (Exception e) {
+				if (reader != null) {
+					reader.close();
+				}
+				StringWriter errors = new StringWriter();
+				e.printStackTrace(new PrintWriter(errors));
+				context.println("Error reading header file: " + errors.toString());
+			}
+		}
+
+		if (headerLine == null || headerLine.trim().isEmpty()) {
+			throw new Exception("No valid header file found");
+		}
+	}
+
 }

diff --git a/src/main/java/genepi/imputationserver/util/ExportObject.java b/src/main/java/genepi/imputationserver/util/ExportObject.java
deleted file mode 100644
index 0bc70bed..00000000
--- a/src/main/java/genepi/imputationserver/util/ExportObject.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package genepi.imputationserver.util;
-
-import java.util.ArrayList;
-
-public class ExportObject {
-
-	private ArrayList<String> dataFiles;
-	private ArrayList<String> dataMetaFiles;
-	private ArrayList<String> headerFiles;
-	private ArrayList<String> headerMetaFiles;
-	private ArrayList<String> infoFiles;
-
-	public ExportObject() {
-		dataFiles = new ArrayList<String>();
-		headerFiles = new ArrayList<String>();
-		infoFiles = new ArrayList<String>();
-		headerMetaFiles = new ArrayList<String>();
-		dataMetaFiles = new ArrayList<String>();
-	}
-
-	public ArrayList<String> getDataMetaFiles() {
-		return dataMetaFiles;
-	}
-
-	public void setDataMetaFiles(ArrayList<String> dataMetaFiles) {
-		this.dataMetaFiles = dataMetaFiles;
-	}
-
-	public ArrayList<String> getHeaderMetaFiles() {
-		return headerMetaFiles;
-	}
-
-	public void setHeaderMetaFiles(ArrayList<String> headerMetaFiles) {
-		this.headerMetaFiles = headerMetaFiles;
-	}
-
-	public ArrayList<String> getDataFiles() {
-		return dataFiles;
-	}
-
-	public void setDataFiles(ArrayList<String> dataFiles) {
-		this.dataFiles = dataFiles;
-	}
-
-	public ArrayList<String> getHeaderFiles() {
-		return headerFiles;
-	}
-
-	public void setHeaderFiles(ArrayList<String> headerFiles) {
-		this.headerFiles = headerFiles;
-	}
-
-	public ArrayList<String> getInfoFiles() {
-		return infoFiles;
-	}
-
-	public void setInfoFiles(ArrayList<String> infoFiles) {
-		this.infoFiles = infoFiles;
-	}
-}

diff --git a/src/main/java/genepi/imputationserver/util/FileMerger.java b/src/main/java/genepi/imputationserver/util/FileMerger.java
index 5b6c727c..d78413af 100644
--- a/src/main/java/genepi/imputationserver/util/FileMerger.java
+++ b/src/main/java/genepi/imputationserver/util/FileMerger.java
@@ -5,7 +5,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.zip.GZIPOutputStream;
 
 import genepi.hadoop.HdfsUtil;
@@ -117,7 +117,7 @@ public void close() throws IOException {
 
 	}
 
-	public static void mergeAndGzInfo(ArrayList<String> hdfs, String local) throws IOException {
+	public static void mergeAndGzInfo(List<String> hdfs, String local) throws IOException {
 
 		GZIPOutputStream out = new GZIPOutputStream(new FileOutputStream(local));

diff --git a/src/main/java/genepi/imputationserver/util/ImputationResults.java b/src/main/java/genepi/imputationserver/util/ImputationResults.java
new file mode 100644
index 00000000..65a505ca
--- /dev/null
+++ b/src/main/java/genepi/imputationserver/util/ImputationResults.java
@@ -0,0 +1,119 @@
+package genepi.imputationserver.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import genepi.hadoop.HdfsUtil;
+import genepi.io.FileUtil;
+
+public class ImputationResults {
+
+	private Map<String, ImputedChromosome> chromosomes = new HashMap<String, ImputedChromosome>();
+
+	public ImputationResults(List<String> folders, boolean phasingOnly) throws IOException {
+
+		for (String folder : folders) {
+
+			String chromosomeName = FileUtil.getFilename(folder);
+
+			// combine all X.* to one folder
+			if (chromosomeName.startsWith("X.")) {
+				chromosomeName = "X";
+			}
+
+			ImputedChromosome chromosome = chromosomes.get(chromosomeName);
+
+			if (chromosome == null) {
+				chromosome = new ImputedChromosome();
+				chromosomes.put(chromosomeName, chromosome);
+			}
+
+			if (phasingOnly) {
+
+				List<String> headerFiles = findFiles(folder, ".header.dose.vcf.gz");
+				chromosome.addHeaderFiles(headerFiles);
+
+				List<String> dataFiles = findFiles(folder, ".phased.vcf.gz");
+				chromosome.addDataFiles(dataFiles);
+
+			} else {
+
+				List<String> headerFiles = findFiles(folder, ".header.dose.vcf.gz");
+				chromosome.addHeaderFiles(headerFiles);
+
+				List<String> dataFiles = findFiles(folder, ".data.dose.vcf.gz");
+				chromosome.addDataFiles(dataFiles);
+
+				List<String> infoFiles = findFiles(folder, ".info");
+				chromosome.addInfoFiles(infoFiles);
+
+				List<String> headerMetaFiles = findFiles(folder, ".header.empiricalDose.vcf.gz");
+				chromosome.addHeaderMetaFiles(headerMetaFiles);
+
+				List<String> dataMetaFiles = findFiles(folder, ".data.empiricalDose.vcf.gz");
+				chromosome.addDataMetaFiles(dataMetaFiles);
+
+			}
+
+			// resort for chrX only
+			if (chromosomeName.equals("X")) {
+				Collections.sort(chromosome.getDataMetaFiles(), new ChromosomeXComparator());
+				Collections.sort(chromosome.getDataFiles(), new ChromosomeXComparator());
+				Collections.sort(chromosome.getInfoFiles(), new ChromosomeXComparator());
+			}
+		}
+
+	}
+
+	public Map<String, ImputedChromosome> getChromosomes() {
+		return chromosomes;
+	}
+
+	protected List<String> findFiles(String folder, String pattern) throws IOException {
+
+		Configuration conf = HdfsUtil.getConfiguration();
+
+		FileSystem fileSystem = FileSystem.get(conf);
+		Path pathFolder = new Path(folder);
+		FileStatus[] files = fileSystem.listStatus(pathFolder);
+
+		List<String> dataFiles = new Vector<String>();
+		for (FileStatus file : files) {
+			if (!file.isDir() && !file.getPath().getName().startsWith("_")
+					&& file.getPath().getName().endsWith(pattern)) {
+				dataFiles.add(file.getPath().toString());
+			}
+		}
+		Collections.sort(dataFiles);
+		return dataFiles;
+	}
+
+	class ChromosomeXComparator implements Comparator<String> {
+
+		List<String> definedOrder = Arrays.asList("X.PAR1", "X.nonPAR", "X.PAR2");
+
+		@Override
+		public int compare(String o1, String o2) {
+
+			String region = o1.substring(o1.lastIndexOf("/") + 1).split("_")[1];
+
+			String region2 = o2.substring(o2.lastIndexOf("/") + 1).split("_")[1];
+
+			return Integer.valueOf(definedOrder.indexOf(region))
+					.compareTo(Integer.valueOf(definedOrder.indexOf(region2)));
+		}
+
+	}
+
+}

diff --git a/src/main/java/genepi/imputationserver/util/ImputedChromosome.java b/src/main/java/genepi/imputationserver/util/ImputedChromosome.java
new file mode 100644
index 00000000..797852e6
--- /dev/null
+++ b/src/main/java/genepi/imputationserver/util/ImputedChromosome.java
@@ -0,0 +1,66 @@
+package genepi.imputationserver.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
+
+public class ImputedChromosome {
+
+	private List<String> dataFiles;
+
+	private List<String> dataMetaFiles;
+
+	private List<String> headerFiles;
+
+	private List<String> headerMetaFiles;
+
+	private List<String> infoFiles;
+
+	public ImputedChromosome() {
+		dataFiles = new Vector<String>();
+		headerFiles = new Vector<String>();
+		infoFiles = new Vector<String>();
+		headerMetaFiles = new ArrayList<String>();
+		dataMetaFiles = new Vector<String>();
+	}
+
+	public List<String> getDataMetaFiles() {
+		return dataMetaFiles;
+	}
+
+	public void addDataMetaFiles(List<String> dataMetaFiles) {
+		this.dataMetaFiles.addAll(dataMetaFiles);
+	}
+
+	public List<String> getHeaderMetaFiles() {
+		return headerMetaFiles;
+	}
+
+	public void addHeaderMetaFiles(List<String> headerMetaFiles) {
+		this.headerMetaFiles.addAll(headerMetaFiles);
+	}
+
+	public List<String> getDataFiles() {
+		return dataFiles;
+	}
+
+	public void addDataFiles(List<String> dataFiles) {
+		this.dataFiles.addAll(dataFiles);
+	}
+
+	public List<String> getHeaderFiles() {
+		return headerFiles;
+	}
+
+	public void addHeaderFiles(List<String> headerFiles) {
+		this.headerFiles.addAll(headerFiles);
+	}
+
+	public List<String> getInfoFiles() {
+		return infoFiles;
+	}
+
+	public void addInfoFiles(List<String> infoFiles) {
+		this.infoFiles.addAll(infoFiles);
+	}
+}
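Taken together, the refactored export reduces to a small loop over the new helper classes: ImputationResults groups the HDFS chunk files per chromosome, ImputedChromosome holds the grouped file lists, and MergedVcfFile concatenates them behind a header check. A condensed sketch of that flow, mirroring CompressionEncryption.run for the non-phasing case; the HDFS output path and the WorkflowContext are assumed to come from the workflow, and info/meta handling plus error handling are omitted:

```java
import java.util.List;
import java.util.Map;

import cloudgene.sdk.internal.WorkflowContext;
import genepi.hadoop.HdfsUtil;
import genepi.imputationserver.steps.vcf.MergedVcfFile;
import genepi.imputationserver.util.ImputationResults;
import genepi.imputationserver.util.ImputedChromosome;

public class MergeSketch {

	public static void merge(WorkflowContext context, String output) throws Exception {
		// one HDFS folder per chromosome (X.PAR1/X.nonPAR/X.PAR2 are folded into "X")
		List<String> folders = HdfsUtil.getDirectories(output);
		ImputationResults results = new ImputationResults(folders, false);

		for (Map.Entry<String, ImputedChromosome> entry : results.getChromosomes().entrySet()) {
			ImputedChromosome chromosome = entry.getValue();

			MergedVcfFile vcf = new MergedVcfFile("chr" + entry.getKey() + ".dose.vcf.gz");
			// verifies that all chunks share the same #CHROM line, keeps the first header
			vcf.addHeader(context, chromosome.getHeaderFiles());
			for (String file : chromosome.getDataFiles()) {
				vcf.addFile(HdfsUtil.open(file));
			}
			vcf.close();
		}
	}
}
```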