generated from NCBI-Codeathons/codeathon-team-template
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.nf
182 lines (130 loc) · 4.93 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env nextflow
nextflow.enable.dsl = 2
// Startup banner. log.info prints to the screen and to the log.
// The banner is a single multi-line string: each ${params.*} placeholder is
// interpolated with the matching pipeline parameter, so the settings in
// effect for the run (gene list, results dir, user email, tuning param,
// debug mode, cleanup) are recorded. The trailing .stripIndent() removes
// any leading indentation shared by all lines of the string.
log.info """
██████ ██ ██ ███ ██ ██████ ██ ██ ██ ███████ ████████ ███████ ██████
██ ██ ██ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ███████ ██ █████ ██████
██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
██████ ███████ ██ ██ ████ ██████ ███████ ██████ ███████ ██ ███████ ██ ██
ClinCluster (version 0.1.0)
A pipeline for clustering ClinVar condition entries into groups based on whether they
are alternate names for the same underlying condition. The pipeline uses an LLM DBSCAN
to assign cluster indices to each condition in input variants for a given gene.
===================================
gene list : ${params.genelist}
results dir : ${params.results}
user email : ${params.email}
tuning param : ${params.tuningparam}
[debug mode : ${params.debugmode}]
[cleanup : ${params.cleanup}]
"""
.stripIndent()
// WORKFLOW SPECIFICATION
// --------------------------------------------------------------- //
workflow {

    // Gene list channel: the file at params.genelist is parsed as
    // tab-separated rows without a header, then flattened so that every
    // cell is emitted as its own item.
    gene_symbols = Channel
        .fromPath( params.genelist )
        .splitCsv( header: false, sep: "\t" )
        .flatten()

    // Step 1: one RETRIEVE_DATA task per gene.
    RETRIEVE_DATA( gene_symbols )

    // Step 2: cluster the conditions retrieved for each gene.
    CLUSTER_WITH_LLM( RETRIEVE_DATA.out )

    // Steps 3 and 4 both read the clustering output directly.
    // NOTE(review): FIND_QUALIFYING_RECORDS.out is not consumed by any later
    // step in this workflow — confirm that MAP_TO_ORIGINAL_DATA is meant to
    // take CLUSTER_WITH_LLM.out rather than the qualifying records.
    FIND_QUALIFYING_RECORDS( CLUSTER_WITH_LLM.out )
    MAP_TO_ORIGINAL_DATA( CLUSTER_WITH_LLM.out )

}
// --------------------------------------------------------------- //
// DERIVATIVE PARAMETER SPECIFICATION
// --------------------------------------------------------------- //
// Error-handling mode derived from the debugmode flag: 'terminate' when
// debugging, 'ignore' otherwise. Processes use this value as their
// fallback error strategy once their retries are used up.
params.errorMode = ( params.debugmode == true ) ? 'terminate' : 'ignore'

// Output sub-directories, each located directly under the results directory
// (params.results, which is set in nextflow.config)
params.retrieved = params.results + "/01_retrieved_data"
params.clusters  = params.results + "/02_unverified_clusters"
params.verified  = params.results + "/03_qualifying_clusters"
params.remapped  = params.results + "/05_clinvar_with_clusters"
// --------------------------------------------------------------- //
// PROCESS SPECIFICATION
// --------------------------------------------------------------- //
process RETRIEVE_DATA {

    // Runs pull_and_extract_clinvar.py for one gene and emits the gene value
    // together with the two JSON files named after it (unique conditions and
    // extracted variants). Outputs are copied into params.retrieved.

    tag "$gene"
    publishDir path: params.retrieved, mode: 'copy', overwrite: true

    // Every failure requests a retry, after first pausing for
    // 2^attempt * 100 milliseconds so that successive attempts are spaced
    // further apart. maxRetries limits this to two retries per task.
    // NOTE(review): unlike the other processes, this strategy never falls
    // back to params.errorMode — confirm that is intended.
    errorStrategy {
        sleep( ( Math.pow(2, task.attempt) * 100 ) as long )
        return 'retry'
    }
    maxRetries 2

    input:
    val gene

    output:
    tuple val(gene), path("${gene}_formatted_unique_conditions.json"), path("${gene}_variants_extracted.json")

    script:
    """
    pull_and_extract_clinvar.py \
        --gene ${gene} \
        --email ${params.email}
    """

}
process CLUSTER_WITH_LLM {

    // Embeds the unique condition entries for one gene and clusters them.
    //
    // Input : tuple of the gene value, the unique-conditions file and the
    //         full-variants file.
    // Output: tuple of the gene value, ${gene}_clusters.json (the captured
    //         stdout of `llm cluster`) and the full-variants file passed
    //         through unchanged for downstream steps.

    tag "${gene}"

    // The clusters produced here have not been checked yet, so they are
    // published to params.clusters (02_unverified_clusters). Previously this
    // process wrote into params.verified, the directory used by
    // FIND_QUALIFYING_RECORDS, and params.clusters was never used.
    publishDir params.clusters, mode: 'copy', overwrite: true

    // Retry on the first two failures; on the third attempt fall back to the
    // mode chosen via params.errorMode.
    errorStrategy { task.attempt < 3 ? 'retry' : params.errorMode }
    maxRetries 2

    input:
    tuple val(gene), path(unique_conditions), path(full_variants)

    output:
    tuple val(gene), path("${gene}_clusters.json"), path(full_variants)

    script:
    // Step 1 embeds the conditions into the "diseases" collection of a
    // per-gene database file; step 2 clusters that collection, with
    // params.tuningparam passed as the trailing argument of `llm cluster`.
    """
    llm embed-multi diseases \
        ${unique_conditions} \
        --database ${gene}.db \
        --model sentence-transformers/all-MiniLM-L6-v2 \
        --store

    llm cluster diseases \
        --database ${gene}.db \
        ${params.tuningparam} > ${gene}_clusters.json
    """

}
process FIND_QUALIFYING_RECORDS {

    // Runs check_duplicate.py for one gene and emits the gene value, the
    // resulting ${gene}_qualifying_records.json and the full-variants file
    // passed through unchanged. Only files matching "*qualifying*" are
    // copied into params.verified.

    tag "$gene"
    publishDir path: params.verified, pattern: "*qualifying*", mode: 'copy', overwrite: true

    // Once the third attempt fails, hand over to params.errorMode; earlier
    // failures are retried.
    errorStrategy { task.attempt >= 3 ? params.errorMode : 'retry' }
    maxRetries 2

    input:
    tuple val(gene), path(llm_clusters), path(full_variants)

    output:
    tuple val(gene), path("${gene}_qualifying_records.json"), path(full_variants)

    script:
    // The script is given only the gene value.
    // NOTE(review): presumably it locates the staged input files by name in
    // the task directory — confirm against check_duplicate.py.
    """
    check_duplicate.py ${gene}
    """

}
process MAP_TO_ORIGINAL_DATA {

    // Runs aggregate_by_rcv.py for one gene, giving it the full-variants
    // file, the cluster file and the gene value. Emits the gene value,
    // whatever files match "*" in the task directory, and the full-variants
    // file.

    tag "$gene"

    // NOTE(review): only files matching "*qualifying*" are copied into
    // params.remapped. The names written by aggregate_by_rcv.py are not
    // visible here — confirm that they actually match this pattern.
    publishDir path: params.remapped, pattern: "*qualifying*", mode: 'copy', overwrite: true

    // Once the third attempt fails, hand over to params.errorMode; earlier
    // failures are retried.
    errorStrategy { task.attempt >= 3 ? params.errorMode : 'retry' }
    maxRetries 2

    input:
    tuple val(gene), path(llm_clusters), path(full_variants)

    output:
    tuple val(gene), path("*"), path(full_variants)

    script:
    """
    aggregate_by_rcv.py \
        --variants_path ${full_variants} \
        --cluster_path ${llm_clusters} \
        --gene ${gene}
    """

}
// --------------------------------------------------------------- //