-
Notifications
You must be signed in to change notification settings - Fork 2
/
regens_testers.py
296 lines (258 loc) · 12.4 KB
/
regens_testers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import numpy as np
import pandas as pd
from bed_reader import open_bed
import pdb
from matplotlib import pyplot as plt
pd.options.mode.chained_assignment = None
def unit_tester(current_output, known_output_file_name, header_type):
"""
Purpose
-------
Given input with known output, theis function compare's each function's current output against the correct output.
Parameters
----------
current_output: a functions actual output during unit testing.
known_output_file_name: name of the file containing the output that the function being tested is expected to have.
header_type: either 0 (if there is a header) or None (if there is no header). Passed directly to Pandas' read_csv function.
Returns
-------
It returns nothing. It prints whether or not the function passed it's unit test.
"""
if known_output_file_name[-4:] == ".bed":
bed_reader = open_bed(
"unit_testing_files/correct_write_bed_file_output.bed",
count_A1=True,
num_threads=1,
)
correct_output = bed_reader.read(dtype="int8")
elif known_output_file_name[-4:] == ".bim" or known_output_file_name[-4:] == ".fam":
correct_output = (
pd.read_csv(
"unit_testing_files/" + known_output_file_name,
delim_whitespace=True,
header=None,
dtype=str,
)
.to_numpy()
.astype("str")
)
else:
correct_output = pd.read_csv(
"unit_testing_files/" + known_output_file_name,
delimiter="\t",
header=header_type,
).to_numpy()
function_instance = 0
if len(correct_output.shape) == 2:
if (correct_output.shape)[1] == 1:
correct_output = correct_output.reshape(-1)
if "1" in known_output_file_name or "2" in known_output_file_name:
function_name = known_output_file_name[8:-12]
function_instance = known_output_file_name[-5]
else:
function_name = known_output_file_name[8:-11]
if isinstance(current_output, pd.DataFrame):
current_output = current_output.to_numpy()
if isinstance(current_output, list):
current_output = np.array(current_output)
# Importing the values sometimes creates a small ammount of machine error.
if known_output_file_name[-4:] == ".bim" or known_output_file_name[-4:] == ".fam":
if np.any(correct_output != current_output):
print(
"The "
+ function_name
+ " function's "
+ known_output_file_name[-4:]
+ " file output failed its unit test.\n"
)
else:
print(
"The "
+ function_name
+ " function's "
+ known_output_file_name[-4:]
+ " file output passed its unit test.\n"
)
else:
if np.any(np.round(correct_output, 12) != np.round(current_output, 12)):
if function_instance == "1" or function_instance == "2":
print(
"Instance "
+ function_instance
+ " of the "
+ function_name
+ " function failed its unit test.\n"
)
else:
print("The " + function_name + " function failed its unit test.\n")
else:
if function_instance == "1" or function_instance == "2":
print(
"Instance "
+ function_instance
+ " of the "
+ function_name
+ " function passed its unit test.\n"
)
else:
print("The " + function_name + " function passed its unit test.\n")
def test_drawn_breakpoints(
breakpoints, probabilities, chromosome_number, output_plink_filename_prefix
):
"""
Purpose
-------
To show that each breakpoint is, on average, drawn the expected number of times given the "probabilities" 1D array.
To confirms that each row in the "breakpoints" 2D array contains 1 or 0 instances of each breakpoint.
Parameters
----------
breakpoints: an NxB numpy array, where N is the number of simulated individuals, and B is the number of breakpoints per chromosome in each individual.
each breakpoint is an index in probabilities, which directly corresponds to a specific recombination interval.
probabilities: a 1D numpy array. The ith element is the probability of drawing a breakpoint at the ith recombination
interval in a single draw, noting that each row in "breakpoints" cannot contain any breakpoint more than once.
chromosome_number: the cromosome being analyzed
Returns
-------
It returns nothing. It plots the expected number of times each breakpoint index is drawn against the actual number of times.
for each chromosome. It also prints the row numbers of any breakpoint vectors that contain more than one instance of the same breakpoint.
"""
num_breakpoints = len(breakpoints[0])
num_samples = len(breakpoints)
breakpoint_indices = list(range(len(probabilities)))
breakpoint_counts = dict(zip(breakpoint_indices, len(probabilities) * [0]))
breakpoint_vecs_with_unwanted_replacement = []
for i, breakpoint_vec in enumerate(breakpoints):
if len(breakpoint_vec) != len(np.unique(breakpoint_vec)):
breakpoint_vecs_with_unwanted_replacement.append(i)
for b in breakpoint_vec:
breakpoint_counts[b] += 1
print(
"The samples in the following list have drawn the same breakpoint more than once:"
)
print(breakpoint_vecs_with_unwanted_replacement)
if len(breakpoint_vecs_with_unwanted_replacement) == 0:
print("Every sample has successfully drawn breakpoints without replacement")
if len(breakpoint_vecs_with_unwanted_replacement) > 0:
print(
"Some samples have drawn breakpoints with replacement. This is an error, and it should be attended to."
)
expected_num_breakpoint_counts = []
num_breakpoint_counts = []
for index in breakpoint_indices:
p_not_drawing_breakpoint = (1 - probabilities[index]) ** num_breakpoints
p_drawing_breakpoint = 1 - p_not_drawing_breakpoint
expected_num_breakpoint_counts.append(num_samples * p_drawing_breakpoint)
num_breakpoint_counts.append(breakpoint_counts[index])
expected_num_breakpoint_counts = np.array(expected_num_breakpoint_counts)
num_breakpoint_counts = np.array(num_breakpoint_counts)
plt.plot(
expected_num_breakpoint_counts,
num_breakpoint_counts,
"*",
label="expected vs actual breakpoint index counts",
)
plt.plot(
expected_num_breakpoint_counts,
expected_num_breakpoint_counts,
"-",
label="y = x",
)
plt.legend(loc="upper left")
plt.xlabel("expected number of breakpoints", fontsize=14)
plt.ylabel("number of breakpoints", fontsize=14)
if "/" in output_plink_filename_prefix:
prefix = "/".join((output_plink_filename_prefix.split("/"))[:-1]) + "/"
print(prefix)
else:
prefix = ""
plt.savefig(
prefix
+ "expected_vs_actual_breakpoint_counts_full_range_chr"
+ str(chromosome_number)
+ ".png"
)
plt.clf()
def test_breakpoint_SNP_mapping(
old_breakpoints, rcmb_interval_positions, new_breakpoints, SNP_positions
):
"""
Purpose
-------
To confirm for every drawn breakpoint that, when each breakpoint's recombination interval is replaced with the genomic position of a SNP,
that the SNP's genomic position is in-between the genomic positions of the recombination interval's left and right boundaries.
Parameters
----------
old_breakpoints: an NxB numpy array, where N is the number of simulated individuals, and B is the number of breakpoints per chromosome in each individual.
each breakpoint is an index in probabilities, which directly corresponds to a specific recombination interval.
rcmb_interval_positions: a list where ith index contains the genomic position of the left bound of the recombination interbal, which corresponds
to the ith element in "probabilities." Note that the left bound of the ith interval is the right bound of the (i-1)th interval.
new_breakpoints: an NxB numpy array, where N is the number of simulated individuals, and B is the number of breakpoints per chromosome in each individual.
each breakpoint is the row number of a SNP in the input bim file, which directly corresponds to a specific genomic location.
SNP_positions: a list where ith index contains the genomic position of the SNP in the input bim file's ith row.
Returns
-------
It returns nothing. It prints a list of the (row, column) coordinates of any new breakpoint with a genomic position that
does not fall in between the genomic positions of the original recombination interval's left and right boundaries.
"""
num_breakpoints = len(old_breakpoints[0])
erronious_mappings = []
for i in range(len(old_breakpoints)):
for k in range(num_breakpoints):
rcmb_interval_position = rcmb_interval_positions[old_breakpoints[i][k]]
SNP_position = SNP_positions[new_breakpoints[i][k]]
next_rcmb_interval = rcmb_interval_positions[old_breakpoints[i][k] + 1]
if (
rcmb_interval_position <= SNP_position
and SNP_position <= next_rcmb_interval
):
pass
else:
erronious_mappings.append((i, k))
print(
"The following rcmb interval indices have been incorrectly mapped to SNP indices"
)
print(erronious_mappings)
if len(erronious_mappings) == 0:
print("Every index has been mapped correctly")
if len(erronious_mappings) > 0:
print(
"Some SNP indices are not contained in the corresponding rcmb interval. This is an error, and it should be attended to."
)
def test_simulated_individuals(breakpoints, sampled_individuals, simulated_individuals):
"""
Purpose
-------
To verify that every simulated individual is comprised of the correct segments from the correct real individuals
Parameters
----------
breakpoints: an Nx(B+1) numpy array, where N is the number of simulated individuals, and B is the number of breakpoints per chromosome in each individual.
each breakpoint is the row number of a SNP in the input bim file, which directly corresponds to a specific genomic location. Note that the
last column in breakpoints has all elements equal to the current chromosomes last SNP index. The other columns are simple breakpoint values.
sampled_individuals: an Nx(B+1)xS numpy array, where N is the number of simulated individuals, B is the number of breakpoints per chromosome
(B breakpoints seperate B+1 segments from B+1 real individuals), and S is the number of SNPs per individual in the current chromosome.
simulated_individuals: an NxS numpy arrray. Each array of S SNPs contains concatenated segments from B+1 individuals.
Returns
-------
It returns nothing. It prints a list of the (row, segment index) of any segments within
simulated individuals that do not map back to the correct region of the correct real individual
"""
incorrect_segment_coordinates = []
for j, breakpoint_vec in enumerate(breakpoints):
startpoint = 0
for k, breakpoint in enumerate(np.sort(breakpoint_vec)):
if np.any(
sampled_individuals[j][k][startpoint:breakpoint]
!= simulated_individuals[j][startpoint:breakpoint]
):
incorrect_segment_coordinates.append([j, k])
startpoint = breakpoint
print(
"\nThe following segments (simulated_individual row number, segment index) are not from the correct regions of the correct real individuals"
)
print(incorrect_segment_coordinates)
if len(incorrect_segment_coordinates) == 0:
print(
"Every segment in every simulated individual is from the correct region of the correct real individual"
)
else:
print("\nThis is a technical error that needs to be addressed\n")