diff --git a/src/vak/core/predict.py b/src/vak/core/predict.py index 999b58f94..a96e4fc64 100644 --- a/src/vak/core/predict.py +++ b/src/vak/core/predict.py @@ -14,7 +14,6 @@ constants, files, io, - labeled_timebins, validators ) from .. import models @@ -223,12 +222,19 @@ def predict( spect_dict = files.spect.load(spect_path) t = spect_dict[timebins_key] - labels, onsets_s, offsets_s = labeled_timebins.lbl_tb2segments( + + if majority_vote or min_segment_dur: + y_pred = transforms.labeled_timebins.postprocess( + y_pred, + timebin_dur=timebin_dur, + min_segment_dur=min_segment_dur, + majority_vote=majority_vote, + ) + + labels, onsets_s, offsets_s = transforms.labeled_timebins.to_segments( y_pred, labelmap=labelmap, t=t, - min_segment_dur=min_segment_dur, - majority_vote=majority_vote, ) if labels is None and onsets_s is None and offsets_s is None: # handle the case when all time bins are predicted to be unlabeled diff --git a/src/vak/datasets/vocal_dataset.py b/src/vak/datasets/vocal_dataset.py index 3c4b02314..3e748cc50 100644 --- a/src/vak/datasets/vocal_dataset.py +++ b/src/vak/datasets/vocal_dataset.py @@ -2,7 +2,7 @@ from .. import annotation from .. import files -from .. import labeled_timebins +from .. import transforms class VocalDataset: @@ -80,7 +80,7 @@ def __getitem__(self, idx): annot = self.annots[idx] lbls_int = [self.labelmap[lbl] for lbl in annot.seq.labels] # "lbl_tb": labeled timebins. Target for output of network - lbl_tb = labeled_timebins.label_timebins( + lbl_tb = transforms.labeled_timebins.from_segments( lbls_int, annot.seq.onsets_s, annot.seq.offsets_s, diff --git a/src/vak/datasets/window_dataset.py b/src/vak/datasets/window_dataset.py index 7bcf4bc7c..2429ca04e 100644 --- a/src/vak/datasets/window_dataset.py +++ b/src/vak/datasets/window_dataset.py @@ -4,11 +4,13 @@ import torch from torchvision.datasets.vision import VisionDataset -from .. import annotation -from .. import files -from .. import io -from .. 
import labeled_timebins -from .. import validators +from .. import ( + annotation, + files, + io, + transforms, + validators +) class WindowDataset(VisionDataset): @@ -206,7 +208,7 @@ def __get_window_labelvec(self, idx): spect_id ] # "annot id" == spect_id if both were taken from rows of DataFrame lbls_int = [self.labelmap[lbl] for lbl in annot.seq.labels] - lbl_tb = labeled_timebins.label_timebins( + lbl_tb = transforms.labeled_timebins.from_segments( lbls_int, annot.seq.onsets_s, annot.seq.offsets_s, @@ -694,7 +696,7 @@ def spect_vectors_from_df( lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] timebins = spect_dict[timebins_key] lbl_tb.append( - labeled_timebins.label_timebins( + transforms.labeled_timebins.from_segments( lbls_int, annot.seq.onsets_s, annot.seq.offsets_s, diff --git a/src/vak/engine/model.py b/src/vak/engine/model.py index 1a3350025..163089625 100644 --- a/src/vak/engine/model.py +++ b/src/vak/engine/model.py @@ -7,7 +7,7 @@ from tqdm import tqdm from ..device import get_default as get_default_device -from ..labeled_timebins import lbl_tb2labels +from .. 
import transforms logger = logging.getLogger(__name__) @@ -267,10 +267,10 @@ def _eval(self, eval_data): for metric_name in self.metrics.keys() ] ): - y_labels = lbl_tb2labels( + y_labels = transforms.labeled_timebins.to_labels( y.cpu().numpy(), eval_data.dataset.labelmap ) - y_pred_labels = lbl_tb2labels( + y_pred_labels = transforms.labeled_timebins.to_labels( y_pred.cpu().numpy(), eval_data.dataset.labelmap ) else: diff --git a/src/vak/labeled_timebins.py b/src/vak/labeled_timebins.py index 72cbab088..aec59dd73 100644 --- a/src/vak/labeled_timebins.py +++ b/src/vak/labeled_timebins.py @@ -1,15 +1,16 @@ """functions for dealing with labeled timebin vectors""" -import numpy as np -import scipy.stats +from __future__ import annotations -from .timebins import timebin_dur_from_vec -from .validators import row_or_1d, column_or_1d +import numpy as np -def has_unlabeled(labels_int, onsets_s, offsets_s, time_bins): - """determine whether there are unlabeled segments in a spectrogram, - given labels, onsets, and offsets of vocalizations, and vector of - time bins from spectrogram +def has_unlabeled(labels_int: list | np.ndarray, + onsets_s: np.ndarray, + offsets_s: np.ndarray, + time_bins: np.ndarray) -> bool: + """Determine whether there are unlabeled segments in a spectrogram, + given labels, onsets, and offsets of segments, and vector of + time bins from spectrogram.
Parameters ---------- @@ -50,434 +51,3 @@ def has_unlabeled(labels_int, onsets_s, offsets_s, time_bins): return True else: return False - - -def label_timebins(labels_int, onsets_s, offsets_s, time_bins, unlabeled_label=0): - """makes a vector of labels for each time bin from a spectrogram, - given labels, onsets, and offsets of vocalizations - - Parameters - ---------- - labels_int : list, numpy.ndarray - a list or array of labels from the annotation for a vocalization, - mapped to integers - onsets_s : numpy.ndarray - 1d vector of floats, segment onsets in seconds - offsets_s : numpy.ndarray - 1-d vector of floats, segment offsets in seconds - time_bins : mumpy.ndarray - 1-d vector of floats, time in seconds for center of each time bin of a spectrogram - unlabeled_label : int - label assigned to time bins that do not have labels associated with them. - Default is 0 - - Returns - ------- - lbl_tb : numpy.ndarray - same length as time_bins, with each element a label for each time bin - """ - if ( - type(labels_int) == list - and not all([type(lbl) == int for lbl in labels_int]) - or ( - type(labels_int) == np.ndarray - and labels_int.dtype not in [np.int8, np.int16, np.int32, np.int64] - ) - ): - raise TypeError("labels_int must be a list or numpy.ndarray of integers") - - label_vec = np.ones((time_bins.shape[-1],), dtype="int8") * unlabeled_label - onset_inds = [np.argmin(np.abs(time_bins - onset)) for onset in onsets_s] - offset_inds = [np.argmin(np.abs(time_bins - offset)) for offset in offsets_s] - for label, onset, offset in zip(labels_int, onset_inds, offset_inds): - # offset_inds[ind]+1 because offset time bin is still "part of" syllable - label_vec[onset : offset + 1] = label - - return label_vec - - -ALPHANUMERIC = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' -DUMMY_SINGLE_CHAR_LABELS = [ - # some large range of characters not typically used as labels - chr(x) for x in range(162, 400) -] -# start with alphanumeric since more human 
readable; -# mapping can be arbitrary as long as it's consistent -DUMMY_SINGLE_CHAR_LABELS = ( - *ALPHANUMERIC, - *DUMMY_SINGLE_CHAR_LABELS -) - - -# added to fix https://github.com/NickleDave/vak/issues/373 -def _multi_char_labels_to_single_char(labels_mapping): - """returns a copy of a ``labels_mapping`` where any - labels with multiple characters are converted to - single characters - - this makes it possible to correctly compute metrics - like Levenshtein edit distance - """ - current_str_labels = sorted( - # sort to be extra sure we get same order every time - # (even though OrderedDict is now default in Python). - # Same order forces mapping to single characters to be deterministic across function calls. - labels_mapping.keys() - ) - new_labels_mapping = {} - for dummy_label_ind, label_str in enumerate(current_str_labels): - label_int = labels_mapping[label_str] - if len(label_str) > 1: - # replace with dummy label - new_label_str = DUMMY_SINGLE_CHAR_LABELS[dummy_label_ind] - new_labels_mapping[new_label_str] = label_int - else: - new_labels_mapping[label_str] = label_int - return new_labels_mapping - - -def lbl_tb2labels(labeled_timebins, labels_mapping, spect_ID_vector=None): - """converts output of network from label for each frame - to one label for each continuous segment. - - This function is used when evaluating a model, - to convert outputs to dummy strings that make it possible - to compute string-based metrics, such as edit distance. - - It should **not** be used to convert predictions - to annotations, because it can modify the - ``labels_mapping`` so that metrics are correctly - computed. - - For mapping outputs to string label predictions, - use ``vak.labeled_timebins.lbl_tb2segments``. - - Parameters - ---------- - labeled_timebins : ndarray - where each element is a label for a time bin. - Such an array is the output of the network. - labels_mapping : dict - that maps str labels to consecutive integers. 
- The mapping is inverted to convert back to str labels. - spect_ID_vector : ndarray - of same length as labeled_timebins, where each element - is an ID # for the spectrogram from which labeled_timebins - was taken. - If provided, used to split the converted labels back to - a list of label str, with one for each spectrogram. - Default is None, in which case the return value is one long str. - - Returns - ------- - labels : str or list - labeled_timebins mapped back to label str. - If spect_ID_vector was provided, then labels is split into a list of str, - where each str corresponds to predicted labels for each predicted - segment in each spectrogram as identified by spect_ID_vector. - """ - labeled_timebins = row_or_1d(labeled_timebins) - idx = np.diff(labeled_timebins, axis=0).astype(bool) - idx = np.insert(idx, 0, True) - - labels = labeled_timebins[idx] - - # remove 'unlabeled' label - if "unlabeled" in labels_mapping: - labels = labels[labels != labels_mapping["unlabeled"]] - - # replace any multiple character labels in mapping - # with dummy single-character labels - # so that we do not affect Levenshtein distance computation - # see https://github.com/NickleDave/vak/issues/373 - if any([len(label) > 1 for label in labels_mapping.keys()]): # only re-map if necessary - # (to minimize chance of knock-on bugs) - labels_mapping = _multi_char_labels_to_single_char(labels_mapping) - - # only invert mapping and then map integer labels to characters - # *after* ensuring all string labels are single-character - inverse_labels_mapping = dict((v, k) for k, v in labels_mapping.items()) - labels = labels.tolist() - labels = [inverse_labels_mapping[label] for label in labels] - - if spect_ID_vector: - labels_list = [] - spect_ID_vector = spect_ID_vector[idx] - labels_arr = np.asarray(labels) - # need to split up labels by spect_ID_vector - # this is probably not the most efficient way: - spect_IDs = np.unique(spect_ID_vector) - - for spect_ID in spect_IDs: - these = 
np.where(spect_ID_vector == spect_ID) - curr_labels = labels_arr[these].tolist() - if all([type(el) is str for el in curr_labels]): - labels_list.append("".join(curr_labels)) - elif all([type(el) is int for el in curr_labels]): - labels_list.append(curr_labels) - return labels_list, spect_ID_vector - else: - if all([type(el) is str or type(el) is np.str_ for el in labels]): - return "".join(labels) - elif all([type(el) is int for el in labels]): - return labels - - -def _segment_lbl_tb(lbl_tb): - """helper function that segments vector of labeled timebins. - - Parameters - ---------- - lbl_tb : numpy.ndarray - vector where each element represents a label for a timebin - - Returns - ------- - labels : numpy.ndarray - vector where each element is a label for a segment with its onset - and offset indices given by the corresponding element in onset_inds - and offset_inds. - onset_inds : numpy.ndarray - vector where each element is the onset index for a segment. - Each onset corresponds to the value at the same index in labels. - offset_inds : numpy.ndarray - vector where each element is the offset index for a segment - Each offset corresponds to the value at the same index in labels. - """ - # factored out as a separate function to be able to test - # and in case user wants to do just this with output of neural net - offset_inds = np.where(np.diff(lbl_tb, axis=0))[0] - onset_inds = offset_inds + 1 - offset_inds = np.concatenate((offset_inds, np.asarray([lbl_tb.shape[0] - 1]))) - onset_inds = np.concatenate((np.asarray([0]), onset_inds)) - labels = lbl_tb[onset_inds] - return labels, onset_inds, offset_inds - - -def lbl_tb_segment_inds_list(lbl_tb, unlabeled_label=0): - """given a vector of labeled timebins, - returns a list of indexing vectors, - one for each labeled segment in the vector. 
- - Parameters - ---------- - lbl_tb : numpy.ndarray - vector of labeled timebins from spectrogram - unlabeled_label : int - label that was given to segments that were not labeled in annotation, - e.g. silent periods between annotated segments. Default is 0. - return_inds : bool - if True, return list of indices for segments in lbl_tb, in addition to the segments themselves. - if False, just return list of numpy.ndarrays that are the segments from lbl_tb. - - Returns - ------- - segment_inds_list : list - of numpy.ndarray, indices that will recover segments list from lbl_tb. - """ - segment_inds = np.nonzero(lbl_tb != unlabeled_label)[0] - return np.split(segment_inds, np.where(np.diff(segment_inds) != 1)[0] + 1) - - -def remove_short_segments( - lbl_tb, segment_inds_list, timebin_dur, min_segment_dur, unlabeled_label=0 -): - """remove segments from vector of labeled timebins - that are shorter than specified duration - - Parameters - ---------- - lbl_tb : numpy.ndarray - vector of labeled spectrogram time bins, i.e., - where each element is a label for a time bin. - Output of a neural network. - segment_inds_list : list - of numpy.ndarray, indices that will recover segments list from ``lbl_tb``. - Returned by function ``vak.labels.lbl_tb_segment_inds_list``. - timebin_dur : float - Duration of a single timebin in the spectrogram, in seconds. - Used to convert onset and offset indices in ``lbl_tb`` to seconds. - min_segment_dur : float - minimum duration of segment, in seconds. If specified, then - any segment with a duration less than min_segment_dur is - removed from lbl_tb. Default is None, in which case no - segments are removed. - unlabeled_label : int - label that was given to segments that were not labeled in annotation, - e.g. silent periods between annotated segments. Default is 0. 
- - Returns - ------- - lbl_tb : numpy.ndarray - with segments whose duration is shorter than ``min_segment_dur`` set to ``unlabeled_label`` - segment_inds_list : list - of numpy.ndarray, with arrays removed that represented - segments in ``lbl_tb`` that were shorter than ``min_segment_dur`` - """ - new_segment_inds_list = [] - - for segment_inds in segment_inds_list: - if segment_inds.shape[-1] * timebin_dur < min_segment_dur: - lbl_tb[segment_inds] = unlabeled_label - # DO NOT keep segment_inds array - else: - # do keep segment_inds array, don't change lbl_tb - new_segment_inds_list.append(segment_inds) - - return lbl_tb, new_segment_inds_list - - -def majority_vote_transform(lbl_tb, segment_inds_list): - """transform segments containing multiple labels - into segments with a single label by taking a "majority vote", - i.e. assign all time bins in the segment the most frequently - occurring label in the segment. - - Parameters - ---------- - lbl_tb : numpy.ndarray - vector of labeled spectrogram time bins, i.e., - where each element is a label for a time bin. - Output of a neural network. - segment_inds_list : list - of numpy.ndarray, indices that will recover segments list from lbl_tb. - Returned by funciton ``vak.labels.lbl_tb_segment_inds_list``. - - Returns - ------- - lbl_tb : numpy.ndarray - after the majority vote transform has been applied - """ - for segment_inds in segment_inds_list: - segment = lbl_tb[segment_inds] - majority = scipy.stats.mode(segment, keepdims=False)[0].item() - lbl_tb[segment_inds] = majority - - return lbl_tb - - -def lbl_tb2segments( - lbl_tb, labelmap, t, min_segment_dur=None, majority_vote=False, n_decimals_trunc=5 -): - """convert vector of labeled timebins into segments, - by finding where continuous runs of a single label start - and stop. Returns vectors of labels and onsets and offsets - in units of seconds. 
- - Parameters - ---------- - lbl_tb : numpy.ndarray - vector of labeled spectrogram time bins, i.e., - where each element is a label for a time bin. - Output of a neural network. - labelmap : dict - that maps labels to consecutive integers. - The mapping is inverted to convert back to labels. - t : numpy.ndarray - Vector of times; the times are bin centers of columns in a spectrogram. - Returned by function that generated spectrogram. - Used to convert onset and offset indices in lbl_tb to seconds. - min_segment_dur : float - minimum duration of segment, in seconds. If specified, then - any segment with a duration less than min_segment_dur is - removed from lbl_tb. Default is None, in which case no - segments are removed. - majority_vote : bool - if True, transform segments containing multiple labels - into segments with a single label by taking a "majority vote", - i.e. assign all time bins in the segment the most frequently - occurring label in the segment. This transform can only be - applied if the labelmap contains an 'unlabeled' label, - because unlabeled segments makes it possible to identify - the labeled segments. Default is False. - n_decimals_trunc : int - number of decimal places to keep when truncating the timebin duration - calculated from the vector of times t. Default is 5. - - Returns - ------- - labels : numpy.ndarray - vector where each element is a label for a segment with its onset - and offset indices given by the corresponding element in onset_inds - and offset_inds. - onsets_s : numpy.ndarray - vector where each element is the onset in seconds a segment. - Each onset corresponds to the value at the same index in labels. - offsets_s : numpy.ndarray - vector where each element is the offset in seconds of a segment. - Each offset corresponds to the value at the same index in labels. 
- """ - lbl_tb = column_or_1d(lbl_tb) - - if "unlabeled" in labelmap: - # handle the case when all time bins are predicted to be unlabeled - # see https://github.com/NickleDave/vak/issues/383 - uniq_lbl_tb = np.unique(lbl_tb) - if len(uniq_lbl_tb) == 1 and uniq_lbl_tb[0] == labelmap["unlabeled"]: - return None, None, None - - timebin_dur = timebin_dur_from_vec(t, n_decimals_trunc) - - if min_segment_dur is not None or majority_vote: - if "unlabeled" not in labelmap: - raise ValueError( - "min_segment_dur or majority_vote specified," - " but 'unlabeled' not in labelmap.\n" - "Without 'unlabeled' segments these transforms cannot be applied." - ) - segment_inds_list = lbl_tb_segment_inds_list( - lbl_tb, unlabeled_label=labelmap["unlabeled"] - ) - - if min_segment_dur is not None: - lbl_tb, segment_inds_list = remove_short_segments( - lbl_tb, - segment_inds_list, - timebin_dur, - min_segment_dur, - labelmap["unlabeled"], - ) - if len(segment_inds_list) == 0: # no segments left after removing - return None, None, None - - if majority_vote: - lbl_tb = majority_vote_transform(lbl_tb, segment_inds_list) - - labels, onset_inds, offset_inds = _segment_lbl_tb(lbl_tb) - - # remove 'unlabeled' label - if "unlabeled" in labelmap: - keep = np.where(labels != labelmap["unlabeled"])[0] - labels = labels[keep] - onset_inds = onset_inds[keep] - offset_inds = offset_inds[keep] - - # handle case where removing 'unlabeled' **after** clean-up leaves no segments - if all([len(vec) == 0 for vec in (labels, onset_inds, offset_inds)]): - return None, None, None - - inverse_labelmap = dict((v, k) for k, v in labelmap.items()) - labels = labels.tolist() - labels = np.asarray([inverse_labelmap[label] for label in labels]) - # the 'best' estimate we can get of onset and offset times, - # given binned times, and labels applied to each time bin, - # is "some time" between the last labeled bin for one segment, - # i.e. its offset, and the first labeled bin for the next - # segment, i.e. its onset. 
In other words if the whole bin is labeled - # as belonging to that segment, and the bin preceding it is labeled as - # belonging to the previous section, then the onset of the current - # segment must be the time between the two bins. To find those times - # we use the bin centers and either subtract (for onsets) or add - # (for offsets) half a timebin duration. This half a timebin - # duration puts our onsets and offsets at the time "between" bins. - onsets_s = t[onset_inds] - (timebin_dur / 2) - offsets_s = t[offset_inds] + (timebin_dur / 2) - - # but this estimate will be "wrong" if we set the onset or offset time - # outside the possible times in our timebin vector. Need to clean up. - if onsets_s[0] < 0.0: - onsets_s[0] = 0.0 - if offsets_s[-1] > t[-1]: - offsets_s[-1] = t[-1] - - return labels, onsets_s, offsets_s diff --git a/src/vak/labels.py b/src/vak/labels.py index 186ef5a06..2f1e99774 100644 --- a/src/vak/labels.py +++ b/src/vak/labels.py @@ -1,29 +1,42 @@ +from __future__ import annotations + +import numpy as np +import pandas as pd + from . import annotation -def to_map(labelset, map_unlabeled=True): - """map set of labels to series of consecutive integers from 0 to n inclusive, +def to_map(labelset: set, + map_unlabeled: bool = True) -> dict: + """Convert set of labels to `dict` + mapping those labels to a series of consecutive integers + from 0 to n inclusive, where n is the number of labels in the set. - This 'labelmap' is used when mapping labels from annotations of a vocalization into + This 'labelmap' is used when mapping labels + from annotations of a vocalization into a label for every time bin in a spectrogram of that vocalization. - If map_unlabeled is True, 'unlabeled' will be added to labelset, and will map to 0, + If ``map_unlabeled`` is True, then the label 'unlabeled' + will be added to labelset, and will map to 0, so the total number of classes is n + 1. Parameters ---------- labelset : set - of labels used to annotate a Dataset. 
+ Set of labels used to annotate a dataset. map_unlabeled : bool - if True, include key 'unlabeled' in mapping. Any time bins in a spectrogram - that do not have a label associated with them, e.g. a silent gap between vocalizations, - will be assigned the integer that the 'unlabeled' key maps to. + If True, include key 'unlabeled' in mapping. + Any time bins in a spectrogram + that do not have a label associated with them, + e.g. a silent gap between vocalizations, + will be assigned the integer + that the 'unlabeled' key maps to. Returns ------- labelmap : dict - maps labels to integers + Maps labels to integers. """ if type(labelset) != set: raise TypeError(f"type of labelset must be set, got type {type(labelset)}") @@ -38,29 +51,35 @@ def to_map(labelset, map_unlabeled=True): return labelmap -def to_set(labels_list): - """given a list of labels from annotations, return the set of (unique) labels +def to_set(labels_list: list[np.ndarray | list]) -> set: + """Given a list of labels from annotations, + return the set of (unique) labels. Parameters ---------- labels_list : list - of lists, i.e. labels from annotations + Of labels from annotations, + either a list of numpy.ndarrays + or a list of lists. Returns ------- - labelset + labelset : set + Unique set of labels found in ``labels_list``. Examples -------- >>> labels_list = [voc.annot.labels for voc in vds.voc_list] >>> labelset = to_set(labels_list) + >>> print(labelset) + {'a', 'b', 'c', 'd', 'e'} """ all_labels = [lbl for labels in labels_list for lbl in labels] labelset = set(all_labels) return labelset -def from_df(vak_df): +def from_df(vak_df: pd.DataFrame) -> list[np.ndarray]: """returns labels for each vocalization in a dataset. 
Takes Pandas DataFrame representing the dataset, loads annotation for each row in the DataFrame, and then returns @@ -78,3 +97,66 @@ def from_df(vak_df): """ annots = annotation.from_df(vak_df) return [annot.seq.labels for annot in annots] + + +ALPHANUMERIC = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' +DUMMY_SINGLE_CHAR_LABELS = [ + # some large range of characters not typically used as labels + chr(x) for x in range(162, 400) +] +# start with alphanumeric since more human readable; +# mapping can be arbitrary as long as it's consistent +DUMMY_SINGLE_CHAR_LABELS = ( + *ALPHANUMERIC, + *DUMMY_SINGLE_CHAR_LABELS +) + + +# added to fix https://github.com/NickleDave/vak/issues/373 +def multi_char_labels_to_single_char(labelmap: dict, skip: tuple[str] = ('unlabeled',)) -> dict: + """Return a copy of a ``labelmap`` where any + labels that are strings with multiple characters + are converted to single characters. + + This makes it possible to correctly compute metrics + like Levenshtein edit distance. + + Labels that are strings with multiple characters + are replaced by a single-label character from + the constant ``vak.labels.DUMMY_SINGLE_CHAR_LABELS``. + The replacement is grabbed with the index of the + multi-character label from the sorted ``dict``. + + Parameters + ---------- + labelmap : dict + That maps human-readable string labels + to integers. As returned by + ``vak.labels.to_map``. + skip : tuple + Of strings, labels to leave + as multiple characters. + Default is ('unlabeled',). + + Returns + ------- + labelmap : dict + Where any keys with multiple characters + in string are converted to dummy single characters. + """ + current_str_labels = sorted( + # sort to be extra sure we get same order every time + # (even though OrderedDict is now default in Python). + # Same order forces mapping to single characters to be deterministic across function calls. 
+ labelmap.keys() + ) + new_labelmap = {} + for dummy_label_ind, label_str in enumerate(current_str_labels): + label_int = labelmap[label_str] + if len(label_str) > 1 and label_str not in skip: + # replace with dummy label + new_label_str = DUMMY_SINGLE_CHAR_LABELS[dummy_label_ind] + new_labelmap[new_label_str] = label_int + else: + new_labelmap[label_str] = label_int + return new_labelmap diff --git a/src/vak/transforms/__init__.py b/src/vak/transforms/__init__.py index c4cdee215..d2b334c42 100644 --- a/src/vak/transforms/__init__.py +++ b/src/vak/transforms/__init__.py @@ -1,2 +1,3 @@ +from . import labeled_timebins +from .defaults import get_defaults from .transforms import * -from vak.transforms.defaults import get_defaults diff --git a/src/vak/transforms/labeled_timebins/__init__.py b/src/vak/transforms/labeled_timebins/__init__.py new file mode 100644 index 000000000..69911c255 --- /dev/null +++ b/src/vak/transforms/labeled_timebins/__init__.py @@ -0,0 +1,7 @@ +from .functional import * +from .transforms import ( + FromSegments, + PostProcess, + ToLabels, + ToSegments, +) diff --git a/src/vak/transforms/labeled_timebins/functional.py b/src/vak/transforms/labeled_timebins/functional.py new file mode 100644 index 000000000..a45daace8 --- /dev/null +++ b/src/vak/transforms/labeled_timebins/functional.py @@ -0,0 +1,443 @@ +"""functional forms of transformations +related to labeled timebins, +i.e., vectors where each element represents +a label for a time bin from a spectrogram. + +This module is structured as follows: +- from_segments: transform to get labeled timebins from annotations +- to_labels: transform to get back just string labels from labeled timebins, + used to evaluate a model +- to_segments: transform to get back segment onsets, offsets, and labels from labeled timebins. + Inverse of ``from_segments``.
+- post-processing transforms that can be used to "clean up" a vector of labeled timebins + - to_inds_list: helper function used to find segments in a vector of labeled timebins + - remove_short_segments: remove any segment less than a minimum duration + - take_majority_vote: take a "majority vote" within each segment bounded by the "unlabeled" label, + and apply the most "popular" label within each segment to all timebins in that segment + - postprocess: combines remove_short_segments and take_majority_vote in one transform +""" +from __future__ import annotations + +import numpy as np +import scipy.stats + +from ...timebins import timebin_dur_from_vec +from ...validators import column_or_1d, row_or_1d + + +__all__ = [ + # keep alphabetized + 'from_segments', + 'postprocess', + 'remove_short_segments', + 'take_majority_vote', + 'to_inds_list', + 'to_labels', + 'to_segments', +] + + +def from_segments(labels_int: np.ndarray, + onsets_s: np.ndarray, + offsets_s: np.ndarray, + time_bins: np.ndarray, + unlabeled_label: int = 0) -> np.ndarray: + """Make a vector of labels for a vector of time bins, + given labeled segments in the form of onset times, + offset times, and segment labels. + + Parameters + ---------- + labels_int : list, numpy.ndarray + A list or array of labels from the annotation for a vocalization, + mapped to integers + onsets_s : numpy.ndarray + 1-d vector of floats, segment onsets in seconds. + offsets_s : numpy.ndarray + 1-d vector of floats, segment offsets in seconds. + time_bins : numpy.ndarray + 1-d vector of floats, time in seconds for center of each time bin of a spectrogram. + unlabeled_label : int + Label assigned to time bins that do not have labels associated with them. + Default is 0. 
+ + Returns + ------- + lbl_tb : numpy.ndarray + same length as time_bins, with each element a label for each time bin + """ + if ( + ( + type(labels_int) == list + and not all([type(lbl) == int for lbl in labels_int]) + ) or + ( + type(labels_int) == np.ndarray + and labels_int.dtype not in [np.int8, np.int16, np.int32, np.int64] + ) + ): + raise TypeError("labels_int must be a list or numpy.ndarray of integers") + + label_vec = np.ones((time_bins.shape[-1],), dtype="int8") * unlabeled_label + onset_inds = [np.argmin(np.abs(time_bins - onset)) for onset in onsets_s] + offset_inds = [np.argmin(np.abs(time_bins - offset)) for offset in offsets_s] + for label, onset, offset in zip(labels_int, onset_inds, offset_inds): + # offset_inds[ind]+1 because offset time bin is still "part of" syllable + label_vec[onset:offset + 1] = label + + return label_vec + + +def to_labels(lbl_tb: np.ndarray, labelmap: dict) -> str: + """Convert vector of labeled timebins to a string, + one character for each continuous segment. + + Allows for converting output of network + from a label for each frame + to one label for each continuous segment, + in order to compute string-based metrics like edit distance. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Where each element is a label for a frame / time bin. + Typically, the output of a neural network. + labelmap : dict + That maps string labels to integers. + The mapping is inverted to convert back to string labels. + + Returns + ------- + labels : str + The label at the onset of each continuous segment + in ``lbl_tb``, mapped back to string labels in ``labelmap``. 
+ """ + lbl_tb = row_or_1d(lbl_tb) + + onset_inds = np.diff(lbl_tb, axis=0).astype(bool) + onset_inds = np.insert(onset_inds, 0, True) + + labels = lbl_tb[onset_inds] + + # remove 'unlabeled' label + if "unlabeled" in labelmap: + labels = labels[labels != labelmap["unlabeled"]] + + if len(labels) < 1: # if removing all the 'unlabeled' leaves nothing + return "" + + # only invert mapping and then map integer labels to characters + inverse_labelmap = dict((v, k) for k, v in labelmap.items()) + labels = labels.tolist() + labels = [inverse_labelmap[label] for label in labels] + + return "".join(labels) + + +def to_segments( + lbl_tb: np.ndarray, + labelmap: dict, + t: np.ndarray, + n_decimals_trunc: int = 5 +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """Convert a vector of labeled time bins + into segments in the form of onset indices, + offset indices, and labels. + + Finds where continuous runs of a single label start + and stop in timebins, and considers each of these runs + a segment. + + The function returns vectors of labels and onsets and offsets + in units of seconds. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Vector of labeled spectrogram time bins, i.e., + where each element is a label for a time bin. + Output of a neural network. + labelmap : dict + That maps labels to consecutive integers. + The mapping is inverted to convert back to labels. + t : numpy.ndarray + Vector of times; the times are bin centers of columns in a spectrogram. + Returned by function that generated spectrogram. + Used to convert onset and offset indices in lbl_tb to seconds. + n_decimals_trunc : int + Number of decimal places to keep when truncating the timebin duration + calculated from the vector of times t. Default is 5. + + Returns + ------- + labels : numpy.ndarray + Vector where each element is a label for a segment with its onset + and offset indices given by the corresponding element in onset_inds + and offset_inds. 
+ onsets_s : numpy.ndarray + Vector where each element is the onset in seconds a segment. + Each onset corresponds to the value at the same index in labels. + offsets_s : numpy.ndarray + Vector where each element is the offset in seconds of a segment. + Each offset corresponds to the value at the same index in labels. + """ + lbl_tb = column_or_1d(lbl_tb) + + if "unlabeled" in labelmap: + # handle the case when all time bins are predicted to be unlabeled + # see https://github.com/NickleDave/vak/issues/383 + uniq_lbl_tb = np.unique(lbl_tb) + if len(uniq_lbl_tb) == 1 and uniq_lbl_tb[0] == labelmap["unlabeled"]: + return '', None, None + + # used to find onsets/offsets below; compute here so if we fail we do so early + timebin_dur = timebin_dur_from_vec(t, n_decimals_trunc) + + offset_inds = np.nonzero(np.diff(lbl_tb, axis=0))[0] # [0] because nonzero return tuple + onset_inds = offset_inds + 1 + offset_inds = np.concatenate((offset_inds, np.asarray([lbl_tb.shape[0] - 1]))) + onset_inds = np.concatenate((np.asarray([0]), onset_inds)) + labels = lbl_tb[onset_inds] + + # remove 'unlabeled' label + if "unlabeled" in labelmap: + keep = np.where(labels != labelmap["unlabeled"])[0] + labels = labels[keep] + onset_inds = onset_inds[keep] + offset_inds = offset_inds[keep] + + # handle case where removing 'unlabeled' leaves no segments + if all([len(vec) == 0 for vec in (labels, onset_inds, offset_inds)]): + return "", None, None + + inverse_labelmap = dict((v, k) for k, v in labelmap.items()) + labels = labels.tolist() + labels = np.asarray([inverse_labelmap[label] for label in labels]) + # the 'best' estimate we can get of onset and offset times, + # given binned times, and labels applied to each time bin, + # is "some time" between the last labeled bin for one segment, + # i.e. its offset, and the first labeled bin for the next + # segment, i.e. its onset. 
In other words if the whole bin is labeled + # as belonging to that segment, and the bin preceding it is labeled as + # belonging to the previous section, then the onset of the current + # segment must be the time between the two bins. To find those times + # we use the bin centers and either subtract (for onsets) or add + # (for offsets) half a timebin duration. This half a timebin + # duration puts our onsets and offsets at the time "between" bins. + onsets_s = t[onset_inds] - (timebin_dur / 2) + offsets_s = t[offset_inds] + (timebin_dur / 2) + + # but this estimate will be "wrong" if we set the onset or offset time + # outside the possible times in our timebin vector. Need to clean up. + if onsets_s[0] < 0.0: + onsets_s[0] = 0.0 + if offsets_s[-1] > t[-1]: + offsets_s[-1] = t[-1] + + return labels, onsets_s, offsets_s + + +def to_inds_list(lbl_tb: np.ndarray, unlabeled_label: int = 0) -> list[np.ndarray]: + """Given a vector of labeled timebins, + returns a list of indexing vectors, + one for each labeled segment in the vector. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Vector of labeled timebins from spectrogram + unlabeled_label : int + Label that was given to segments that were not labeled in annotation, + e.g. silent periods between annotated segments. Default is 0. + return_inds : bool + If True, return list of indices for segments in lbl_tb, in addition to the segments themselves. + If False, just return list of numpy.ndarrays that are the segments from lbl_tb. + + Returns + ------- + segment_inds_list : list + of numpy.ndarray, indices that will recover segments list from lbl_tb. 
+ """ + segment_inds = np.nonzero(lbl_tb != unlabeled_label)[0] + return np.split(segment_inds, np.where(np.diff(segment_inds) != 1)[0] + 1) + + +def remove_short_segments( + lbl_tb: np.ndarray, + segment_inds_list: list[np.ndarray], + timebin_dur: float, + min_segment_dur: float | int, + unlabeled_label: int = 0 +) -> tuple[np.ndarray, list[np.ndarray]]: + """Remove segments from vector of labeled timebins + that are shorter than a specified duration. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Vector of labeled spectrogram time bins, i.e., + where each element is a label for a time bin. + Output of a neural network. + segment_inds_list : list + Of numpy.ndarray, indices that will recover segments list from ``lbl_tb``. + Returned by function ``vak.labels.lbl_tb_segment_inds_list``. + timebin_dur : float + Duration of a single timebin in the spectrogram, in seconds. + Used to convert onset and offset indices in ``lbl_tb`` to seconds. + min_segment_dur : float + Minimum duration of segment, in seconds. If specified, then + any segment with a duration less than min_segment_dur is + removed from lbl_tb. Default is None, in which case no + segments are removed. + unlabeled_label : int + Label that was given to segments that were not labeled in annotation, + e.g. silent periods between annotated segments. Default is 0. + + Returns + ------- + lbl_tb : numpy.ndarray + With segments whose duration is shorter than ``min_segment_dur`` + set to ``unlabeled_label`` + segment_inds_list : list + Of numpy.ndarray, with arrays removed that represented + segments in ``lbl_tb`` that were shorter than ``min_segment_dur``. 
+ """ + new_segment_inds_list = [] + + for segment_inds in segment_inds_list: + if segment_inds.shape[-1] * timebin_dur < min_segment_dur: + lbl_tb[segment_inds] = unlabeled_label + # DO NOT keep segment_inds array + else: + # do keep segment_inds array, don't change lbl_tb + new_segment_inds_list.append(segment_inds) + + return lbl_tb, new_segment_inds_list + + +def take_majority_vote(lbl_tb: np.ndarray, + segment_inds_list: list[np.ndarray]) -> np.ndarray: + """Transform segments containing multiple labels + into segments with a single label by taking a "majority vote", + i.e. assign all time bins in the segment the most frequently + occurring label in the segment. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Vector of labeled spectrogram time bins, i.e., + where each element is a label for a time bin. + Output of a neural network. + segment_inds_list : list + Of numpy.ndarray, indices that will recover segments list from lbl_tb. + Returned by function ``vak.labels.lbl_tb_segment_inds_list``. + + Returns + ------- + lbl_tb : numpy.ndarray + After the majority vote transform has been applied. + """ + for segment_inds in segment_inds_list: + segment = lbl_tb[segment_inds] + majority = scipy.stats.mode(segment, keepdims=False)[0].item() + lbl_tb[segment_inds] = majority + + return lbl_tb + + +def postprocess( + lbl_tb: np.ndarray, + timebin_dur: float, + unlabeled_label: int = 0, + min_segment_dur: float | None = None, + majority_vote: bool = False, +) -> np.ndarray: + """Apply post-processing transformations + to a vector of labeled timebins. + + Optional post-processing + consist of two transforms, + that both rely on there being a label + that corresponds to the "unlabeled" + (or "background") class. + The first removes any segments that are + shorter than a specified duration, + by converting labels in those segments to the + "background" / "unlabeled" class label. 
+ The second performs a "majority vote" + transform within run of labels that is + bordered on both sides by the "background" label. + I.e., it counts the number of times any + label occurs in that segment, + and then assigns all bins the most common label. + + The function performs those steps in this order + (pseudo-code): + + .. code-block:: + + if min_segment_dur: + lbl_tb = remove_short_segments(lbl_tb, labelmap, min_segment_dur) + if majority_vote: + lbl_tb = majority_vote(lbl_tb, labelmap) + return lbl_tb + + Parameters + ---------- + lbl_tb : numpy.ndarray + Vector of labeled spectrogram time bins, i.e., + where each element is a label for a time bin. + Output of a neural network. + timebin_dur : float + Duration of a time bin in a spectrogram, + e.g., as estimated from vector of times + using ``vak.timebins.timebin_dur_from_vec``. + unlabeled_label : int + Label that was given to segments that were not labeled in annotation, + e.g. silent periods between annotated segments. Default is 0. + min_segment_dur : float + Minimum duration of segment, in seconds. If specified, then + any segment with a duration less than min_segment_dur is + removed from lbl_tb. Default is None, in which case no + segments are removed. + majority_vote : bool + If True, transform segments containing multiple labels + into segments with a single label by taking a "majority vote", + i.e. assign all time bins in the segment the most frequently + occurring label in the segment. This transform can only be + applied if the labelmap contains an 'unlabeled' label, + because unlabeled segments makes it possible to identify + the labeled segments. Default is False. + + Returns + ------- + lbl_tb : numpy.ndarray + Vector of labeled timebins after post-processing is applied. 
+ """ + lbl_tb = row_or_1d(lbl_tb) + + # handle the case when all time bins are predicted to be unlabeled + # see https://github.com/NickleDave/vak/issues/383 + uniq_lbl_tb = np.unique(lbl_tb) + if len(uniq_lbl_tb) == 1 and uniq_lbl_tb[0] == unlabeled_label: + return lbl_tb # -> no need to do any of the post-processing + + segment_inds_list = to_inds_list( + lbl_tb, unlabeled_label=unlabeled_label + ) + + if min_segment_dur is not None: + lbl_tb, segment_inds_list = remove_short_segments( + lbl_tb, + segment_inds_list, + timebin_dur, + min_segment_dur, + unlabeled_label, + ) + if len(segment_inds_list) == 0: # no segments left after removing + return lbl_tb # -> no need to do any of the post-processing + + if majority_vote: + lbl_tb = take_majority_vote(lbl_tb, segment_inds_list) + + return lbl_tb diff --git a/src/vak/transforms/labeled_timebins/transforms.py b/src/vak/transforms/labeled_timebins/transforms.py new file mode 100644 index 000000000..7f4742e91 --- /dev/null +++ b/src/vak/transforms/labeled_timebins/transforms.py @@ -0,0 +1,263 @@ +"""class forms of transformations +related to labeled timebins, +i.e., vectors where each element represents +a label for a time bin from a spectrogram. + +These classes call functions from +``vak.transforms.labeled_timebins.functional``. +Not all functions in that module +have a corresponding class, +just key functions needed by +dataloaders and models. + +- FromSegments: transform to get labeled timebins from annotations +- ToLabels: transform to get back just string labels from labeled timebins, + used to evaluate a model. +- ToSegments: transform to get segment onsets, offsets, and labels from labeled timebins. + Used to convert model output to predictions. + Inverse of ``from_segments``. +- PostProcess: combines two post-processing transforms applied to labeled timebins, + ``remove_short_segments`` and ``take_majority_vote``, in one class. +""" +from __future__ import annotations + +import numpy as np + +from . 
import functional as F + + +class FromSegments: + """Transform that makes a vector of labels for a vector of time bins, + given labeled segments in the form of onset times, + offset times, and segment labels. + + Attributes + ---------- + unlabeled_label : int + Label assigned to time bins that do not have labels associated with them. + Default is 0. + """ + def __init__(self, unlabeled_label: int = 0): + self.unlabeled_label = unlabeled_label + + def __call__(self, + labels_int: np.ndarray, + onsets_s: np.ndarray, + offsets_s: np.ndarray, + time_bins: np.ndarray) -> np.ndarray: + """Make a vector of labels for a vector of time bins, + given labeled segments in the form of onset times, + offset times, and segment labels. + + Parameters + ---------- + labels_int : list, numpy.ndarray + A list or array of labels from the annotation for a vocalization, + mapped to integers + onsets_s : numpy.ndarray + 1-d vector of floats, segment onsets in seconds. + offsets_s : numpy.ndarray + 1-d vector of floats, segment offsets in seconds. + time_bins : numpy.ndarray + 1-d vector of floats, time in seconds for center of each time bin of a spectrogram. + + Returns + ------- + lbl_tb : numpy.ndarray + same length as time_bins, with each element a label for each time bin + """ + return F.from_segments(labels_int, onsets_s, offsets_s, time_bins, + unlabeled_label=self.unlabeled_label) + + +class ToLabels: + """Transforms that converts + vector of labeled timebins to a string, + one character for each continuous segment. + + Allows for converting output of network + from a label for each frame + to one label for each continuous segment, + in order to compute string-based metrics like edit distance. + + Attributes + ---------- + labelmap : dict + That maps string labels to integers. + The mapping is inverted to convert back to string labels. 
+ """ + def __init__(self, labelmap: dict): + self.labelmap = labelmap + + def __call__(self, lbl_tb: np.ndarray) -> str: + """Convert vector of labeled timebins to a string, + one character for each continuous segment. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Where each element is a label for a frame / time bin. + Typically, the output of a neural network. + + Returns + ------- + labels : str + The label at the onset of each continuous segment + in ``lbl_tb``, mapped back to string labels in ``labelmap``. + """ + return F.to_labels(lbl_tb, self.labelmap) + + +class ToSegments: + """Transform that converts a vector of labeled time bins + into segments in the form of onset indices, + offset indices, and labels. + + Finds where continuous runs of a single label start + and stop in timebins, and considers each of these runs + a segment. + + The function returns vectors of labels and onsets and offsets + in units of seconds. + + Attributes + ---------- + labelmap : dict + That maps string labels to integers. + The mapping is inverted to convert back to string labels. + n_decimals_trunc : int + Number of decimal places to keep when truncating the timebin duration + calculated from the vector of times t. Default is 5. + """ + + def __init__(self, + labelmap: dict, + n_decimals_trunc: int = 5 + ): + self.labelmap = labelmap + self.n_decimals_trunc = n_decimals_trunc + + def __call__(self, + lbl_tb: np.ndarray, + t: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """Convert a vector of labeled time bins + into segments in the form of onset indices, + offset indices, and labels. + + Finds where continuous runs of a single label start + and stop in timebins, and considers each of these runs + a segment. + + The function returns vectors of labels and onsets and offsets + in units of seconds. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Vector of labeled spectrogram time bins, i.e., + where each element is a label for a time bin. 
+ Output of a neural network. + t : numpy.ndarray + Vector of times; the times are bin centers of columns in a spectrogram. + Returned by function that generated spectrogram. + Used to convert onset and offset indices in lbl_tb to seconds. + + Returns + ------- + labels : numpy.ndarray + Vector where each element is a label for a segment with its onset + and offset indices given by the corresponding element in onset_inds + and offset_inds. + onsets_s : numpy.ndarray + Vector where each element is the onset in seconds a segment. + Each onset corresponds to the value at the same index in labels. + offsets_s : numpy.ndarray + Vector where each element is the offset in seconds of a segment. + Each offset corresponds to the value at the same index in labels. + """ + return F.to_segments(lbl_tb, self.labelmap, t, self.n_decimals_trunc) + + +class PostProcess: + """Apply post-processing transformations + to a vector of labeled timebins. + + Optional post-processing + consist of two transforms, + that both rely on there being a label + that corresponds to the "unlabeled" + (or "background") class. + The first removes any segments that are + shorter than a specified duration, + by converting labels in those segments to the + "background" / "unlabeled" class label. + The second performs a "majority vote" + transform within run of labels that is + bordered on both sides by the "background" label. + I.e., it counts the number of times any + label occurs in that segment, + and then assigns all bins the most common label. + + The function performs those steps in this order + (pseudo-code): + + .. code-block:: + + if min_segment_dur: + lbl_tb = remove_short_segments(lbl_tb, labelmap, min_segment_dur) + if majority_vote: + lbl_tb = majority_vote(lbl_tb, labelmap) + return lbl_tb + + Attributes + ---------- + timebin_dur : float + Duration of a time bin in a spectrogram, + e.g., as estimated from vector of times + using ``vak.timebins.timebin_dur_from_vec``. 
+ unlabeled_label : int + Label that was given to segments that were not labeled in annotation, + e.g. silent periods between annotated segments. Default is 0. + min_segment_dur : float + Minimum duration of segment, in seconds. If specified, then + any segment with a duration less than min_segment_dur is + removed from lbl_tb. Default is None, in which case no + segments are removed. + majority_vote : bool + If True, transform segments containing multiple labels + into segments with a single label by taking a "majority vote", + i.e. assign all time bins in the segment the most frequently + occurring label in the segment. This transform can only be + applied if the labelmap contains an 'unlabeled' label, + because unlabeled segments makes it possible to identify + the labeled segments. Default is False. + """ + def __init__(self, + timebin_dur: float, + unlabeled_label: int = 0, + min_segment_dur: float | None = None, + majority_vote: bool = False, + ): + self.timebin_dur = timebin_dur + self.unlabeled_label = unlabeled_label + self.min_segment_dur = min_segment_dur + self.majority_vote = majority_vote + + def __call__(self, + lbl_tb: np.ndarray) -> np.ndarray: + """Convert vector of labeled timebins into labels. + + Parameters + ---------- + lbl_tb : numpy.ndarray + Vector of labeled spectrogram time bins, i.e., + where each element is a label for a time bin. + Output of a neural network. + + Returns + ------- + lbl_tb : numpy.ndarray + Vector of labeled timebins after post-processing is applied. 
+ """ + return F.postprocess(lbl_tb, self.timebin_dur, self.unlabeled_label, + self.min_segment_dur, self.majority_vote) diff --git a/tests/fixtures/annot.py b/tests/fixtures/annot.py index 6f635402b..31b2c6c9a 100644 --- a/tests/fixtures/annot.py +++ b/tests/fixtures/annot.py @@ -4,50 +4,79 @@ import toml +from .config import GENERATED_TEST_CONFIGS_ROOT from .test_data import SOURCE_TEST_DATA_ROOT -@pytest.fixture -def annot_file_yarden(source_test_data_root): - return source_test_data_root.joinpath( + +ANNOT_FILE_YARDEN = SOURCE_TEST_DATA_ROOT.joinpath( "spect_mat_annot_yarden", "llb3", "llb3_annot_subset.mat" ) @pytest.fixture -def annot_list_yarden(annot_file_yarden): - scribe = crowsetta.Transcriber(format="yarden") - annot_list = scribe.from_file(annot_file_yarden) - return annot_list +def annot_file_yarden(): + return ANNOT_FILE_YARDEN + + +scribe_yarden = crowsetta.Transcriber(format="yarden") +ANNOT_LIST_YARDEN = scribe_yarden.from_file(ANNOT_FILE_YARDEN) + + +@pytest.fixture +def annot_list_yarden(): + return ANNOT_LIST_YARDEN + + +LABELSET_YARDEN = [ + str(an_int) + for an_int in [1, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19] +] @pytest.fixture def labelset_yarden(): """labelset as it would be loaded from a toml file - don't return a set because we need to use this to test functions that convert it to a set + don't return a set because we need to use this to test functions that convert it to a set. 
+ We also don't use a config for this since it's entered there as a "label string" """ - return [ - str(an_int) - for an_int in [1, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19] - ] + return LABELSET_YARDEN + + +ANNOT_DIR_NOTMAT = SOURCE_TEST_DATA_ROOT.joinpath("audio_cbin_annot_notmat", "gy6or6", "032312") @pytest.fixture -def annot_dir_notmat(source_test_data_root): - return source_test_data_root.joinpath("audio_cbin_annot_notmat", "gy6or6", "032312") +def annot_dir_notmat(): + return ANNOT_DIR_NOTMAT + + +ANNOT_FILES_NOTMAT = sorted(ANNOT_DIR_NOTMAT.glob("*.not.mat")) @pytest.fixture -def annot_files_notmat(annot_dir_notmat): - return sorted(annot_dir_notmat.glob("*.not.mat")) +def annot_files_notmat(): + return ANNOT_FILES_NOTMAT + + +scribe_notmat = crowsetta.Transcriber(format="notmat") +ANNOT_LIST_NOTMAT = scribe_notmat.from_file(ANNOT_FILES_NOTMAT) @pytest.fixture -def annot_list_notmat(annot_files_notmat): - scribe = crowsetta.Transcriber(format="notmat") - annot_list = scribe.from_file(annot_files_notmat) - return annot_list +def annot_list_notmat(): + return ANNOT_LIST_NOTMAT + + + +a_train_notmat_config = sorted( + GENERATED_TEST_CONFIGS_ROOT.glob("*train*notmat*toml") +)[0] # get first config.toml from glob list +# doesn't really matter which config, they all have labelset +with a_train_notmat_config.open("r") as fp: + a_train_notmat_toml = toml.load(fp) +LABELSET_NOTMAT = a_train_notmat_toml["PREP"]["labelset"] @pytest.fixture @@ -55,64 +84,78 @@ def labelset_notmat(generated_test_configs_root): """labelset as it would be loaded from a toml file don't return a set because we need to use this to test functions that convert it to a set""" - a_train_notmat_config = sorted( - generated_test_configs_root.glob("*train*notmat*toml") - )[ - 0 - ] # get first config.toml from glob list - # doesn't really matter which config, they all have labelset - with a_train_notmat_config.open("r") as fp: - a_train_notmat_toml = toml.load(fp) - labelset = 
a_train_notmat_toml["PREP"]["labelset"] - return labelset - - -@pytest.fixture -def annot_file_birdsongrec(source_test_data_root): - return source_test_data_root.joinpath( - "audio_wav_annot_birdsongrec", "Bird0", "Annotation.xml" - ) + return LABELSET_NOTMAT + + +ANNOT_FILE_BIRDSONGREC = SOURCE_TEST_DATA_ROOT.joinpath( + "audio_wav_annot_birdsongrec", "Bird0", "Annotation.xml" +) + + +@pytest.fixture +def annot_file_birdsongrec(): + return ANNOT_FILE_BIRDSONGREC + + +scribe_birdsongrec = crowsetta.Transcriber(format="birdsong-recognition-dataset") +ANNOT_LIST_BIRDSONGREC = scribe_birdsongrec.from_file(ANNOT_FILE_BIRDSONGREC) @pytest.fixture -def annot_list_birdsongrec(annot_file_birdsongrec): - scribe = crowsetta.Transcriber(format="birdsong-recognition-dataset") - annot_list = scribe.from_file(annot_file_birdsongrec) - return annot_list +def annot_list_birdsongrec(): + return ANNOT_LIST_BIRDSONGREC + + +ANNOT_DIR_TEXTGRID = SOURCE_TEST_DATA_ROOT.joinpath("audio_wav_annot_textgrid", "AGBk") @pytest.fixture -def annot_dir_textgrid(source_test_data_root): - return source_test_data_root.joinpath("audio_wav_annot_textgrid", "AGBk") +def annot_dir_textgrid(): + return ANNOT_DIR_TEXTGRID + + +ANNOT_FILES_TEXTGRID = sorted(ANNOT_DIR_TEXTGRID.glob("*.TextGrid")) @pytest.fixture -def annot_files_textgrid(annot_dir_textgrid): - return sorted(annot_dir_textgrid.glob("*.TextGrid")) +def annot_files_textgrid(): + return ANNOT_FILES_TEXTGRID + + +scribe_textgrid = crowsetta.Transcriber(format="textgrid") +ANNOT_LIST_TEXTGRID = scribe_textgrid.from_file(ANNOT_FILES_TEXTGRID) @pytest.fixture -def annot_list_textgrid(annot_files_textgrid): - scribe = crowsetta.Transcriber(format="textgrid") - annot_list = scribe.from_file(annot_files_textgrid) - return annot_list +def annot_list_textgrid(): + return ANNOT_LIST_TEXTGRID + + +ANNOT_DIR_SIMPLE_SEQ = SOURCE_TEST_DATA_ROOT.joinpath( + "audio_cbin_annot_simple_seq", "gy6or6", "032312" +) @pytest.fixture -def 
annot_dir_simple_seq(source_test_data_root): - return source_test_data_root.joinpath("audio_cbin_annot_simple_seq", "gy6or6", "032312") +def annot_dir_simple_seq(): + return ANNOT_DIR_SIMPLE_SEQ + + +ANNOT_FILES_SIMPLE_SEQ = sorted(ANNOT_DIR_SIMPLE_SEQ.glob("*.cbin.csv")) @pytest.fixture -def annot_files_simple_seq(annot_dir_simple_seq): - return sorted(annot_dir_simple_seq.glob("*.cbin.csv")) +def annot_files_simple_seq(): + return ANNOT_FILES_SIMPLE_SEQ + + +scribe_simple_seq = crowsetta.Transcriber(format="simple-seq") +ANNOT_LIST_SIMPLE_SEQ = scribe_simple_seq.from_file(ANNOT_FILES_SIMPLE_SEQ) @pytest.fixture -def annot_list_simple_seq(annot_files_simple_seq): - scribe = crowsetta.Transcriber(format="simple-seq") - annot_list = scribe.from_file(annot_files_simple_seq) - return annot_list +def annot_list_simple_seq(): + return ANNOT_LIST_SIMPLE_SEQ @pytest.fixture diff --git a/tests/fixtures/config.py b/tests/fixtures/config.py index cf586e0e6..0eb764ab4 100644 --- a/tests/fixtures/config.py +++ b/tests/fixtures/config.py @@ -5,6 +5,8 @@ import pytest import toml +from .test_data import GENERATED_TEST_DATA_ROOT + @pytest.fixture def test_configs_root(test_data_root): @@ -65,9 +67,12 @@ def invalid_option_config_path(test_configs_root): return test_configs_root.joinpath("invalid_option_config.toml") +GENERATED_TEST_CONFIGS_ROOT = GENERATED_TEST_DATA_ROOT.joinpath("configs") + + @pytest.fixture -def generated_test_configs_root(generated_test_data_root): - return generated_test_data_root.joinpath("configs") +def generated_test_configs_root(): + return GENERATED_TEST_CONFIGS_ROOT # ---- path to config files ---- diff --git a/tests/test_labeled_timebins.py b/tests/test_labeled_timebins.py index fa32b37ab..5cbd8a033 100644 --- a/tests/test_labeled_timebins.py +++ b/tests/test_labeled_timebins.py @@ -5,276 +5,26 @@ import vak.labeled_timebins -def test_has_unlabeled(): - labels_1 = [1, 1, 1, 1, 2, 2, 3, 3, 3] - onsets_s1 = np.asarray([0, 2, 4, 6, 8, 10, 12, 14, 16]) - 
offsets_s1 = np.asarray([1, 3, 5, 7, 9, 11, 13, 15, 17]) - time_bins = np.arange(0, 18, 0.001) - has_ = vak.labeled_timebins.has_unlabeled( - labels_1, onsets_s1, offsets_s1, time_bins - ) - assert has_ - - labels_1 = [1, 1, 1, 1, 2, 2, 3, 3, 3] - onsets_s1 = np.asarray([0, 2, 4, 6, 8, 10, 12, 14, 16]) - offsets_s1 = np.asarray( - [1.999, 3.999, 5.999, 7.999, 9.999, 11.999, 13.999, 15.999, 17.999] - ) - time_bins = np.arange(0, 18, 0.001) - has_ = vak.labeled_timebins.has_unlabeled( - labels_1, onsets_s1, offsets_s1, time_bins - ) - assert has_ is False - - -@pytest.mark.parametrize( - "labeled_timebins, labels_mapping, spect_ID_vector, expected_labels", - [ - (np.array([0, 0, 1, 1, 0, 0, 2, 2, 0, 0]), {'unlabeled': 0, 'a': 1, 'b': 2}, None, 'ab'), - (np.array([0, 0, 1, 1, 0, 0, 2, 2, 0, 0]), {'unlabeled': 0, '1': 1, '2': 2}, None, '12'), - (np.array([0, 0, 21, 21, 0, 0, 22, 22, 0, 0]), {'unlabeled': 0, '21': 21, '22': 22}, None, 'AB'), - (np.array([0, 0, 11, 11, 0, 0, 12, 12, 0, 0]), {'unlabeled': 0, '11': 11, '12': 12}, None, 'AB'), - ] -) -def test_lbl_tb2labels(labeled_timebins, labels_mapping, spect_ID_vector, expected_labels): - labels = vak.labeled_timebins.lbl_tb2labels(labeled_timebins, labels_mapping, spect_ID_vector) - assert labels == expected_labels - - -def test_segment_lbl_tb(): - lbl_tb = np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]) - labels, onset_inds, offset_inds = vak.labeled_timebins._segment_lbl_tb(lbl_tb) - assert np.array_equal(labels, np.asarray([0, 1, 0])) - assert np.array_equal(onset_inds, np.asarray([0, 4, 8])) - assert np.array_equal(offset_inds, np.asarray([3, 7, 11])) - - -@pytest.mark.parametrize( - "lbl_tb, seg_inds_list_expected", - [ - (np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]), [np.array([4, 5, 6, 7])]), - # assert works when segment is at start of lbl_tb - (np.asarray([1, 1, 1, 1, 0, 0, 0, 0]), [np.array([0, 1, 2, 3])]), - # assert works with multiple segments - ( - np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1, 0, 
0]), - [np.array([3, 4, 5]), np.array([9, 10, 11])], - ), - # assert works when a segment is at end of lbl_tb - ( - np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1]), - [np.array([3, 4, 5]), np.array([9, 10, 11])], - ), - ], -) -def test_lbl_tb_segment_inds_list(lbl_tb, seg_inds_list_expected): - UNLABELED = 0 - - seg_inds_list = vak.labeled_timebins.lbl_tb_segment_inds_list( - lbl_tb=lbl_tb, unlabeled_label=UNLABELED - ) - assert np.array_equal(seg_inds_list, seg_inds_list_expected) - - -def test_remove_short_segments(): - UNLABELED = 0 - - # should do nothing when a labeled segment has all the same labels - lbl_tb = np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 0]) - segment_inds_list = vak.labeled_timebins.lbl_tb_segment_inds_list( - lbl_tb, unlabeled_label=UNLABELED - ) - TIMEBIN_DUR = 0.001 - MIN_SEGMENT_DUR = 0.002 - lbl_tb_tfm, segment_inds_list_out = vak.labeled_timebins.remove_short_segments( - lbl_tb, - segment_inds_list, - timebin_dur=TIMEBIN_DUR, - min_segment_dur=MIN_SEGMENT_DUR, - unlabeled_label=UNLABELED, - ) - - lbl_tb_expected = np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]) - assert np.array_equal(lbl_tb_tfm, lbl_tb_expected) - - @pytest.mark.parametrize( - "lbl_tb_in, lbl_tb_expected", + 'labels, onsets, offsets, time_bins, expected_output', [ - # should do nothing when a labeled segment has all the same labels - ( - np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]), - np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]), - ), ( - np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1, 0, 0]), - np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0]), + [1, 1, 1, 1, 2, 2, 3, 3, 3], + np.asarray([0, 2, 4, 6, 8, 10, 12, 14, 16]), + np.asarray([1, 3, 5, 7, 9, 11, 13, 15, 17]), + np.arange(0, 18, 0.001), + True ), - # test MajorityVote works when there is no 'unlabeled' segment at start of vector - (np.asarray([1, 1, 2, 1, 0, 0, 0, 0]), np.asarray([1, 1, 1, 1, 0, 0, 0, 0])), - # test MajorityVote works when there is no 'unlabeled' segment at end of vector ( - 
np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1]), - np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1]), - ), - # test that a tie results in lowest value class winning, default behavior of scipy.stats.mode - ( - np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 2, 2]), - np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1]), - ), - ], -) -def test_majority_vote(lbl_tb_in, lbl_tb_expected): - UNLABELED = 0 - - segment_inds_list = vak.labeled_timebins.lbl_tb_segment_inds_list( - lbl_tb_in, unlabeled_label=UNLABELED - ) - lbl_tb_maj_vote = vak.labeled_timebins.majority_vote_transform( - lbl_tb_in, segment_inds_list - ) - assert np.array_equal(lbl_tb_maj_vote, lbl_tb_expected) - - -MAX_ABS_DIFF = 0.003 # milliseconds - - -def test_lbl_tb2segments_recovers_onsets_offsets_labels(): - onsets_s = np.asarray([1.0, 3.0, 5.0, 7.0]) - offsets_s = np.asarray([2.0, 4.0, 6.0, 8.0]) - labelset = set(list("abcd")) - labelmap = vak.labels.to_map(labelset) - - labels = np.asarray(["a", "b", "c", "d"]) - timebin_dur = 0.001 - total_dur_s = 10 - timebins = ( - np.asarray(range(1, int(total_dur_s / timebin_dur) + 1)) * timebin_dur - ) # [0.001, 0.002, ..., 10.0] - lbl_tb = np.zeros(timebins.shape, dtype="int8") - for onset, offset, lbl in zip(onsets_s, offsets_s, labels): - on_ind = np.nonzero(timebins == onset)[0].item() - off_ind = np.nonzero(timebins == offset)[0].item() - lbl_tb[on_ind : off_ind + 1] = labelmap[lbl] - - labels_out, onsets_s_out, offsets_s_out = vak.labeled_timebins.lbl_tb2segments( - lbl_tb, labelmap, timebins - ) - - assert np.array_equal(labels, labels_out) - assert np.all(np.abs(onsets_s - onsets_s_out) < MAX_ABS_DIFF) - assert np.all(np.abs(offsets_s - offsets_s_out) < MAX_ABS_DIFF) - - -# skip these for now because they cause tests to fail for reasons unrelated -# to what the test is testing -SPECT_FILES_TO_SKIP = [ - "llb3_0071_2018_04_23_17_38_30.wav.mat", # has zero duration between syllable segments, onsets 54 and 55 - # I assume the same issue is coming up with these other 
two - "llb3_0074_2018_04_23_17_41_08.wav.mat", - "llb3_0016_2018_04_23_15_18_14.wav.mat", -] - - -def test_lbl_tb2segments_recovers_onsets_offsets_labels_from_real_data( - specific_dataframe, - labelset_yarden, - model, -): - """test that ``lbl_tb2segments`` recovers onsets and offsets from real data""" - vak_df = specific_dataframe( - config_type="train", model=model, spect_format="mat", annot_format="yarden" - ) - labelmap = vak.labels.to_map(set(labelset_yarden)) - - spect_paths = vak_df["spect_path"].values - annot_list = vak.annotation.from_df(vak_df) - spect_annot_map = vak.annotation.map_annotated_to_annot(spect_paths, annot_list) - - TIMEBINS_KEY = "t" - - for spect_path, annot in spect_annot_map.items(): - # in general not good to have conditionals in tests - # but neglecting these weird edge case files for now - if any( - spect_path.endswith(spect_file_to_skip) - for spect_file_to_skip in SPECT_FILES_TO_SKIP - ): - continue - - lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] - timebins = vak.files.spect.load(spect_path)[TIMEBINS_KEY] - - lbl_tb = vak.labeled_timebins.label_timebins( - lbls_int, - annot.seq.onsets_s, - annot.seq.offsets_s, - timebins, - unlabeled_label=labelmap["unlabeled"], + [1, 1, 1, 1, 2, 2, 3, 3, 3], + np.asarray([0, 2, 4, 6, 8, 10, 12, 14, 16]), + np.asarray([1.999, 3.999, 5.999, 7.999, 9.999, 11.999, 13.999, 15.999, 17.999]), + np.arange(0, 18, 0.001), + False ) - - labels, onsets_s_out, offsets_s_out = vak.labeled_timebins.lbl_tb2segments( - lbl_tb, labelmap, timebins - ) - assert np.all(np.char.equal(labels, annot.seq.labels)) - assert np.all(np.abs(annot.seq.onsets_s - onsets_s_out) < MAX_ABS_DIFF) - assert np.all(np.abs(annot.seq.offsets_s - offsets_s_out) < MAX_ABS_DIFF) - - -def test_lbl_tb2segments_majority_vote(): - labelmap = { - "unlabeled": 0, - "a": 1, - "b": 2, - } - lbl_tb = np.array([0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 2, 2, 1, 0, 0]) - timebins = np.arange(1, lbl_tb.shape[0] + 1) * 0.001 - labels_out, onsets_s_out, 
offsets_s_out = vak.labeled_timebins.lbl_tb2segments( - lbl_tb, labelmap, timebins, majority_vote=True - ) - assert np.all(np.char.equal(labels_out, np.array(["a", "b"]))) - - -def test_lbl_tb2segments_all_unlabeled(): - """test that ``lbl_tb2segments`` returns all ``None``s when - all elements in the input vector ``lbl_tb`` are the ``unlabeled`` class""" - labelmap = { - "unlabeled": 0, - "a": 1, - "b": 2, - } - N_TIMEBINS = 4000 # just want some number that's on the order of size of a typical Bengalese finch song - lbl_tb = np.zeros(N_TIMEBINS).astype(int) - timebins = np.arange(1, lbl_tb.shape[0] + 1) * 0.001 - labels_out, onsets_s_out, offsets_s_out = vak.labeled_timebins.lbl_tb2segments( - lbl_tb, labelmap, timebins, majority_vote=True - ) - assert all([out is None for out in [labels_out, onsets_s_out, offsets_s_out]]) - - -@pytest.mark.parametrize( - 'y_pred, timebin_dur, min_segment_dur, labelmap', - [ - (np.array([0, 0, 0, 0, 0, 0, 7, 7, 3, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 7, 7, 7, 7, 7, 7, 7, 0, 0, 0, 0, 0, 0, ]), - 0.002, - 0.025, - {"unlabeled": 0, "a": 3, "b": 7}), - ] + ], ) -def test_lbl_tb2segments_min_seg_dur_makes_all_unlabeled(y_pred, - timebin_dur, - min_segment_dur, - labelmap): - """test that ``lbl_tb2segments`` returns all ``None``s when - removing all segments less than the minimum segment duration - causes all elements in the input vector ``lbl_tb`` - to become the ``unlabeled`` class""" - # TODO: assert that applying 'minimum segment duration' post-processing does what we expect - # i.e. 
converts all elements to 'unlabeled' - timebins = np.arange(1, y_pred.shape[0] + 1) * timebin_dur - labels_out, onsets_s_out, offsets_s_out = vak.labeled_timebins.lbl_tb2segments( - y_pred, labelmap, timebins, min_segment_dur=min_segment_dur, majority_vote=True - ) - assert all([out is None for out in [labels_out, onsets_s_out, offsets_s_out]]) +def test_has_unlabeled(labels, onsets, offsets, time_bins, expected_output): + assert vak.labeled_timebins.has_unlabeled( + labels, onsets, offsets, time_bins + ) == expected_output diff --git a/tests/test_labels.py b/tests/test_labels.py index a49440de4..f76268408 100644 --- a/tests/test_labels.py +++ b/tests/test_labels.py @@ -1,33 +1,111 @@ +import copy + +import numpy as np +import pytest + import vak.files.spect import vak.labels -def test_to_map(): - labelset = set(list("abcde")) - labelmap = vak.labels.to_map(labelset, map_unlabeled=False) - assert type(labelmap) == dict - assert len(labelmap) == len(labelset) # because map_unlabeled=False +@pytest.mark.parametrize( + 'labelset, map_unlabeled', + [ + ( + set(list("abcde")), + True + ), + ( + set(list("abcde")), + False + ), + ( + {1, 2, 3, 4, 5, 6}, + True, + ), + ( + {1, 2, 3, 4, 5, 6}, + False, + ) + ] +) +def test_to_map(labelset, map_unlabeled): + labelmap = vak.labels.to_map(labelset, map_unlabeled=map_unlabeled) + assert isinstance(labelmap, dict) + if map_unlabeled: + # because map_unlabeled=True + assert len(labelmap) == len(labelset) + 1 + else: + # because map_unlabeled=False + assert len(labelmap) == len(labelset) - labelset = set(list("abcde")) - labelmap = vak.labels.to_map(labelset, map_unlabeled=True) - assert type(labelmap) == dict - assert len(labelmap) == len(labelset) + 1 # because map_unlabeled=True - labelset = {1, 2, 3, 4, 5, 6} - labelmap = vak.labels.to_map(labelset, map_unlabeled=False) - assert type(labelmap) == dict - assert len(labelmap) == len(labelset) # because map_unlabeled=False +@pytest.mark.parametrize( + 'labels_list, 
expected_labelset', + [ + ( + [ + [1, 1, 1, 1, 2, 2, 3, 3, 3], + [1, 1, 1, 2, 2, 3, 3, 3, 3, 3] + ], + {1, 2, 3} + ) + ] +) +def test_to_set(labels_list, expected_labelset): + labelset = vak.labels.to_set(labels_list) + assert isinstance(labelset, set) + assert labelset == expected_labelset - labelset = {1, 2, 3, 4, 5, 6} - labelmap = vak.labels.to_map(labelset, map_unlabeled=True) - assert type(labelmap) == dict - assert len(labelmap) == len(labelset) + 1 # because map_unlabeled=True +@pytest.mark.parametrize( + 'config_type, model_name, audio_format, spect_format, annot_format', + [ + ('train', 'tweetynet', 'cbin', None, 'notmat'), + ('train', 'tweetynet', None, 'mat', 'yarden'), + ] +) +def test_from_df(config_type, model_name, audio_format, spect_format, annot_format, specific_dataframe): + df = specific_dataframe(config_type, model_name, annot_format, audio_format, spect_format) + out = vak.labels.from_df(df) + assert isinstance(out, list) + assert all([isinstance(labels, np.ndarray) for labels in out]) -def test_to_set(): - labels1 = [1, 1, 1, 1, 2, 2, 3, 3, 3] - labels2 = [1, 1, 1, 2, 2, 3, 3, 3, 3, 3] - labels_list = [labels1, labels2] - labelset = vak.labels.to_set(labels_list) - assert type(labelset) == set - assert labelset == {1, 2, 3} + +INTS_LABELMAP = {str(val): val for val in range(1, 20)} +INTS_LABELMAP_WITH_UNLABELED = copy.deepcopy(INTS_LABELMAP) +INTS_LABELMAP_WITH_UNLABELED['unlabeled'] = 0 + +DEFAULT_SKIP = ('unlabeled',) + + +@pytest.mark.parametrize( + 'labelmap, skip', + [ + ({'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}, None), + ({'unlabeled': 0, 'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}, None), + ({'unlabeled': 0, 'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}, ('unlabeled',)), + (INTS_LABELMAP, None), + (INTS_LABELMAP_WITH_UNLABELED, ('unlabeled',)) + ] +) +def test_multi_char_labels_to_single_char(labelmap, skip): + if skip: + out = vak.labels.multi_char_labels_to_single_char(labelmap, skip) + else: + # test default skip + out = 
vak.labels.multi_char_labels_to_single_char(labelmap) + + if skip: + for skiplabel in skip: + assert skiplabel in out + assert all( + [len(label) == 1 + for label in out.keys() + if label not in skip] + ) + else: + assert all([ + len(label) == 1 + for label in out.keys() + if label not in DEFAULT_SKIP + ]) diff --git a/tests/test_transforms/__init__.py b/tests/test_transforms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_transforms/test_labeled_timebins/__init__.py b/tests/test_transforms/test_labeled_timebins/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_transforms/test_labeled_timebins/test_functional.py b/tests/test_transforms/test_labeled_timebins/test_functional.py new file mode 100644 index 000000000..0653e7470 --- /dev/null +++ b/tests/test_transforms/test_labeled_timebins/test_functional.py @@ -0,0 +1,469 @@ +"""tests for functional forms of transforms +for labeled timebins. + +Tests are in the same order as the module ``vak.transforms.labeled_timebins.functional``.: +- from_segments: transform to get labeled timebins from annotations +- to_labels: transform to get back just string labels from labeled timebins, + used to evaluate a model +- to_segments: transform to get back segment onsets, offsets, and labels from labeled timebins. + Inverse of ``from_segments``. +- post-processing transforms that can be used to "clean up" a vector of labeled timebins + - to_inds_list: helper function used to find segments in a vector of labeled timebins + - remove_short_segments: remove any segment less than a minimum duration + - take_majority_vote: take a "majority vote" within each segment bounded by the "unlabeled" label, + and apply the most "popular" label within each segment to all timebins in that segment + +Additionally some of the functions have more than one unit test, +where the first tests with simple examples +and the second then tests with real data. 
+Namely, ``to_labels``, ``to_segments`` and the related functions +``to_labels_with_postprocessing`` +and ``to_segments_with_postprocessing``. +Simple examples are used to test expected behavior and edge cases. +Testing with real data complements this. +""" +import copy +import itertools + +import numpy as np +import pytest + +import vak.files.spect +import vak.labels +import vak.transforms.labeled_timebins + + +from ...fixtures.annot import ANNOT_LIST_YARDEN, ANNOT_LIST_NOTMAT, LABELSET_YARDEN, LABELSET_NOTMAT +from ...fixtures.spect import SPECT_LIST_NPZ, SPECT_LIST_MAT + + +assert len(ANNOT_LIST_YARDEN) == len(SPECT_LIST_MAT), "ANNOT_LIST_YARDEN and SPECT_LIST_MAT are not the same length" + +SPECT_LIST_NPZ = copy.deepcopy(SPECT_LIST_NPZ) # to not mutate the one used by fixtures +ANNOT_LIST_NOTMAT = copy.deepcopy(ANNOT_LIST_NOTMAT) # to not mutate the one used by fixtures +# make sure ANNOT_LIST_NOTMAT can pair with SPECT_LIST_NPZ +audio_paths_from_spect_list = [ + spect_path.name.replace('.spect.npz', '') for spect_path in SPECT_LIST_NPZ +] +ANNOT_LIST_NOTMAT = [ + annot for annot in ANNOT_LIST_NOTMAT + if annot.audio_path.name in audio_paths_from_spect_list +] + + +# define here because we re-use to parametrize multiple tests +# and because we import in .test_transforms +FROM_SEGMENTS_PARAMETRIZE_ARGVALS = list(zip( + sorted(ANNOT_LIST_YARDEN, key=lambda annot: annot.audio_path.name), + sorted(SPECT_LIST_MAT, key=lambda spect_path: spect_path.name), + itertools.repeat(LABELSET_YARDEN) +)) + list(zip( + sorted(ANNOT_LIST_NOTMAT, key=lambda annot: annot.audio_path.name), + sorted(SPECT_LIST_NPZ, key=lambda spect_path: spect_path.name), + itertools.repeat(LABELSET_NOTMAT) +)) + + +@pytest.mark.parametrize( + 'annot, spect_path, labelset', + FROM_SEGMENTS_PARAMETRIZE_ARGVALS, +) +def test_from_segments(annot, spect_path, labelset): + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset, True) + + spect_dict = 
vak.files.spect.load(spect_path) + timebins = spect_dict['t'] + + try: + lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] + except KeyError: + pytest.skip( + 'Annotation with label not in labelset, would not include in dataset' + ) + + lbl_tb = vak.transforms.labeled_timebins.from_segments( + lbls_int, + annot.seq.onsets_s, + annot.seq.offsets_s, + timebins, + unlabeled_label=labelmap['unlabeled'], + ) + assert lbl_tb.shape == timebins.shape + assert all( + [lbl in lbls_int for lbl in np.unique(lbls_int)] + ) + + +@pytest.mark.parametrize( + "lbl_tb, labelmap, labels_expected_int", + [ + (np.array([0, 0, 1, 1, 0, 0, 2, 2, 0, 0]), {'unlabeled': 0, 'a': 1, 'b': 2}, [1, 2]), + (np.array([0, 0, 1, 1, 0, 0, 2, 2, 0, 0]), {'unlabeled': 0, '1': 1, '2': 2}, [1, 2]), + (np.array([0, 0, 21, 21, 0, 0, 22, 22, 0, 0]), {'unlabeled': 0, '21': 21, '22': 22}, [21, 22]), + (np.array([0, 0, 11, 11, 0, 0, 12, 12, 0, 0]), {'unlabeled': 0, '11': 11, '12': 12}, [11, 12]), + ] +) +def test_to_labels(lbl_tb, labelmap, labels_expected_int): + # next line, convert all labels to single characters + # we can easily compare strings we get back with expected; + # this is what core.eval does + labelmap = vak.labels.multi_char_labels_to_single_char( + labelmap, skip=('unlabeled',) + ) + labelmap_inv = {v: k for k, v in labelmap.items()} + labels_expected = ''.join([labelmap_inv[lbl_int] for lbl_int in labels_expected_int]) + + labels = vak.transforms.labeled_timebins.to_labels(lbl_tb, labelmap) + assert labels == labels_expected + + +# skip these for now because they cause tests to fail for reasons unrelated +# to what the test is testing +SPECT_FILES_TO_SKIP = [ + "llb3_0071_2018_04_23_17_38_30.wav.mat", # has zero duration between syllable segments, onsets 54 and 55 + # these have similar issues, where we can't successfully round trip from labeled timebins to segments + # because the timebin duration is pretty big (2.7 ms) and there are silent gap durations very close to that + # (e.g. 
3 ms), so segments get combined or lost due to rounding error when we do np.min/max below + "llb3_0074_2018_04_23_17_41_08.wav.mat", + "llb3_0016_2018_04_23_15_18_14.wav.mat", + "llb3_0053_2018_04_23_17_20_04.wav.mat", + "llb3_0054_2018_04_23_17_21_23.wav.mat" +] + + +@pytest.mark.parametrize( + 'annot, spect_path, labelset', + FROM_SEGMENTS_PARAMETRIZE_ARGVALS, +) +def test_to_labels_real_data( + annot, spect_path, labelset +): + """test that ``to_labels`` recovers labels from real data""" + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset) + # next line, convert all labels to single characters + # we can easily compare strings we get back with expected; + # this is what core.eval does + labelmap = vak.labels.multi_char_labels_to_single_char( + labelmap, skip=('unlabeled',) + ) + TIMEBINS_KEY = "t" + + if any( + str(spect_path).endswith(spect_file_to_skip) + for spect_file_to_skip in SPECT_FILES_TO_SKIP + ): + pytest.skip( + "Can't round trip segments -> lbl_tb -> segments " + "because of small silent gap durations + large time bin durations" + ) + + try: + lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] + except KeyError: + pytest.skip( + 'Annotation with label not in labelset, would not include in dataset' + ) + + timebins = vak.files.spect.load(spect_path)[TIMEBINS_KEY] + + lbl_tb = vak.transforms.labeled_timebins.from_segments( + lbls_int, + annot.seq.onsets_s, + annot.seq.offsets_s, + timebins, + unlabeled_label=labelmap["unlabeled"], + ) + + labels = vak.transforms.labeled_timebins.to_labels( + lbl_tb, + labelmap, + ) + + labelmap_multi_inv = {v: k for k, v in + labelmap.items()} + labels_expected = "".join( + [labelmap_multi_inv[lbl_int] for lbl_int in lbls_int] + ) + assert labels == labels_expected + + +MAX_ABS_DIFF = 0.003 # seconds, i.e. 3 milliseconds + + +@pytest.mark.parametrize( + 'annot, spect_path, labelset', + FROM_SEGMENTS_PARAMETRIZE_ARGVALS, +) +def test_to_segments_real_data( + annot, 
spect_path, labelset +): + """test that ``to_segments`` recovers onsets, offsets, and labels from real data""" + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset) + + TIMEBINS_KEY = "t" + + if any( + str(spect_path).endswith(spect_file_to_skip) + for spect_file_to_skip in SPECT_FILES_TO_SKIP + ): + pytest.skip( + "Can't round trip segments -> lbl_tb -> segments " + "because of small silent gap durations + large time bin durations" + ) + + try: + lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] + except KeyError: + pytest.skip( + 'Annotation with label not in labelset, would not include in dataset' + ) + + timebins = vak.files.spect.load(spect_path)[TIMEBINS_KEY] + + lbl_tb = vak.transforms.labeled_timebins.from_segments( + lbls_int, + annot.seq.onsets_s, + annot.seq.offsets_s, + timebins, + unlabeled_label=labelmap["unlabeled"], + ) + + expected_labels = lbl_tb[np.insert(np.diff(lbl_tb).astype(bool), 0, True)] + + labels, onsets_s, offsets_s = vak.transforms.labeled_timebins.to_segments( + lbl_tb, labelmap, timebins + ) + + assert np.all(np.char.equal(labels, annot.seq.labels)) + # writing the logic of the function here to test wouldn't make sense + # but to still test on real data, we can test whether onset_inds + # is the same length as expected_labels. 
This should be True + assert np.all(np.abs(annot.seq.onsets_s - onsets_s) < MAX_ABS_DIFF) + assert np.all(np.abs(annot.seq.offsets_s - offsets_s) < MAX_ABS_DIFF) + + +@pytest.mark.parametrize( + "lbl_tb, seg_inds_list_expected", + [ + (np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]), [np.array([4, 5, 6, 7])]), + # assert works when segment is at start of lbl_tb + (np.asarray([1, 1, 1, 1, 0, 0, 0, 0]), [np.array([0, 1, 2, 3])]), + # assert works with multiple segments + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1, 0, 0]), + [np.array([3, 4, 5]), np.array([9, 10, 11])], + ), + # assert works when a segment is at end of lbl_tb + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1]), + [np.array([3, 4, 5]), np.array([9, 10, 11])], + ), + ], +) +def test_to_inds(lbl_tb, seg_inds_list_expected): + """Test ``to_inds_list`` works as expected""" + UNLABELED = 0 + + seg_inds_list = vak.transforms.labeled_timebins.to_inds_list( + lbl_tb=lbl_tb, unlabeled_label=UNLABELED + ) + assert np.array_equal(seg_inds_list, seg_inds_list_expected) + + +@pytest.mark.parametrize( + 'lbl_tb, unlabeled, timebin_dur, min_segment_dur, lbl_tb_expected', + [ + # should remove the 1 at the end of lbl_tb since it's a segment with dur < 0.002 + ( + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 0]), + 0, + 0.001, + 0.002, + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]) + ), + # should **not** remove a segment with dur == 0.002 + ( + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0]), + 0, + 0.001, + 0.002, + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0]) + ), + ] +) +def test_remove_short_segments(lbl_tb, unlabeled, timebin_dur, min_segment_dur, lbl_tb_expected): + """Test ``remove_short_segments`` works as expected""" + segment_inds_list = vak.transforms.labeled_timebins.to_inds_list( + lbl_tb, unlabeled_label=unlabeled + ) + lbl_tb_tfm, segment_inds_list_out = vak.transforms.labeled_timebins.remove_short_segments( + lbl_tb, + segment_inds_list, + timebin_dur=timebin_dur, + 
min_segment_dur=min_segment_dur, + unlabeled_label=unlabeled, + ) + assert np.array_equal(lbl_tb_tfm, lbl_tb_expected) + + +@pytest.mark.parametrize( + "lbl_tb_in, unlabeled, lbl_tb_expected", + [ + # should do nothing when a labeled segment has all the same labels + ( + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]), + 0, + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]), + ), + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1, 0, 0]), + 0, + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0]), + ), + # test MajorityVote works when there is no 'unlabeled' segment at start of vector + ( + np.asarray([1, 1, 2, 1, 0, 0, 0, 0]), + 0, + np.asarray([1, 1, 1, 1, 0, 0, 0, 0]) + ), + # test MajorityVote works when there is no 'unlabeled' segment at end of vector + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1]), + 0, + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1]), + ), + # test that a tie results in lowest value class winning, default behavior of scipy.stats.mode + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 2, 2]), + 0, + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1]), + ), + ], +) +def test_majority_vote(lbl_tb_in, unlabeled, lbl_tb_expected): + """Test ``majority_vote`` works as expected""" + segment_inds_list = vak.transforms.labeled_timebins.to_inds_list( + lbl_tb_in, unlabeled_label=unlabeled + ) + lbl_tb_maj_vote = vak.transforms.labeled_timebins.take_majority_vote( + lbl_tb_in, segment_inds_list + ) + assert np.array_equal(lbl_tb_maj_vote, lbl_tb_expected) + + +# ---- define these constants here we use with pytest.mark.parametrize +# so that we can import them in .test_transforms as well +TIMEBIN_DUR_FOR_PARAMETRIZE = 0.001 +UNLABELED_LABEL = 0 +POSTPROCESS_PARAMS_ARGVALS = [ + # test case where we apply *neither* of the transforms + ( + np.asarray([0, 1, 1, 0, 2, 2, 0, 3, 3, 0, 0, 4, 4, 0, 0]), + None, + False, + np.asarray([0, 1, 1, 0, 2, 2, 0, 3, 3, 0, 0, 4, 4, 0, 0]), + ), + # test case where we apply *neither* of the transforms, and one segment 
is at end of lbl_tb + ( + np.asarray([0, 1, 1, 0, 2, 2, 0, 3, 3, 0, 0, 4, 4, 4, 4]), + None, + False, + np.asarray([0, 1, 1, 0, 2, 2, 0, 3, 3, 0, 0, 4, 4, 4, 4]), + ), + # ---- start of test cases for majority vote + # test MajorityVote does nothing when a labeled segment has all the same labels + ( + np.asarray([0, 1, 1, 0, 2, 2, 2, 2, 0, 0, 0, 0]), + None, + True, + np.asarray([0, 1, 1, 0, 2, 2, 2, 2, 0, 0, 0, 0]), + ), + # test majority vote + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1, 0, 0]), + None, + True, + # majority vote converts second segment to label "a" + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0]), + ), + # test MajorityVote works when there is no 'unlabeled' segment at start of vector + ( + np.array([1, 1, 2, 1, 0, 0, 0, 0]), + None, + True, + np.array([1, 1, 1, 1, 0, 0, 0, 0]), + ), + # test MajorityVote works when there is no 'unlabeled' segment at end of vector + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 2, 1]), + None, + True, + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1]), + ), + # test that a tie results in lowest value class winning, default behavior of scipy.stats.mode + ( + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 2, 2]), + None, + True, + np.array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1]), + ), + # test that majority vote just returns lbl_tb untouched when everything is unlabeled + ( + np.ones(4000).astype(int) * UNLABELED_LABEL, # i.e. 
all zeros, but being explicit here + None, + True, + np.ones(4000).astype(int) * UNLABELED_LABEL, + ), + # ---- start of test cases for min segment dur + # should remove a segment with dur < min_segment_dur + ( + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 0]), + 0.002, + False, + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]), + ), + # should **not** remove a segment with dur == 0.002 + ( + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0]), + 0.002, + False, + np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0]), + ), + # test min_segment_dur converts every time bin to 'unlabeled' when all segments are less than min segment dur + ( + np.array([0, 0, 0, 0, 0, 0, 1, 1, 2, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, ]), + 0.025, # notice segment dur, 25ms. Realistic value but will remove all segments in lbl_tb + False, + np.ones(36).astype(int) * UNLABELED_LABEL, # i.e. all zeros, but being explicit here + ), +] + +# now rewrite but with args in order for function call: +POSTPROCESS_PARAMS_ARGVALS = [ + argvals[:1] + (TIMEBIN_DUR_FOR_PARAMETRIZE, UNLABELED_LABEL) + argvals[1:] + for argvals in POSTPROCESS_PARAMS_ARGVALS +] + + +@pytest.mark.parametrize( + 'lbl_tb, timebin_dur, unlabeled_label, min_segment_dur, majority_vote, lbl_tb_expected', + POSTPROCESS_PARAMS_ARGVALS +) +def test_postprocess(lbl_tb, timebin_dur, unlabeled_label, min_segment_dur, majority_vote, lbl_tb_expected): + """Test that ``transforms.labeled_timebins.postprocess`` works as expected. 
+ Specifically test that we recover an expected string of labels, + as would be used to compute edit distance.""" + lbl_tb = vak.transforms.labeled_timebins.postprocess( + lbl_tb, + timebin_dur=timebin_dur, + unlabeled_label=UNLABELED_LABEL, + majority_vote=majority_vote, + min_segment_dur=min_segment_dur, + ) + + assert np.all(np.equal(lbl_tb, lbl_tb_expected)) diff --git a/tests/test_transforms/test_labeled_timebins/test_transforms.py b/tests/test_transforms/test_labeled_timebins/test_transforms.py new file mode 100644 index 000000000..23b4fb682 --- /dev/null +++ b/tests/test_transforms/test_labeled_timebins/test_transforms.py @@ -0,0 +1,249 @@ +import numpy as np +import pytest + +import vak + + +from .test_functional import ( + FROM_SEGMENTS_PARAMETRIZE_ARGVALS, + MAX_ABS_DIFF, + SPECT_FILES_TO_SKIP, + TIMEBIN_DUR_FOR_PARAMETRIZE, + POSTPROCESS_PARAMS_ARGVALS, +) + + +class TestFromSegments: + def test_init(self): + from_segments_tfm = vak.transforms.labeled_timebins.FromSegments() + assert isinstance(from_segments_tfm, vak.transforms.labeled_timebins.FromSegments) + + @pytest.mark.parametrize( + 'annot, spect_path, labelset', + FROM_SEGMENTS_PARAMETRIZE_ARGVALS, + ) + def test_call(self, annot, spect_path, labelset): + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset, True) + + spect_dict = vak.files.spect.load(spect_path) + timebins = spect_dict['t'] + + try: + lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] + except KeyError: + pytest.skip( + 'Annotation with label not in labelset, would not include in dataset' + ) + + from_segments_tfm = vak.transforms.labeled_timebins.FromSegments(unlabeled_label=labelmap['unlabeled']) + lbl_tb = from_segments_tfm( + lbls_int, + annot.seq.onsets_s, + annot.seq.offsets_s, + timebins, + ) + assert lbl_tb.shape == timebins.shape + assert all( + [lbl in lbls_int for lbl in np.unique(lbls_int)] + ) + + +class TestToLabels: + @pytest.mark.parametrize( + 'labelset', + [tup[2] for 
tup in FROM_SEGMENTS_PARAMETRIZE_ARGVALS], + ) + def test_init(self, labelset): + # Note that we add an 'unlabeled' class because post-processing transforms *require* it + # This is default, just making it explicit + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset, map_unlabeled=True) + + to_labels_tfm = vak.transforms.labeled_timebins.ToLabels( + labelmap=labelmap, + ) + assert isinstance(to_labels_tfm, vak.transforms.labeled_timebins.ToLabels) + + @pytest.mark.parametrize( + "lbl_tb, labelmap, labels_expected_int", + [ + (np.array([0, 0, 1, 1, 0, 0, 2, 2, 0, 0]), {'unlabeled': 0, 'a': 1, 'b': 2}, [1, 2]), + (np.array([0, 0, 1, 1, 0, 0, 2, 2, 0, 0]), {'unlabeled': 0, '1': 1, '2': 2}, [1, 2]), + (np.array([0, 0, 21, 21, 0, 0, 22, 22, 0, 0]), {'unlabeled': 0, '21': 21, '22': 22}, [21, 22]), + (np.array([0, 0, 11, 11, 0, 0, 12, 12, 0, 0]), {'unlabeled': 0, '11': 11, '12': 12}, [11, 12]), + ] + ) + def test_call(self, lbl_tb, labelmap, labels_expected_int): + # Note that we add an 'unlabeled' class because post-processing transforms *require* it + # This is default, just making it explicit + labelmap = vak.labels.multi_char_labels_to_single_char( + labelmap, skip=('unlabeled',) + ) + labelmap_inv = {v: k for k, v in labelmap.items()} + labels_expected = ''.join([labelmap_inv[lbl_int] for lbl_int in labels_expected_int]) + + to_labels_tfm = vak.transforms.labeled_timebins.ToLabels( + labelmap=labelmap, + ) + labels = to_labels_tfm(lbl_tb) + assert labels == labels_expected + + @pytest.mark.parametrize( + 'annot, spect_path, labelset', + FROM_SEGMENTS_PARAMETRIZE_ARGVALS, + ) + def test_call_real_data( + self, annot, spect_path, labelset + ): + """test that ``to_labels_with_postprocessing`` recovers labels from real data""" + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset) + # next line, convert all labels to single characters + # we can easily compare strings we get back with 
expected; + # this is what core.eval does + labelmap = vak.labels.multi_char_labels_to_single_char( + labelmap, skip=('unlabeled',) + ) + TIMEBINS_KEY = "t" + + if any( + str(spect_path).endswith(spect_file_to_skip) + for spect_file_to_skip in SPECT_FILES_TO_SKIP + ): + pytest.skip( + "Can't round trip segments -> lbl_tb -> segments " + "because of small silent gap durations + large time bin durations" + ) + + try: + lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] + except KeyError: + pytest.skip( + 'Annotation with label not in labelset, would not include in dataset' + ) + + timebins = vak.files.spect.load(spect_path)[TIMEBINS_KEY] + + lbl_tb = vak.transforms.labeled_timebins.from_segments( + lbls_int, + annot.seq.onsets_s, + annot.seq.offsets_s, + timebins, + unlabeled_label=labelmap["unlabeled"], + ) + + to_labels_tfm = vak.transforms.labeled_timebins.ToLabels( + labelmap=labelmap, + ) + labels = to_labels_tfm(lbl_tb) + + labelmap_multi_inv = {v: k for k, v in + labelmap.items()} + labels_expected = "".join( + [labelmap_multi_inv[lbl_int] for lbl_int in lbls_int] + ) + assert labels == labels_expected + + +class TestToSegments: + @pytest.mark.parametrize( + 'labelset', + [tup[2] for tup in FROM_SEGMENTS_PARAMETRIZE_ARGVALS], + ) + def test_init(self, labelset): + # Note that we add an 'unlabeled' class because post-processing transforms *require* it + # This is default, just making it explicit + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset, map_unlabeled=True) + + to_segments_tfm = vak.transforms.labeled_timebins.ToSegments( + labelmap=labelmap, + ) + assert isinstance(to_segments_tfm, vak.transforms.labeled_timebins.ToSegments) + + @pytest.mark.parametrize( + 'annot, spect_path, labelset', + FROM_SEGMENTS_PARAMETRIZE_ARGVALS, + ) + def test_call_real_data(self, annot, spect_path, labelset): + labelset = vak.converters.labelset_to_set(labelset) + labelmap = vak.labels.to_map(labelset) + + TIMEBINS_KEY = "t" + 
+ if any( + str(spect_path).endswith(spect_file_to_skip) + for spect_file_to_skip in SPECT_FILES_TO_SKIP + ): + pytest.skip( + "Can't round trip segments -> lbl_tb -> segments " + "because of small silent gap durations + large time bin durations" + ) + + try: + lbls_int = [labelmap[lbl] for lbl in annot.seq.labels] + except KeyError: + pytest.skip( + 'Annotation with label not in labelset, would not include in dataset' + ) + + timebins = vak.files.spect.load(spect_path)[TIMEBINS_KEY] + + lbl_tb = vak.transforms.labeled_timebins.from_segments( + lbls_int, + annot.seq.onsets_s, + annot.seq.offsets_s, + timebins, + unlabeled_label=labelmap["unlabeled"], + ) + + to_segments_tfm = vak.transforms.labeled_timebins.ToSegments( + labelmap=labelmap, + ) + + labels, onsets_s, offsets_s = to_segments_tfm( + lbl_tb, timebins + ) + + assert np.all(np.char.equal(labels, annot.seq.labels)) + assert np.all(np.abs(annot.seq.onsets_s - onsets_s) < MAX_ABS_DIFF) + assert np.all(np.abs(annot.seq.offsets_s - offsets_s) < MAX_ABS_DIFF) + + +class TestPostprocess: + @pytest.mark.parametrize( + 'min_segment_dur, majority_vote, timebin_dur', + # keep just the argvals we need to instantiate + [argvals[3:5] + (TIMEBIN_DUR_FOR_PARAMETRIZE,) for argvals in POSTPROCESS_PARAMS_ARGVALS] + ) + def test_init(self, min_segment_dur, majority_vote, timebin_dur): + # Note that we add an 'unlabeled' class + # because post-processing transforms *require* it + # This is default, just making it explicit + to_labels_tfm = vak.transforms.labeled_timebins.PostProcess( + min_segment_dur=min_segment_dur, + majority_vote=majority_vote, + timebin_dur=timebin_dur, + ) + assert isinstance(to_labels_tfm, vak.transforms.labeled_timebins.PostProcess) + + @pytest.mark.parametrize( + 'lbl_tb, timebin_dur, unlabeled_label, min_segment_dur, majority_vote, lbl_tb_expected', + POSTPROCESS_PARAMS_ARGVALS + ) + def test_call(self, lbl_tb, timebin_dur, unlabeled_label, min_segment_dur, majority_vote, lbl_tb_expected): + # Note 
that we add an 'unlabeled' class because post-processing transforms *require* it + # This is default, just making it explicit + postprocess_tfm = vak.transforms.labeled_timebins.PostProcess( + min_segment_dur=min_segment_dur, + majority_vote=majority_vote, + timebin_dur=timebin_dur, + ) + + lbl_tb = postprocess_tfm( + lbl_tb + ) + + assert np.all(np.equal(lbl_tb, lbl_tb_expected)) +