-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrfastq.c
170 lines (129 loc) · 5.23 KB
/
rfastq.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#include "rfastq.h"
pthread_mutex_t console_mutex_rfastq;
KSEQ_INIT(gzFile, gzread)
void read_fastqs(struct gargs *genome_arguments, struct pargs *program_arguments) {
struct tpool *tm;
tm = tpool_create(program_arguments->thread_number);
for (int i=0; i<program_arguments->number_of_genomes; i++) {
tpool_add_work(tm, read_fastq, genome_arguments+i);
}
tpool_wait(tm);
tpool_destroy(tm);
}
void read_fastq(void *arg) {
struct gargs *genome_arguments = (struct gargs *)arg;
gzFile in = gzopen(genome_arguments->inFileName, "r");
if (in == NULL) {
log1(ERROR, "Error opening file %s", genome_arguments->inFileName);
return;
}
// create file for writing cores
FILE *out = NULL;
if (genome_arguments->write_lcpt) {
out = fopen(genome_arguments->inFileName, "wb");
if (out == NULL) {
log1(ERROR, "Error opening file for saving into file %s", genome_arguments->outFileName);
return;
}
}
// estimate core counts and make array allocation
struct stat st;
if (stat(genome_arguments->inFileName, &st) != 0) {
log1(ERROR, "Error getting file size of %s", genome_arguments->inFileName);
return;
}
uint64_t file_size = st.st_size;
uint64_t estimated_uncompressed_size = file_size; // default assumption
if (strstr(genome_arguments->inFileName, ".gz")) {
// estimate uncompressed size using typical compression ratio (~4:1 for FASTQ)
estimated_uncompressed_size = file_size * 4;
}
uint64_t estimated_bp_count = estimated_uncompressed_size / 2;
uint64_t estimated_core_size = (int)(estimated_bp_count / pow(MAGIC_LCP_FQ_CONSTANT, genome_arguments->lcp_level));
genome_arguments->cores = (simple_core*)malloc(estimated_core_size * sizeof(simple_core));
if (genome_arguments->cores == NULL) {
pthread_mutex_lock(&console_mutex_rfastq);
log1(INFO, "Thread ID: %ld couldn't allocate memory of size %ld for cores", pthread_self(), estimated_core_size);
pthread_mutex_unlock(&console_mutex_rfastq);
}
if (genome_arguments->verbose) {
pthread_mutex_lock(&console_mutex_rfastq);
log1(INFO, "Thread ID: %ld, in: %s, cc: %ld", pthread_self(), genome_arguments->inFileName, estimated_core_size);
pthread_mutex_unlock(&console_mutex_rfastq);
}
kseq_t *seq = kseq_init(in);
while (kseq_read(seq) >= 0) {
process_read(seq->seq.s, seq->seq.l, &estimated_core_size, genome_arguments, out);
}
kseq_destroy(seq);
gzclose(in);
// end writing cores to file if user specified to do so
if (genome_arguments->write_lcpt) {
done(out);
fclose(out);
}
// log ending of reading fasta
if (genome_arguments->verbose) {
pthread_mutex_lock(&console_mutex_rfastq);
log1(INFO, "Thread ID: %ld ended reading %s, size: %ld", pthread_self(), genome_arguments->inFileName, genome_arguments->cores_len);
pthread_mutex_unlock(&console_mutex_rfastq);
}
// sort and filter the cores
genSign(genome_arguments, genome_arguments->sct);
// log ending of processing fasta
if (genome_arguments->verbose) {
pthread_mutex_lock(&console_mutex_rfastq);
log1(INFO, "Thread ID: %ld ended processing %s, size: %ld", pthread_self(), genome_arguments->inFileName, genome_arguments->cores_len);
pthread_mutex_unlock(&console_mutex_rfastq);
}
}
void process_read(char *sequence, size_t seq_size, uint64_t *capacity, struct gargs *genome_arguments, FILE *out) {
uint64_t cap = *capacity;
// process forward
struct lps str_fwd;
init_lps(&str_fwd, sequence, seq_size);
lps_deepen(&str_fwd, genome_arguments->lcp_level);
if (genome_arguments->write_lcpt) {
save(out, &str_fwd);
}
uint64_t len = genome_arguments->cores_len;
if (cap <= len+str_fwd.size) {
cap = cap * 1.5;
simple_core* temp = (simple_core*)realloc(genome_arguments->cores, cap);
if (temp == NULL) {
log1(ERROR, "Couldn't increase cores array size.");
return;
}
genome_arguments->cores = temp;
}
simple_core *cores = genome_arguments->cores;
for (int i=0; i<str_fwd.size; i++) {
cores[len] = ((uint64_t)str_fwd.cores[i].label << 32) + (str_fwd.cores[i].end-str_fwd.cores[i].start);
len++;
}
free_lps(&str_fwd);
// process reverse complement
struct lps str_rev;
init_lps2(&str_rev, sequence, seq_size);
lps_deepen(&str_rev, genome_arguments->lcp_level);
if (genome_arguments->write_lcpt) {
save(out, &str_rev);
}
if (*capacity <= len+str_rev.size) {
*capacity = *capacity * 1.5;
simple_core* temp = (simple_core*)realloc(genome_arguments->cores, *capacity);
if (temp == NULL) {
log1(ERROR, "Couldn't increase cores array size.");
return;
}
genome_arguments->cores = temp;
}
cores = genome_arguments->cores;
for (int i=0; i<str_rev.size; i++) {
cores[len] = ((uint64_t)str_rev.cores[i].label << 32) + (str_rev.cores[i].end-str_rev.cores[i].start);
len++;
}
genome_arguments->cores_len = len;
*capacity = cap;
free_lps(&str_rev);
}