diff --git a/rendseq/file_funcs.py b/rendseq/file_funcs.py index 394c09c..e4d7a3f 100644 --- a/rendseq/file_funcs.py +++ b/rendseq/file_funcs.py @@ -8,7 +8,7 @@ from pandas import read_csv -def validate_reads(reads): +def _validate_reads(reads): """Make sure the given reads meet our format requirements. Parameters @@ -48,7 +48,7 @@ def write_wig(wig_track, wig_file_name, chrom_name): - wig_track (required) - the wig data you wish to write (in 2xn array) - wig_file_name (string) - the new file you will write to """ - validate_reads(wig_track) + _validate_reads(wig_track) d_inds = where(wig_track[:, 0] < 1) wig_track = delete(wig_track, d_inds, axis=0) with open(wig_file_name, "w+", encoding="utf-8") as wig_file: @@ -83,7 +83,7 @@ def open_wig(filename): chrom = line[line.rfind("=") + 1 :].rstrip() # next we read all the wig file data and return that if it's valid: reads = asarray(read_csv(filename, sep="\t", header=1, names=["bp", "count"])) - validate_reads(reads) + _validate_reads(reads) return reads, chrom diff --git a/rendseq/zscores.py b/rendseq/zscores.py index 226ad6e..c23f2c3 100644 --- a/rendseq/zscores.py +++ b/rendseq/zscores.py @@ -5,48 +5,62 @@ import warnings from os.path import abspath -from numpy import mean, std, zeros +from numpy import mean, nan, std, zeros -from rendseq.file_funcs import make_new_dir, open_wig, validate_reads, write_wig +from rendseq.file_funcs import _validate_reads, make_new_dir, open_wig, write_wig -def _adjust_down(cur_ind, target_val, reads): +def _get_lower_reads(cur_ind, target_start, target_stop, reads): """Calculate the lower reads index in range for the z-score calculation.""" - validate_reads(reads) - cur_ind = min(cur_ind, len(reads) - 1) - while reads[cur_ind, 0] > target_val: + cur_ind = max(min(cur_ind, len(reads) - 1), 0) + start_ind = cur_ind + vals = [0 for _ in range(int(reads[cur_ind, 0] - target_start - 1))] + while cur_ind >= 0 and reads[cur_ind, 0] >= target_stop: + if cur_ind != start_ind: + vals.extend( + 0 for _ in range(int(reads[cur_ind, 0] - reads[cur_ind - 1, 0] - 1)) + ) + vals.append(reads[cur_ind, 1]) cur_ind -= 1 - if cur_ind == 0: - break - return cur_ind + return vals -def _adjust_up(cur_ind, target_val, reads): - """Calculate the higher reads index in range for the z-score calculation.""" - if len(reads) < 1: - raise ValueError("requires non-empty reads") +def _get_upper_reads(cur_ind, target_start, target_stop, reads): + """Fetch the upper reads needed for z score calculation with zero padding.""" + cur_ind = min(max(cur_ind, 0), len(reads) - 1) + start_ind = cur_ind + vals = [0 for _ in range(int(reads[cur_ind, 0] - target_start - 1))] - cur_ind = min(max(cur_ind, 0), len(reads)) - - while reads[cur_ind, 0] < target_val: - if cur_ind >= len(reads) - 1: - break + while reads[cur_ind, 0] < target_stop and not cur_ind >= len(reads) - 1: + if cur_ind != start_ind: + vals.extend( + 0 for _ in range(int(reads[cur_ind, 0] - reads[cur_ind - 1, 0] - 1)) + ) + vals.append(reads[cur_ind, 1]) cur_ind += 1 - return cur_ind + return vals -def _z_score(val, v_mean, v_std): +def _calc_z_score(vals, calc_val): """Calculate a z-score given a value, mean, and standard deviation. + vals = values to calculate the z score with respect to. + calc_val = value to calculate z score for. + NOTE: The z_score() of a constant vector is 0 """ - score = 0 if v_std == 0 else (val - v_mean) / v_std - return score + v_std = std(vals) + v_mean = mean(vals) + if nan in [v_mean, v_std]: + return calc_val + if v_std == 0: + return 0 if v_mean == calc_val else (calc_val - v_mean) / 0.2 + return (calc_val - v_mean) / v_std -def _remove_outliers(vals): +def _remove_outliers(vals, method="remove_by_std"): """Normalize window of reads by removing outliers (values 2.5 std > mean). Parameters @@ -59,74 +73,28 @@ def _remove_outliers(vals): removed. """ normalized_vals = vals - if len(vals) > 1: - v_mean = mean(vals) + + if method == "remove_by_std" and len(vals) > 1: v_std = std(vals) if v_std != 0: - normalized_vals = [v for v in vals if abs(_z_score(v, v_mean, v_std)) < 2.5] + normalized_vals = [v for v in vals if abs(_calc_z_score(vals, v)) < 2.5] return normalized_vals -def _calc_score(vals, min_r, cur_val): - """Compute the z score. - - Parameters - ---------- - -vals raw read count values array - -min_r: the minumum number of reads needed to calculate score - -cur_val: the value for which the z score is being calculated - - Returns - ------- - -score: the zscore for the current value, or None if insufficent reads - """ - score = None - if sum(vals + cur_val) > min_r: - v_mean = mean(vals) - v_std = std(vals) - - score = _z_score(cur_val, v_mean, v_std) - - return score - - -def score_helper(start, stop, min_r, reads, i): - """Find the z-score of reads[i] relative to the subsection of reads. - - Goes from start to stop, with a read cutoff of min_r - """ - reads_outlierless = _remove_outliers(list(reads[start:stop, 1])) - return _calc_score(reads_outlierless, min_r, reads[i, 1]) - - -def validate_gap_window(gap, w_sz): +def _validate_gap_window(gap, w_sz): """Check that gap and window size are reasonable in r/l_score_helper.""" if w_sz < 1: raise ValueError("Window size must be larger than 1 to find a z-score") if gap < 0: raise ValueError("Gap size must be at least zero to find a z-score") if gap == 0: - warnings.warn("Warning...a gap size of 0 includes the current position.") - - -def _l_score_helper(gap, w_sz, min_r, reads, i): - """Find the z_score based on reads to the left of the current pos.""" - validate_gap_window(gap, w_sz) - l_start = _adjust_up(i - (gap + w_sz), reads[i, 0] - (gap + w_sz), reads) - l_stop = _adjust_up(i - gap, reads[i, 0] - gap, reads) - return score_helper(l_start, l_stop, min_r, reads, i) - - -def _r_score_helper(gap, w_sz, min_r, reads, i): - """Find the z_score based on reads to the right of the current pos.""" - validate_gap_window(gap, w_sz) - r_start = _adjust_down(i + gap, reads[i, 0] + gap, reads) - r_stop = _adjust_down(i + gap + w_sz, reads[i, 0] + gap + w_sz, reads) - return score_helper(r_start, r_stop, min_r, reads, i) + warnings.warn( + "Warning...a gap size of 0 includes the current position.", stacklevel=2 + ) -def z_scores(reads, gap=5, w_sz=50, min_r=20): +def z_scores(reads, gap=5, w_sz=50): """Perform modified z-score transformation of reads. Parameters @@ -136,48 +104,41 @@ def z_scores(reads, gap=5, w_sz=50, min_r=20): interest that should be excluded in the z_score calculation. -w_sz (integer): the max distance (in nt) away from the current position one should include in zscore calulcation. - -min_r (integer): density threshold. If there are less than this number - of reads going into the z_score calculation for a point that point - is excluded. note this is sum of reads in the window - -file_name (string): the base file_name, can be passed in to customize - the message printed Returns ------- - -z_score (2xn array): a 2xn array with the first column being position + -z_scores (2xn array): a 2xn array with the first column being position and the second column being the z_score. """ + _validate_gap_window(gap, w_sz) + _validate_reads(reads) # make array of zscores - same length as raw reads, trimming based on window size: - z_score = zeros([len(reads) - 2 * (gap + w_sz), 2]) + z_scores = zeros([len(reads) - 2 * (gap + w_sz), 2]) # first column of return array is the location of the raw reads - z_score[:, 0] = reads[gap + w_sz : len(reads) - (gap + w_sz), 0] + z_scores[:, 0] = reads[gap + w_sz : len(reads) - (gap + w_sz), 0] # Iterate through each valid read, recording z-score for i in range((gap + w_sz + 1), (len(reads) - (gap + w_sz))): # calculate the z score with values from the left: - l_score = _l_score_helper(gap, w_sz, min_r, reads, i) + l_vals = _get_upper_reads( + i + gap, reads[i, 0] + gap, reads[i, 0] + gap + w_sz, reads + ) + l_score = _calc_z_score(_remove_outliers(l_vals), reads[i, 1]) + # calculate z score with reads from the right: - r_score = _r_score_helper(gap, w_sz, min_r, reads, i) + r_vals = _get_lower_reads( + i - gap, reads[i, 0] - gap, reads[i, 0] - gap - w_sz, reads + ) + r_score = _calc_z_score(_remove_outliers(r_vals), reads[i, 1]) # The location in which this z-score should go into the final array i_score_pos = i - (gap + w_sz) - # set the zscore to be the smaller valid score of the left/right scores - # If neither score is valid, Z-score is 0 - z_score[i_score_pos, 1] = 0 - if l_score is not None: - if r_score is not None: - z_score[i_score_pos, 1] = ( - r_score if abs(r_score) < abs(l_score) else l_score - ) - else: - z_score[i_score_pos, 1] = l_score - - elif r_score is not None: - z_score[i_score_pos, 1] = r_score + # set the zscore to be the smaller valid score of the left/right scores. + z_scores[i_score_pos, 1] = r_score if abs(r_score) < abs(l_score) else l_score - return z_score + return z_scores def parse_args_zscores(args): @@ -211,16 +172,6 @@ def parse_args_zscores(args): Default to 50.", default=50, ) - parser.add_argument( - "--min_r", - help="min_r (integer): density threshold.\ - If there are less than this number of\ - reads going into the z_score\ - calculation for a point that point is\ - excluded. note this is sum of reads in\ - the window. Default is 20", - default=20, - ) parser.add_argument( "--save_file", help="Save the z_scores file as a new\ @@ -243,30 +194,31 @@ def main_zscores(): filename = args.filename print(f"Calculating zscores for file {filename}.") reads, chrom = open_wig(filename) - z_score = z_scores( - reads, gap=int(args.gap), w_sz=int(args.w_sz), min_r=int(args.min_r) - ) + z_score = z_scores(reads, gap=int(args.gap), w_sz=int(args.w_sz)) # Save file, if applicable if args.save_file: - filename = abspath(filename).replace("\\", "/") - file_loc = filename[: filename.rfind("/")] - z_score_dir = make_new_dir([file_loc, "/Z_scores"]) - file_start = filename[filename.rfind("/") : filename.rfind(".")] - z_score_file = "".join([z_score_dir, file_start, "_zscores.wig"]) - write_wig(z_score, z_score_file, chrom) - print(f"Wrote z_scores to {z_score_file}") - + _save_zscore(filename, z_score, chrom) print( "\n".join( [ - f"Ran zscores.py with the following settings:", + "Ran zscores.py with the following settings:", f"gap: {args.gap}, w_sz: {args.w_sz},", - f"min_r: {args.min_r}, file_name: {args.filename}", + f"file_name: {args.filename}", ] ) ) +def _save_zscore(filename, z_score, chrom): + filename = abspath(filename).replace("\\", "/") + file_loc = filename[: filename.rfind("/")] + z_score_dir = make_new_dir([file_loc, "/Z_scores"]) + file_start = filename[filename.rfind("/") : filename.rfind(".")] + z_score_file = "".join([z_score_dir, file_start, "_zscores.wig"]) + write_wig(z_score, z_score_file, chrom) + print(f"Wrote z_scores to {z_score_file}") + + if __name__ == "__main__": main_zscores() diff --git a/tests/test_file_funcs.py b/tests/test_file_funcs.py index 52e29e8..b974ee6 100644 --- a/tests/test_file_funcs.py +++ b/tests/test_file_funcs.py @@ -4,28 +4,28 @@ from numpy import array from numpy.testing import assert_array_equal -from rendseq.file_funcs import make_new_dir, open_wig, validate_reads, write_wig +from rendseq.file_funcs import _validate_reads, make_new_dir, open_wig, write_wig class TestValidateReads: def test_correct(self): """validate a correct read array""" try: - validate_reads(array([[1, 2], [3, 4]])) + _validate_reads(array([[1, 2], [3, 4]])) except Exception as e: assert False, f"validate_reads invalid exception: {e}" def test_incorrect_dim(self): """read array has too many columns""" with pytest.raises(ValueError) as e_info: - validate_reads(array([[1, 2, 3], [4, 5, 6]])) + _validate_reads(array([[1, 2, 3], [4, 5, 6]])) assert e_info.value.args[0] == "reads must be (n,2), not (2, 3)" def test_incorrect_type(self): """read array isn't actually an array""" with pytest.raises(ValueError) as e_info: - validate_reads([1, 2, 3]) + _validate_reads([1, 2, 3]) assert e_info.value.args[0] == "reads must be numpy array, not " diff --git a/tests/test_scenarios.py b/tests/test_scenarios.py new file mode 100644 index 0000000..904aabf --- /dev/null +++ b/tests/test_scenarios.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +import sys + +import pytest +from mock import patch +from numpy import array, concatenate, mean, std +from numpy.random import normal, seed +from numpy.testing import assert_array_almost_equal, assert_array_equal + +from rendseq.make_peaks import thresh_peaks +from rendseq.zscores import z_scores + +step_noise_len = 100 +step_internal_len = 100 + + +@pytest.fixture +def step_up_peak(): + """Make a data array that steps up from the noise floor with peak.""" + seed(43) + noise = array( + [ + [loc, int(z)] + for loc, z in zip( + range(1, step_noise_len + 1), + normal(0, 0.2, size=(1, step_noise_len))[0], + ) + ] + ) + peak = [[step_noise_len + 1, 1000]] + internal = array( + [ + [loc, int(z)] + for loc, z in zip( + range(step_noise_len + 2, step_noise_len + step_internal_len), + normal(5, 2, size=(1, step_internal_len))[0], + ) + ] + ) + return concatenate((noise, peak, internal), axis=0) + + +@pytest.fixture +def step_down_peak(): + """Make a data array that steps up from the noise floor with peak.""" + seed(43) + internal = array( + [ + [loc, int(z)] + for loc, z in zip( + range(1, step_internal_len + 1), + normal(5, 2, size=(1, step_internal_len))[0], + ) + ] + ) + peak = [[step_internal_len + 1, 1000]] + noise = array( + [ + [loc, int(z)] + for loc, z in zip( + range(step_internal_len + 2, step_noise_len + step_internal_len), + normal(5, 2, size=(1, step_noise_len))[0], + ) + ] + ) + return concatenate((internal, peak, noise), axis=0) + + +def test_thresh_step_up(step_up_peak): + """Test we can call peaks that step up from the noise floor.""" + z_scr = z_scores(step_up_peak, gap=1, w_sz=5) + peaks = thresh_peaks(z_scr, thresh=5) + called_peaks = [p[1] for p in peaks] + peak_arr = ( + [0 for _ in range(step_noise_len - 6)] + + [1] + + [0 for _ in range(step_internal_len - 8)] + ) + assert_array_equal(called_peaks, peak_arr) + + +def test_thresh_step_down(step_down_peak): + """Test we can call peaks that step down to the noise floor.""" + z_scr = z_scores(step_down_peak, gap=1, w_sz=5) + peaks = thresh_peaks(z_scr, thresh=5) + called_peaks = [p[1] for p in peaks] + peak_arr = ( + [0 for _ in range(step_internal_len - 6)] + + [1] + + [0 for _ in range(step_noise_len - 8)] + ) + assert_array_equal(called_peaks, peak_arr) diff --git a/tests/test_zscores.py b/tests/test_zscores.py index dfb208b..1b46405 100644 --- a/tests/test_zscores.py +++ b/tests/test_zscores.py @@ -3,22 +3,16 @@ import pytest from mock import patch -from numpy import append, array, mean, std +from numpy import append, array from numpy.testing import assert_array_almost_equal, assert_array_equal from rendseq.file_funcs import write_wig from rendseq.zscores import ( - _adjust_down, - _adjust_up, - _calc_score, - _l_score_helper, - _r_score_helper, + _calc_z_score, _remove_outliers, - _z_score, + _validate_gap_window, main_zscores, parse_args_zscores, - score_helper, - validate_gap_window, z_scores, ) @@ -56,8 +50,6 @@ def regular_argslist(self): "1", "--w_sz", "3", - "--min_r", - "0", "--save_file", False, ] @@ -76,13 +68,13 @@ def test_main(self, tmpdir, capfd, reads, regular_argslist): # Run main with regular arguments with patch.object(sys, "argv", regular_argslist): main_zscores() - out, err = capfd.readouterr() + out, _ = capfd.readouterr() # Expect output assert out == "\n".join( [ f"Calculating zscores for file {file.strpath}.", - f"Ran zscores.py with the following settings:\ngap: 1, w_sz: 3,\nmin_r: 0, file_name: {file.strpath}\n", + f"Ran zscores.py with the following settings:\ngap: 1, w_sz: 3,\nfile_name: {file.strpath}\n", ] ) @@ -105,12 +97,12 @@ def test_main_defaults(self, tmpdir, capfd): out, err = capfd.readouterr() # Expect output - file_head = file.strpath[0:-8].replace("\\", "/") + file_head = file.strpath[:-8].replace("\\", "/") assert out == "\n".join( [ f"Calculating zscores for file {file.strpath}.", - f'Wrote z_scores to {file_head + "Z_scores/file_zscores.wig"}', - f"Ran zscores.py with the following settings:\ngap: 5, w_sz: 50,\nmin_r: 20, file_name: {file.strpath}\n", + f"Wrote z_scores to {file_head}Z_scores/file_zscores.wig", + f"Ran zscores.py with the following settings:\ngap: 5, w_sz: 50,\nfile_name: {file.strpath}\n", ] ) @@ -120,7 +112,6 @@ def test_parse_args(self, regular_argslist): assert args.filename == "test_file" assert args.gap == "1" assert args.w_sz == "3" - assert args.min_r == "0" assert not args.save_file def test_parse_args_defaults(self): @@ -131,23 +122,23 @@ def test_parse_args_defaults(self): assert args.filename == "test_file" assert args.gap == 5 assert args.w_sz == 50 - assert args.min_r == 20 assert args.save_file class TestZScores: def test_z_scores_regular(self, reads): """Z-scores of the reads fixture""" + print(z_scores(reads, gap=1, w_sz=3)) assert_array_almost_equal( - z_scores(reads, gap=1, w_sz=3, min_r=0), + z_scores(reads, gap=1, w_sz=3), array( [ [5, 0], [6, -0.70262826], - [7, 202.20038777234487], - [8, 4.949747468305832], - [9, -0.7213550215235531], - [10, 0], + [7, 202.20038777], + [8, -0.56829815], + [9, -0.59611538], + [10, -0.58959947], ] ), ) @@ -155,102 +146,27 @@ def test_z_scores_regular(self, reads): def test_z_scores_outlier(self, reads): """An outlier (near the edge where peaks aren't found) doesn't affect score""" reads[11] = [12, 1e8] + print(z_scores(reads, gap=1, w_sz=3)) assert_array_almost_equal( - z_scores(reads, gap=1, w_sz=3, min_r=0), + z_scores(reads, gap=1, w_sz=3), array( [ [5, 0], [6, -0.70262826], - [7, 202.20038777234487], - [8, 4.949747468305832], - [9, -0.7213550215235531], - [10, 0], + [7, 202.20038777], + [8, -0.56829815], + [9, -0.59611538], + [10, -0.58959947], ] ), ) - def test_z_scores_highMin(self, reads): - """Minimum r is too high for all reads""" - assert_array_almost_equal( - z_scores(reads, gap=1, w_sz=3, min_r=1e8), - array([[5, 0], [6, 0], [7, 0], [8, 0], [9, 0], [10, 0]]), - ) - - def test_z_scores_highMin_peak(self, reads): - """Minimum r too high for all reads but the peak""" - reads[6] = [7, 1e6] - assert_array_almost_equal( - z_scores(reads, gap=1, w_sz=3, min_r=1e6), - array( - [ - [5, 0], - [6, -7.07101478e-01], - [7, 1.69298835e05], - [8, 0], - [9, -7.07123752e-01], - [10, -7.07127995e-01], - ] - ), - decimal=4, - ) - - -class TestLWScoreHelper: - def test_l_score_helper_nogap(self, reads): - """No gap""" - min_r = 0 - i = 2 - with pytest.warns(UserWarning): - assert _l_score_helper(0, 1, min_r, reads, i) == score_helper( - 1, 2, min_r, reads, i - ) - - def test_l_score_helper_gap(self, reads): - """A small gap""" - min_r = 0 - i = 2 - assert _l_score_helper(1, 2, min_r, reads, i) == score_helper( - 0, 1, min_r, reads, i - ) - - def test_l_score_helper_largewindow(self, reads): - """Window size is larger than array""" - min_r = 0 - i = 2 - assert _l_score_helper(1, 100, min_r, reads, i) == score_helper( - 0, 1, min_r, reads, i - ) - - def test_r_score_helper_nogap(self, reads): - """No gap""" - min_r = 0 - i = 2 - with pytest.warns(UserWarning): - assert _r_score_helper(0, 1, min_r, reads, i) == score_helper( - 2, 3, min_r, reads, i - ) - - def test_r_score_helper_gap(self, reads): - """A small gap""" - min_r = 0 - i = 2 - assert _r_score_helper(1, 109, min_r, reads, i) == score_helper( - 3, 12, min_r, reads, i - ) - - def test_r_score_helper_largewindow(self, reads): - min_r = 0 - i = 2 - assert _r_score_helper(1, 1000, min_r, reads, i) == score_helper( - 3, 13, min_r, reads, i - ) - class TestValidateGapWindow: def test_window_zero(self): """Windows can't be zero""" with pytest.raises(ValueError) as e_info: - validate_gap_window(100, 0) + _validate_gap_window(100, 0) assert ( e_info.value.args[0] @@ -259,15 +175,12 @@ def test_window_zero(self): def test_window_gap_positive(self): """Windows and gaps can be positive""" - try: - validate_gap_window(100, 1) - except Exception as e: - pytest.fail(f"Unexpected exception: {e}") + _validate_gap_window(100, 1) def test_gap_negative(self): """Gaps can't be negative""" with pytest.raises(ValueError) as e_info: - validate_gap_window(-1, 100) + _validate_gap_window(-1, 100) assert ( e_info.value.args[0] == "Gap size must be at least zero to find a z-score" @@ -276,54 +189,18 @@ def test_gap_negative(self): def test_gap_zero(self): """Gaps can be zero, but should warn""" with pytest.warns(UserWarning): - validate_gap_window(0, 100) + _validate_gap_window(0, 100) -class TestScoreHelper: - def test_score_helper_normal(self, reads): - """score_helper is just a wrapper for calc_score if no outliers""" - min_r = 1 - i = 1 - assert score_helper(0, 4, 0, reads, 1) == pytest.approx( - _calc_score(list(reads[0:4, 1]), min_r, reads[i, 1]) - ) - - def test_score_helper_outlier(self, reads): - """score_helper with a clear outlier""" - min_r = 1 - i = 1 - assert score_helper(0, 4, 0, reads + [800], 1) == pytest.approx( - _calc_score(list(reads[0:4, 1]), min_r, reads[i, 1]) - ) - - -class TestCalcScore: - def test_calc_score_normal(self, reads): - """Run-of-the-mill zscore""" - assert _calc_score(reads[:, 1:], 2, reads[1, 1:]) == pytest.approx(-0.2780832) - - def test_calc_score_highmin(self, reads): - """Reads don't hit minimimum background""" - assert _calc_score(reads[:, 1:], 10000, 1) is None - - def test_calc_score_constant(self): - """Reads are constant""" - assert _calc_score(array([1] * 6), 0, 1) == 0 - - def test_calc_score_empty(self): - """Array is empty""" - assert _calc_score(array([]), 0, 1) is None - - -class TestZScore: +class TestCalcZScore: def test_z_score_normal(self, reads): """Run-of-the-mill zscore""" vals = reads[:, 1:] - assert _z_score(vals[1][0], mean(vals), std(vals)) == pytest.approx(-0.2780832) + assert _calc_z_score(vals, vals[1][0]) == pytest.approx(-0.2780832) def test_z_score_constant(self): - """Zero stdev""" - assert _z_score(1, 1, 0) == 0 + """Zero stdev, same as all vals""" + assert _calc_z_score([5, 5, 5, 5, 5, 5, 5], 5) == 0 class TestRemoveOutliers: @@ -338,65 +215,11 @@ def test_remove_outliers_homogen(self): def test_remove_outliers_clearOutlier(self): """A clear outlier""" - test_array = array([i for i in range(20)] + [80]) - assert_array_equal(_remove_outliers(test_array), test_array[0:-1]) + test_array = array(list(range(20)) + [80]) + assert_array_equal(_remove_outliers(test_array), test_array[:-1]) def test_remove_outliers_borderline(self): """Two values, near 2.5 stds away, but only one above""" test_array = append(array([1] * 10), [4.2, 5]) - assert_array_equal(_remove_outliers(test_array), test_array[0:-1]) - - -class TestAdjustDown: - def test_adjust_down_empty(self): - """Can't adjust down empty reads""" - with pytest.raises(ValueError) as e_info: - _adjust_down(3, 0, array([])) - - assert e_info.value.args[0] == "requires non-empty reads" - - def test_adjust_down_inf(self, reads): - """Target higher than all indicies""" - assert _adjust_down(3, 1000, reads) == 3 - - def test_adjust_down_onestep(self, reads): - """Target one step away from current""" - assert _adjust_down(2, 2, reads) == 1 - - def test_adjust_down_multistep(self, reads): - """Target multiple steps away from current""" - assert _adjust_down(3, 2, reads) == 1 - - def test_adjust_down_zero(self, reads): - """Target lower than any index""" - assert _adjust_down(1, -1, reads) == 0 - - def test_adjust_down_oob(self, reads): - """Current is higher than any index""" - assert _adjust_down(5, 2, reads) == 1 - - -class TestAdjustUp: - def test_adjust_up_empty(self): - """Can't adjust up empty reads""" - with pytest.raises(ValueError) as e_info: - _adjust_up(3, 0, array([])) - - assert e_info.value.args[0] == "requires non-empty reads" - - def test_adjust_up_onestep(self, reads): - """Target one step away from current""" - assert _adjust_up(1, 3, reads) == 2 - - def test_adjust_up_multistep(self, reads): - """Target multiple steps away from current""" - assert _adjust_up(0, 3, reads) == 2 - - def test_adjust_up_max(self, reads): - """Target higher than any index""" - assert _adjust_up(1, 1000, reads) == 13 - - def test_adjust_up_oob(self, reads): - """Current is lower than any index""" - assert _adjust_up(2, 0, reads) == 2 + assert_array_equal(_remove_outliers(test_array), test_array[:-1])