diff --git a/CHANGES.rst b/CHANGES.rst index 7428620c..1268e8b0 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -23,6 +23,10 @@ New features and enhancements * ``mdf_reader.read``: optionally, set both external schema and code table paths and external schema file (:issue:`47`, :pull:`111`) * ``cdm_mapper``: Change both columns history and report_quality during duplicate_check (:pull:`112`) * ``cdm_mapper``: optionally, set column names to be ignored while duplicate check (:pull:`115`) +* ``cdm_mapper``: optionally, set offset values for duplicate_check (:pull:`119`) +* ``cdm_mapper``: optionally, set column entries to be ignored while duplicate_check (:pull:`119`) +* ``cdm_mapper``: add both column names ``station_speed`` and ``station_course`` to default duplicate check list (:pull:`119`) +* ``cdm_mapper``: optionally, re-index data in ascending order according to the number of nulls in each row (:pull:`119`) Breaking changes ^^^^^^^^^^^^^^^^ @@ -74,6 +78,9 @@ Internal changes * ``metmetpy``: use function ``overwrite_data`` in all platform type correction functions (:pull:`89`) * rename ``data_model`` into ``imodel`` (:pull:`103`) * implement assertion tests for module operations (:pull:`104`) +* ``cdm_mapper``: put settings for duplicate check in _duplicate_settings (:pull:`119`) +* ``cdm_mapper``: use pandas.apply function instead of for loops in duplicate_check (:pull:`119`) +* adding some more duplicate checks to testing suite (:pull:`119`) Bug fixes ^^^^^^^^^ diff --git a/cdm_reader_mapper/cdm_mapper/_duplicate_settings.py b/cdm_reader_mapper/cdm_mapper/_duplicate_settings.py new file mode 100755 index 00000000..7e65af35 --- /dev/null +++ b/cdm_reader_mapper/cdm_mapper/_duplicate_settings.py @@ -0,0 +1,57 @@ +"""Settings for duplicate check.""" + +from __future__ import annotations + +from recordlinkage import Compare +from recordlinkage.compare import Numeric + +_method_kwargs = { + "left_on": "report_timestamp", + "window": 5, + "block_on": ["primary_station_id"], +} + +_compare_kwargs = { + "primary_station_id": {"method": "exact"}, + "longitude": { + "method": "numeric", + "kwargs": {"method": "step", "offset": 0.11}, + }, + "latitude": { + "method": "numeric", + "kwargs": {"method": "step", "offset": 0.11}, + }, + "report_timestamp": { + "method": "date2", + "kwargs": {"method": "gauss", "offset": 60.0}, + }, + "station_speed": { + "method": "numeric", + "kwargs": {"method": "step", "offset": 0.09}, + }, + "station_course": { + "method": "numeric", + "kwargs": {"method": "step", "offset": 0.9}, + }, +} + +_histories = { + "duplicate_status": "Added duplicate information - flag", + "duplicates": "Added duplicate information - duplicates", +} + + +class Date2(Numeric): + """Copy of ``rl.compare.Numeric`` class.""" + + pass + + +def date2(self, *args, **kwargs): + """New method for ``rl.Compare`` object using ``Date2`` object.""" + compare = Date2(*args, **kwargs) + self.add(compare) + return self + + +Compare.date2 = date2 diff --git a/cdm_reader_mapper/cdm_mapper/duplicates.py b/cdm_reader_mapper/cdm_mapper/duplicates.py index 3af6c99a..8aa8f5e2 100755 --- a/cdm_reader_mapper/cdm_mapper/duplicates.py +++ b/cdm_reader_mapper/cdm_mapper/duplicates.py @@ -3,10 +3,13 @@ from __future__ import annotations import datetime +from copy import deepcopy import numpy as np +import pandas as pd import recordlinkage as rl -from recordlinkage.compare import Numeric + +from ._duplicate_settings import Compare, _compare_kwargs, _histories, _method_kwargs def convert_series(df, conversion): @@ 
-37,52 +40,10 @@ def convert_date_to_float(date): except TypeError: df[column] = locals()[method](df[column]) + df = df.infer_objects(copy=False).fillna(9999.0) return df -class Date2(Numeric): - """Copy of ``rl.compare.Numeric`` class.""" - - pass - - -def date2(self, *args, **kwargs): - """New method for ``rl.Compare`` object using ``Date2`` object.""" - compare = Date2(*args, **kwargs) - self.add(compare) - return self - - -rl.Compare.date2 = date2 - -_method_kwargs = { - "left_on": "report_timestamp", - "window": 5, - "block_on": ["primary_station_id"], -} - -_compare_kwargs = { - "primary_station_id": {"method": "exact"}, - "longitude": { - "method": "numeric", - "kwargs": {"method": "gauss", "offset": 0.05}, - }, - "latitude": { - "method": "numeric", - "kwargs": {"method": "gauss", "offset": 0.05}, - }, - "report_timestamp": { - "method": "date2", - "kwargs": {"method": "gauss", "offset": 60.0}, - }, -} - -_histories = { - "duplicate_status": "Added duplicate information - flag", - "duplicates": "Added duplicate information - duplicates", -} - - def add_history(df, indexes): """Add duplicate information to history.""" @@ -98,25 +59,26 @@ def _datetime_now(): indexes = list(indexes) history_tstmp = _datetime_now() addition = "".join([f"; {history_tstmp}. {add}" for add in _histories.items()]) - df.loc[indexes, "history"] = df.loc[indexes, "history"].apply( - lambda x: x + addition - ) + df.loc[indexes, "history"] = df.loc[indexes, "history"] + addition return df def add_duplicates(df, dups): """Add duplicates to table.""" - def _add_dups(x): - _dups = dups.get(x.name) - if _dups is None: - return x["duplicates"] - _dups = ",".join(_dups) - return "{" + _dups + "}" + def _add_dups(row): + idx = row.name + if idx not in dups.index: + return row - df["duplicates"] = df.apply(lambda x: _add_dups(x), axis=1) + dup_idx = dups.loc[idx].to_list() + v_ = report_ids.iloc[dup_idx[0]] + v_ = sorted(v_.tolist()) + row["duplicates"] = "{" + ",".join(v_) + "}" + return row - return df + report_ids = df["report_id"] + return df.apply(lambda x: _add_dups(x), axis=1) def add_report_quality(df, indexes_bad): @@ -144,15 +106,18 @@ class DupDetect: """ def __init__(self, data, compared, method, method_kwargs, compare_kwargs): - self.data = data + self.data = data.copy() self.compared = compared self.method = method self.method_kwargs = method_kwargs self.compare_kwargs = compare_kwargs def _get_limit(self, limit): + _limit = 0.991 if limit == "default": - limit = 0.991 + return _limit + if limit is None: + return _limit return limit def _get_equal_musts(self): @@ -171,7 +136,11 @@ def total_score(self): self.score = 1 - (abs(self.compared.sum(axis=1) - pcmax) / pcmax) def get_duplicates( - self, keep="first", limit="default", equal_musts=None, overwrite=True + self, + keep="first", + limit="default", + equal_musts=None, + overwrite=True, ): """Get duplicate matches. @@ -243,44 +212,61 @@ def flag_duplicates( .. _duplicate_status: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/duplicate_status/duplicate_status.html .. 
_quality_flag: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/quality_flag/quality_flag.html """ + + def _get_similars(drop, keeps): + if drop[drop_] in keeps: + return (int(drop[drop_]), int(drop[keep_])) + + def _get_duplicates(x, last): + b = list(set(x[last].values)) + return pd.Series({"dups": b}) + + def replace_keeps_and_drops(df, keep_): + while True: + keeps = df[keep_].values + replaces = df.apply(lambda x: _get_similars(x, keeps), axis=1) + replaces = dict(replaces.dropna().values) + keys = replaces.keys() + values = replaces.values() + df[keep_] = df[keep_].replace(replaces) + if not set(keys).intersection(values): + return df + self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts) - self.result = self.data.copy() - self.result["duplicate_status"] = 0 + result = self.data.copy() + result["duplicate_status"] = 0 if not hasattr(self, "matches"): self.get_matches(limit="default", equal_musts=equal_musts) - indexes = [] - indexes_good = [] - indexes_bad = [] - duplicates = {} - - for index in self.matches.index: - if index[self.drop] in indexes_bad: - continue + indexes = self.matches.index + indexes_df = indexes.to_frame() + drop_ = indexes_df.columns[self.drop] + keep_ = indexes_df.columns[self.keep] + indexes_df = indexes_df.drop_duplicates(subset=[drop_]) - indexes += index - indexes_good.append(index[self.keep]) - indexes_bad.append(index[self.drop]) + indexes_df = replace_keeps_and_drops(indexes_df, keep_) - report_id_drop = self.result.loc[index[self.drop], "report_id"] - report_id_keep = self.result.loc[index[self.keep], "report_id"] - - if index[self.drop] not in duplicates.keys(): - duplicates[index[self.drop]] = [report_id_keep] - else: - duplicates[index[self.drop]].append(report_id_keep) - - if index[self.keep] not in duplicates.keys(): - duplicates[index[self.keep]] = [report_id_drop] - else: - duplicates[index[self.keep]].append(report_id_drop) - - self.result.loc[indexes_good, "duplicate_status"] = 1 - self.result.loc[indexes_bad, "duplicate_status"] = 3 + dup_keep = indexes_df.groupby(indexes_df[keep_]).apply( + lambda x: _get_duplicates(x, drop_), + include_groups=False, + ) + dup_drop = indexes_df.groupby(indexes_df[drop_]).apply( + lambda x: _get_duplicates(x, keep_), + include_groups=False, + ) + duplicates = pd.concat([dup_keep, dup_drop]) + + indexes_good = indexes_df[keep_].values.tolist() + indexes_bad = indexes_df[drop_].values.tolist() + indexes = indexes_good + indexes_bad + result.loc[indexes_good, "duplicate_status"] = 1 + result.loc[indexes_bad, "duplicate_status"] = 3 + result = add_report_quality(result, indexes_bad=indexes_bad) + result = add_history(result, indexes) + result = result.sort_index(ascending=True) + self.result = add_duplicates(result, duplicates) + self.data = self.data.sort_index(ascending=True) - self.result = add_report_quality(self.result, indexes_bad=indexes_bad) - self.result = add_duplicates(self.result, duplicates) - self.result = add_history(self.result, indexes) return self.result def remove_duplicates( @@ -307,9 +293,11 @@ def remove_duplicates( Input DataFrame without duplicates. 
""" self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts) - self.result = self.data.copy() - drops = [index[self.drop] for index in self.matches.index] - self.result = self.result.drop(drops).reset_index(drop=True) + result = self.data.copy() + drops = self.matches.index.get_level_values(self.drop) + result = result.drop(drops) + self.result = result.sort_index(ascending=True) + self.data = self.data.sort_index(ascending=True) return self.result @@ -326,7 +314,7 @@ def set_comparer(compare_dict): recordlinkage.Compare object: recordlinkage.Compare object with added methods. """ - comparer = rl.Compare() + comparer = Compare() setattr(comparer, "conversion", {}) for column, c_dict in compare_dict.items(): try: @@ -351,6 +339,7 @@ def set_comparer(compare_dict): comparer.conversion[column] = "datetime64[ns]" if method == "date2": comparer.conversion[column] = "convert_date_to_float" + return comparer @@ -373,6 +362,40 @@ def remove_ignores(dic, columns): return new_dict +def change_offsets(dic, dic_o): + """Change offsets in compare dictionary.""" + for key in dic.keys(): + if key not in dic_o.keys(): + continue + dic[key]["kwargs"]["offset"] = dic_o[key] + return dic + + +class Comparer: + """Class to compare DataFrame with recordlinkage Comparer.""" + + def __init__( + self, + data, + method, + method_kwargs, + compare_kwargs, + pairs_df=None, + convert_data=False, + ): + indexer = getattr(rl.index, method)(**method_kwargs) + comparer = set_comparer(compare_kwargs) + if convert_data is True: + data_ = convert_series(data, comparer.conversion) + else: + data_ = data.copy() + if pairs_df is None: + pairs_df = [data_] + pairs = indexer.index(*pairs_df) + self.compared = comparer.compute(pairs, data_) + self.data = data_ + + def duplicate_check( data, method="SortedNeighbourhood", @@ -380,6 +403,9 @@ def duplicate_check( compare_kwargs=None, table_name=None, ignore_columns=None, + ignore_entries=None, + offsets=None, + reindex_by_null=True, ): """Duplicate check. @@ -399,25 +425,86 @@ def duplicate_check( table_name: str, optional Name of the CDM table to be selected from data. ignore_columns: str or list, optional - Name of data columns to ignore for duplicate check. + Name of data columns to be ignored for duplicate check. + ignore_entries: dict, optional + Key: Column name + Value: value to be ignored + E.g. offsets={"station_speed": null} + offsets: dict, optional + Change offsets for recordlinkage Compare object. + Key: Column name + Value: new offset + E.g. offsets={"latitude": 0.1} + reindex_by_null: bool, optional + If True data is re-indexed in ascending order according to the number of nulls in each row. 
Returns ------- DupDetect object """ + + def _count_nulls(row): + return (row == "null").sum() + + data = data.reset_index(drop=True) + + if reindex_by_null is True: + nulls = data.apply(lambda x: _count_nulls(x), axis=1) + indexes_ = list(zip(*sorted(zip(nulls.values, nulls.index)))) + data = data.reindex(indexes_[1]) + if table_name: data = data[table_name] if not method_kwargs: - method_kwargs = _method_kwargs + method_kwargs = deepcopy(_method_kwargs) if not compare_kwargs: - compare_kwargs = _compare_kwargs + compare_kwargs = deepcopy(_compare_kwargs) if ignore_columns: method_kwargs = remove_ignores(method_kwargs, ignore_columns) compare_kwargs = remove_ignores(compare_kwargs, ignore_columns) + if offsets: + compare_kwargs = change_offsets(compare_kwargs, offsets) + + Compared_ = Comparer( + data=data, + method=method, + method_kwargs=method_kwargs, + compare_kwargs=compare_kwargs, + convert_data=True, + ) + compared = Compared_.compared + data_ = Compared_.data + + if ignore_entries is None: + return DupDetect(data, compared, method, method_kwargs, compare_kwargs) + + compared = [compared] + + for column_, entry_ in ignore_entries.items(): + if isinstance(entry_, str): + entry_ = [entry_] + entries = data[column_].isin(entry_) + + d1 = data.mask(entries).dropna() + d2 = data.where(entries).dropna() + + if d1.empty: + continue + if d2.empty: + continue + + method_kwargs_ = remove_ignores(method_kwargs, column_) + compare_kwargs_ = remove_ignores(compare_kwargs, column_) + + compared_ = Comparer( + data=data_, + method=method, + method_kwargs=method_kwargs_, + compare_kwargs=compare_kwargs_, + pairs_df=[d2, d1], + ).compared + compared_[list(ignore_entries.keys())] = 1 + compared.append(compared_) - indexer = getattr(rl.index, method)(**method_kwargs) - pairs = indexer.index(data) - comparer = set_comparer(compare_kwargs) - data_ = convert_series(data, comparer.conversion) - compared = comparer.compute(pairs, data_) + compared = pd.concat(compared) return DupDetect(data, compared, method, method_kwargs, compare_kwargs) diff --git a/tests/_duplicates.py b/tests/_duplicates.py new file mode 100755 index 00000000..87b135f8 --- /dev/null +++ b/tests/_duplicates.py @@ -0,0 +1,370 @@ +from __future__ import annotations + +from cdm_reader_mapper.cdm_mapper import read_tables + +from ._results import result_data + + +def _manipulate_header(df): + # Duplicate : Different report_id's + # Failure in data set; + # each report needs a specific report_id + df.loc[5] = df.loc[4] + df.loc[5, "report_id"] = "ICOADS-302-N688EY" + df.loc[5, "report_quality"] = 2 + + # No Duplicate: Lat and Lon values differ to much + # valid is .5 degrees + df.loc[6] = df.loc[4] + df.loc[6, "report_id"] = "ICOADS-302-N688EZ" + df.loc[6, "latitude"] = -65.80 + df.loc[6, "longitude"] = 21.20 + df.loc[6, "report_quality"] = 2 + + # Duplicate: report timestamp differs no enough + # valid is 60 seconds + df.loc[7] = df.loc[1] + df.loc[7, "report_id"] = "ICOADS-302-N688DT" + df.loc[7, "report_timestamp"] = "2022-02-01 00:01:00" + df.loc[7, "report_quality"] = 2 + + # No Duplicate: report timestamp differs to much + # valid is 60 seconds + df.loc[8] = df.loc[1] + df.loc[8, "report_id"] = "ICOADS-302-N688DU" + df.loc[8, "report_timestamp"] = "2022-02-02 00:00:00" + df.loc[8, "report_quality"] = 2 + + # Duplicate : Different report_id's + # Failure in data set + df.loc[9] = df.loc[2] + df.loc[9, "report_id"] = "ICOADS-302-N688DW" + df.loc[9, "report_quality"] = 2 + + # Duplicate : Different report_id's + # Failure in data set + 
# each report needs a specific report_id + df.loc[10] = df.loc[3] + df.loc[10, "report_id"] = "ICOADS-302-N688EF" + df.loc[10, "latitude"] = 66.00 + df.loc[10, "longitude"] = 8.50 + df.loc[10, "report_quality"] = 2 + + # Duplicate: Lat and Lon values differ not enough + # valid is .5 degrees + df.loc[11] = df.loc[3] + df.loc[11, "report_id"] = "ICOADS-302-N688EE" + df.loc[11, "latitude"] = 66.05 + df.loc[11, "longitude"] = 8.15 + df.loc[11, "report_quality"] = 2 + + # No Duplicate: primary_station_id differs + df.loc[12] = df.loc[3] + df.loc[12, "report_id"] = "ICOADS-302-N688ED" + df.loc[12, "primary_station_id"] = "MASKSTIP" + df.loc[12, "report_quality"] = 2 + + # Duplicate: Lat and Lon values differ not enough + # valid is .5 degrees + df.loc[13] = df.loc[3] + df.loc[13, "report_id"] = "ICOADS-302-N688EC" + df.loc[13, "latitude"] = 65.95 + df.loc[13, "longitude"] = 8.05 + df.loc[13, "report_quality"] = 2 + + # Duplicate: ignore primary_station_id SHIP + df.loc[14] = df.loc[3] + df.loc[14, "report_id"] = "ICOADS-302-N688EG" + df.loc[14, "primary_station_id"] = "SHIP" + df.loc[14, "report_quality"] = 2 + + # No Duplicate: Lat and Lon values differ to much + # valid is .5 degrees + df.loc[15] = df.loc[4] + df.loc[15, "report_id"] = "ICOADS-302-N688EV" + df.loc[15, "latitude"] = 65.60 + df.loc[15, "longitude"] = -21.40 + df.loc[15, "report_quality"] = 2 + + # Duplicate: Lat and Lon values differ not enough + # valid is .5 degrees + df.loc[16] = df.loc[4] + df.loc[16, "report_id"] = "ICOADS-302-N688EW" + df.loc[16, "latitude"] = 65.90 + df.loc[16, "longitude"] = -21.10 + df.loc[16, "report_quality"] = 2 + + # No Duplicate: + df.loc[17] = df.loc[1] + df.loc[17, "report_id"] = "ICOADS-302-N688EK" + df.loc[17, "station_course"] = 316.0 + + # No Duplicate: + df.loc[18] = df.loc[1] + df.loc[18, "report_id"] = "ICOADS-302-N688EL" + df.loc[18, "station_speed"] = 4.0 + + # Duplicate: + df.loc[19] = df.loc[1] + df.loc[19, "report_id"] = "ICOADS-302-N688EM" + df.loc[19, "station_course"] = "null" + + # Duplicate: + df.loc[20] = df.loc[1] + df.loc[20, "report_id"] = "ICOADS-302-N688EN" + df.loc[20, "station_speed"] = "null" + return df + + +def _get_test_data(imodel): + exp_name = f"expected_{imodel}" + exp_data = getattr(result_data, exp_name) + data_path = exp_data.get("cdm_table") + return read_tables( + data_path, + tb_id=f"{imodel}*", + cdm_subset="header", + ) + + +exp1 = { + "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 0, 0], + "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1], + "duplicates": [ + "null", + "{ICOADS-302-N688DT}", + "{ICOADS-302-N688DW}", + "{ICOADS-302-N688EC,ICOADS-302-N688EE}", + "{ICOADS-302-N688EW,ICOADS-302-N688EY}", + "{ICOADS-302-N688EI}", + "null", + "{ICOADS-302-N688DS}", + "null", + "{ICOADS-302-N688DV}", + "null", + "{ICOADS-302-N688EH}", + "null", + "{ICOADS-302-N688EH}", + "null", + "null", + "{ICOADS-302-N688EI}", + "null", + "null", + "null", + "null", + ], +} + +exp2 = { + "duplicate_status": [0, 1, 1, 3, 1, 3, 0, 3, 0, 3, 0, 3, 1, 3, 3, 0, 3, 0, 0, 0, 0], + "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1], + "duplicates": [ + "null", + "{ICOADS-302-N688DT}", + "{ICOADS-302-N688DW}", + "{ICOADS-302-N688ED}", + "{ICOADS-302-N688EW,ICOADS-302-N688EY}", + "{ICOADS-302-N688EI}", + "null", + "{ICOADS-302-N688DS}", + "null", + "{ICOADS-302-N688DV}", + "null", + "{ICOADS-302-N688ED}", + "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EG,ICOADS-302-N688EH}", + 
"{ICOADS-302-N688ED}", + "{ICOADS-302-N688ED}", + "null", + "{ICOADS-302-N688EI}", + "null", + "null", + "null", + "null", + ], +} + +exp3 = { + "duplicate_status": [1, 3, 3, 3, 3, 3, 3, 3, 0, 3, 3, 3, 0, 3, 0, 3, 3, 3, 3, 3, 3], + "report_quality": [1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1], + "duplicates": [ + "{ICOADS-302-N688DS,ICOADS-302-N688DT,ICOADS-302-N688DV,ICOADS-302-N688DW,ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EF,ICOADS-302-N688EH,ICOADS-302-N688EI,ICOADS-302-N688EK,ICOADS-302-N688EL,ICOADS-302-N688EM,ICOADS-302-N688EN,ICOADS-302-N688EV,ICOADS-302-N688EW,ICOADS-302-N688EY,ICOADS-302-N688EZ}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "null", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "null", + "{ICOADS-302-N688DR}", + "null", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + "{ICOADS-302-N688DR}", + ], +} + +exp4 = { + "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 3, 3, 3, 0, 3, 0, 0, 0, 0], + "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1], + "duplicates": [ + "null", + "{ICOADS-302-N688DT}", + "{ICOADS-302-N688DW}", + "{ICOADS-302-N688EC,ICOADS-302-N688ED,ICOADS-302-N688EE,ICOADS-302-N688EG}", + "{ICOADS-302-N688EW,ICOADS-302-N688EY}", + "{ICOADS-302-N688EI}", + "null", + "{ICOADS-302-N688DS}", + "null", + "{ICOADS-302-N688DV}", + "null", + "{ICOADS-302-N688EH}", + "{ICOADS-302-N688EH}", + "{ICOADS-302-N688EH}", + "{ICOADS-302-N688EH}", + "null", + "{ICOADS-302-N688EI}", + "null", + "null", + "null", + "null", + ], +} + +exp5 = { + "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 3, 3, 0, 3, 0, 3, 3, 0, 0, 0, 0], + "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1], + "duplicates": [ + "null", + "{ICOADS-302-N688DT}", + "{ICOADS-302-N688DW}", + "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EF}", + "{ICOADS-302-N688EV,ICOADS-302-N688EW,ICOADS-302-N688EY}", + "{ICOADS-302-N688EI}", + "null", + "{ICOADS-302-N688DS}", + "null", + "{ICOADS-302-N688DV}", + "{ICOADS-302-N688EH}", + "{ICOADS-302-N688EH}", + "null", + "{ICOADS-302-N688EH}", + "null", + "{ICOADS-302-N688EI}", + "{ICOADS-302-N688EI}", + "null", + "null", + "null", + "null", + ], +} + +exp6 = { + "duplicate_status": [0, 0, 1, 1, 1, 3, 0, 0, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 0, 0], + "report_quality": [1, 1, 0, 1, 1, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1], + "duplicates": [ + "null", + "null", + "{ICOADS-302-N688DW}", + "{ICOADS-302-N688EC,ICOADS-302-N688EE}", + "{ICOADS-302-N688EW,ICOADS-302-N688EY}", + "{ICOADS-302-N688EI}", + "null", + "null", + "null", + "{ICOADS-302-N688DV}", + "null", + "{ICOADS-302-N688EH}", + "null", + "{ICOADS-302-N688EH}", + "null", + "null", + "{ICOADS-302-N688EI}", + "null", + "null", + "null", + "null", + ], +} + +exp7 = { + "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 3, 3], + "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1], + "duplicates": [ + "null", + "{ICOADS-302-N688DT,ICOADS-302-N688EM,ICOADS-302-N688EN}", + "{ICOADS-302-N688DW}", + "{ICOADS-302-N688EC,ICOADS-302-N688EE}", + "{ICOADS-302-N688EW,ICOADS-302-N688EY}", + "{ICOADS-302-N688EI}", + "null", + "{ICOADS-302-N688DS}", + "null", + "{ICOADS-302-N688DV}", + "null", + "{ICOADS-302-N688EH}", + "null", + 
"{ICOADS-302-N688EH}", + "null", + "null", + "{ICOADS-302-N688EI}", + "null", + "null", + "{ICOADS-302-N688DS}", + "{ICOADS-302-N688DS}", + ], +} + +exp8 = { + "duplicate_status": [0, 1, 1, 3, 1, 3, 0, 3, 0, 3, 0, 3, 1, 3, 3, 0, 3, 0, 0, 3, 3], + "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1], + "duplicates": [ + "null", + "{ICOADS-302-N688DT,ICOADS-302-N688EM,ICOADS-302-N688EN}", + "{ICOADS-302-N688DW}", + "{ICOADS-302-N688ED}", + "{ICOADS-302-N688EW,ICOADS-302-N688EY}", + "{ICOADS-302-N688EI}", + "null", + "{ICOADS-302-N688DS}", + "null", + "{ICOADS-302-N688DV}", + "null", + "{ICOADS-302-N688ED}", + "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EG,ICOADS-302-N688EH}", + "{ICOADS-302-N688ED}", + "{ICOADS-302-N688ED}", + "null", + "{ICOADS-302-N688EI}", + "null", + "null", + "{ICOADS-302-N688DS}", + "{ICOADS-302-N688DS}", + ], +} + +method_kwargs_ = { + "left_on": "report_timestamp", + "window": 7, + "block_on": ["primary_station_id"], +} + +compare_kwargs_ = { + "primary_station_id": {"method": "exact"}, + "report_timestamp": { + "method": "date2", + "kwargs": {"method": "gauss", "offset": 60.0}, + }, +} +df_icoads = _get_test_data("icoads_r302_d792") +df_icoads = _manipulate_header(df_icoads) + +df_craid = _get_test_data("craid") diff --git a/tests/test_duplicates.py b/tests/test_duplicates.py index d8be9c2a..1ba254fd 100755 --- a/tests/test_duplicates.py +++ b/tests/test_duplicates.py @@ -4,135 +4,117 @@ from numpy.testing import assert_array_equal from pandas.testing import assert_frame_equal -from cdm_reader_mapper.cdm_mapper import duplicate_check, read_tables - -from ._results import result_data - - -def _manipulate_header(df): - # Duplicate : Different report_id's - # Failure in data set; - # each report needs a specific report_id - df.loc[5] = df.loc[4] - df.loc[5, "report_id"] = "ICOADS-302-N688EY" - df.loc[5, "report_quality"] = 2 - - # No Duplicate: Lat and Lon values differ to much - # valid is .5 degrees - df.loc[6] = df.loc[4] - df.loc[5, "report_id"] = "ICOADS-302-N688EY" - df.loc[6, "latitude"] = -65.80 - df.loc[6, "longitude"] = 21.20 - df.loc[6, "report_quality"] = 2 - - # Duplicate: report timestamp differs no enough - # valid is 60 seconds - df.loc[7] = df.loc[1] - df.loc[7, "report_id"] = "ICOADS-302-N688DT" - df.loc[7, "report_timestamp"] = "2022-02-01 00:01:00" - df.loc[7, "report_quality"] = 2 - - # No Duplicate: report timestamp differs to much - # valid is 60 seconds - df.loc[8] = df.loc[1] - df.loc[8, "report_id"] = "ICOADS-302-N688DU" - df.loc[8, "report_timestamp"] = "2022-02-02 00:00:00" - df.loc[8, "report_quality"] = 2 - - # Duplicate : Different report_id's - # Failure in data set - df.loc[9] = df.loc[2] - df.loc[9, "report_id"] = "ICOADS-302-N688DW" - df.loc[9, "report_quality"] = 2 - - # Duplicate : Different report_id's - # Failure in data set - # each report needs a specific report_id - df.loc[10] = df.loc[3] - df.loc[10, "report_id"] = "ICOADS-302-N688EF" - df.loc[10, "latitude"] = 66.00 - df.loc[10, "longitude"] = 8.50 - df.loc[10, "report_quality"] = 2 - - # Duplicate: Lat and Lon values differ not enough - # valid is .5 degrees - df.loc[11] = df.loc[3] - df.loc[11, "report_id"] = "ICOADS-302-N688EE" - df.loc[11, "latitude"] = 66.05 - df.loc[11, "longitude"] = 8.15 - df.loc[11, "report_quality"] = 2 - - # No Duplicate: primary_station_id differs - df.loc[12] = df.loc[3] - df.loc[12, "report_id"] = "ICOADS-302-N688ED" - df.loc[12, "primary_station_id"] = "MASKSTIP" - df.loc[12, "report_quality"] = 2 - - # 
Duplicate: Lat and Lon values differ not enough - # valid is .5 degrees - df.loc[13] = df.loc[3] - df.loc[13, "report_id"] = "ICOADS-302-N688EC" - df.loc[13, "latitude"] = 65.95 - df.loc[13, "longitude"] = 8.05 - df.loc[13, "report_quality"] = 2 - return df - - -def _get_test_data(imodel): - exp_name = f"expected_{imodel}" - exp_data = getattr(result_data, exp_name) - data_path = exp_data.get("cdm_table") - return read_tables( - data_path, - tb_id=f"{imodel}*", - cdm_subset="header", +from cdm_reader_mapper.cdm_mapper import duplicate_check + +from ._duplicates import ( + compare_kwargs_, + df_craid, + df_icoads, + exp1, + exp2, + exp3, + exp4, + exp5, + exp6, + exp7, + exp8, + method_kwargs_, +) + + +@pytest.mark.parametrize( + "method, method_kwargs, compare_kwargs, ignore_columns, ignore_entries, offsets, expected", + [ + (None, None, None, None, None, None, exp1), + ( + None, + None, + None, + None, + {"primary_station_id": ["SHIP", "MASKSTID"]}, + None, + exp2, + ), + ( + None, + None, + None, + None, + {"station_speed": "null", "station_course": "null"}, + None, + exp7, + ), + ( + None, + None, + None, + None, + { + "primary_station_id": ["SHIP", "MASKSTID"], + "station_speed": "null", + "station_course": "null", + }, + None, + exp8, + ), + (None, method_kwargs_, None, None, None, None, exp1), + (None, None, compare_kwargs_, None, None, None, exp3), + (None, None, None, ["primary_station_id"], None, None, exp4), + ( + None, + None, + None, + None, + None, + {"latitude": 1.0, "longitude": 1.0, "report_timestamp": 360}, + exp5, + ), + ("Block", {"left_on": "report_timestamp"}, None, None, None, None, exp6), + ], +) +def test_duplicates_flag( + method, + method_kwargs, + compare_kwargs, + ignore_columns, + ignore_entries, + offsets, + expected, +): + if method is None: + method = "SortedNeighbourhood" + DupDetect = duplicate_check( + df_icoads, + method=method, + method_kwargs=method_kwargs, + compare_kwargs=compare_kwargs, + ignore_columns=ignore_columns, + ignore_entries=ignore_entries, + offsets=offsets, ) - - -df = _get_test_data("icoads_r302_d792") -df = _manipulate_header(df) -DupDetect = duplicate_check(df) - - -def test_duplicates_flag(): DupDetect.flag_duplicates() result = DupDetect.result - assert_array_equal( - result["duplicate_status"], [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 0, 3] - ) - assert_array_equal( - result["report_quality"], [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1] - ) - assert_array_equal( - result["duplicates"], - [ - "null", - "{ICOADS-302-N688DT}", - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688EE,ICOADS-302-N688EC}", - "{ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - "null", - "{ICOADS-302-N688DS}", - "null", - "{ICOADS-302-N688DV}", - "null", - "{ICOADS-302-N688EH}", - "null", - "{ICOADS-302-N688EH}", - ], - ) + assert_array_equal(result["duplicate_status"], expected["duplicate_status"]) + assert_array_equal(result["report_quality"], expected["report_quality"]) + assert_array_equal(result["duplicates"], expected["duplicates"]) def test_duplicates_remove(): + DupDetect = duplicate_check( + df_icoads, + ignore_entries={ + "primary_station_id": ["SHIP", "MASKSTID"], + "station_speed": "null", + "station_course": "null", + }, + ) DupDetect.remove_duplicates() - expected = DupDetect.data.iloc[[0, 1, 2, 3, 4, 6, 8, 10, 12]].reset_index(drop=True) + expected = DupDetect.data.iloc[[0, 1, 2, 4, 6, 8, 10, 12, 15, 17, 18]] assert_frame_equal(expected, DupDetect.result) def test_duplicates_craid(): - df = _get_test_data("craid") - DupDetect = duplicate_check(df, 
ignore_columns="primary_station_id") + DupDetect = duplicate_check(df_craid, ignore_columns="primary_station_id") DupDetect.flag_duplicates() assert_array_equal(DupDetect.result["duplicate_status"], [0] * 10) assert_array_equal(DupDetect.result["report_quality"], [2] * 10)
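Usage sketch (illustrative, not part of the patch): the keywords added to ``duplicate_check`` in this change set (``offsets``, ``ignore_entries``, ``reindex_by_null``) can be combined as shown below. The input path and table id are hypothetical placeholders, and the tolerance values are examples taken from the new tests rather than recommended defaults::

    from cdm_reader_mapper.cdm_mapper import duplicate_check, read_tables

    # Hypothetical input: a directory of CDM header tables (adjust path and ids).
    header = read_tables(
        "./cdm_tables",            # placeholder path
        tb_id="icoads_r302_d792*",
        cdm_subset="header",
    )

    DupDetect = duplicate_check(
        header,
        # widen the default latitude/longitude tolerances from _duplicate_settings
        offsets={"latitude": 0.2, "longitude": 0.2},
        # treat generic station ids and "null" speed/course entries as wildcards
        ignore_entries={
            "primary_station_id": ["SHIP", "MASKSTID"],
            "station_speed": "null",
            "station_course": "null",
        },
        # sort rows by their number of "null" entries before pairing (default)
        reindex_by_null=True,
    )

    flagged = DupDetect.flag_duplicates()    # fills duplicate_status, duplicates, history
    deduped = DupDetect.remove_duplicates()  # drops the rows identified as duplicates instead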