diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3956db610..b5d844a32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@ Please note that compatibility for 0.x releases (software or repositories) isn't
 
 _When adding new entries to the changelog, please include issue/PR numbers wherever possible._
 
+## 0.8.0 (UNRELEASED)
+
+ * Support for detecting features that have changed only slightly during a re-import from a data source without a primary key, and re-importing them with the same primary key as last time, so that they show up as edits rather than as deletes plus inserts. [#212](https://github.com/koordinates/sno/issues/212)
+
 ## 0.7.1
 
 #### JSON syntax-highlighting fix
diff --git a/sno/base_dataset.py b/sno/base_dataset.py
index 06ecd10f0..a6603f9d9 100644
--- a/sno/base_dataset.py
+++ b/sno/base_dataset.py
@@ -178,13 +178,17 @@ def geom_column_name(self):
     def features(self):
         """
         Yields a dict for every feature. Dicts contain key-value pairs for each feature property,
-        and geometries use sno.geometry.Geometry objects, as in the following example:
+        and geometries use sno.geometry.Geometry objects, as in the following example::
+
         {
             "fid": 123,
             "geom": Geometry(b"..."),
             "name": "..."
             "last-modified": "..."
         }
+
+        Each dict is guaranteed to iterate in the same order as the columns are ordered in the schema,
+        so that zip(schema.columns, feature.values()) matches each field with its column.
         """
         for blob in self.feature_blobs():
             yield self.get_feature(path=blob.name, data=memoryview(blob))
diff --git a/sno/fast_import.py b/sno/fast_import.py
index ef74b041f..3aa52d8de 100644
--- a/sno/fast_import.py
+++ b/sno/fast_import.py
@@ -82,7 +82,7 @@ def fast_import_tables(
     )
 
     # Add primary keys if needed.
-    sources = PkGeneratingImportSource.wrap_if_needed(sources, repo)
+    sources = PkGeneratingImportSource.wrap_sources_if_needed(sources, repo)
 
     cmd = [
         "git",
diff --git a/sno/import_source.py b/sno/import_source.py
index 5475a55a2..c8f439a2f 100644
--- a/sno/import_source.py
+++ b/sno/import_source.py
@@ -130,13 +130,17 @@ def has_geometry(self):
     def features(self):
         """
         Yields a dict for every feature. Dicts contain key-value pairs for each feature property,
-        and geometries use sno.geometry.Geometry objects, as in the following example:
+        and geometries use sno.geometry.Geometry objects, as in the following example::
+
         {
             "fid": 123,
             "geom": Geometry(b"..."),
             "name": "..."
             "last-modified": "..."
         }
+
+        Each dict is guaranteed to iterate in the same order as the columns are ordered in the schema,
+        so that zip(schema.columns, feature.values()) matches each field with its column.
         """
         raise NotImplementedError()
diff --git a/sno/init.py b/sno/init.py
index 16987fb17..230e04b5c 100644
--- a/sno/init.py
+++ b/sno/init.py
@@ -132,6 +132,17 @@ def _add_datasets_to_working_copy(repo, *datasets, replace_existing=False):
     is_flag=True,
     help="Replace existing dataset(s) of the same name.",
 )
+@click.option(
+    "--similarity-detection-limit",
+    hidden=True,
+    type=click.INT,
+    default=50,
+    help=(
+        "When replacing an existing dataset where primary keys are auto-generated: the maximum number of unmatched "
+        "features to search through for similar features, so that primary keys can be reassigned for features that "
+        "are similar but have had minor edits. Zero means that no similarity detection is performed. (Advanced users only)"
+    ),
+)
 @click.option(
     "--allow-empty",
     is_flag=True,
@@ -167,6 +178,7 @@ def import_table(
     tables,
     table_info,
     replace_existing,
+    similarity_detection_limit,
     allow_empty,
     max_delta_depth,
     do_checkout,
@@ -247,8 +259,11 @@ def import_table(
             # will result in a new schema object, and thus a new blob for every feature.
             # Note that alignment works better if we add the generated-pk-column first (when needed),
             # if one schema has this and the other lacks it they will be harder to align.
-            import_source = PkGeneratingImportSource.wrap_if_needed(
-                import_source, repo
+            import_source = PkGeneratingImportSource.wrap_source_if_needed(
+                import_source,
+                repo,
+                dest_path=dest_path,
+                similarity_detection_limit=similarity_detection_limit,
             )
             import_source.schema = existing_ds.schema.align_to_self(
                 import_source.schema
diff --git a/sno/pk_generation.py b/sno/pk_generation.py
index fcb4ca31e..1b38790e5 100644
--- a/sno/pk_generation.py
+++ b/sno/pk_generation.py
@@ -1,9 +1,12 @@
-from collections.abc import Iterable
+from itertools import zip_longest
+import pygit2
+import subprocess
+from .dataset2 import Dataset2
 from .import_source import ImportSource
+from .exceptions import NotYetImplemented, SubprocessError
 from .serialise_util import (
     json_pack,
-    json_unpack,
 )
 from .schema import ColumnSchema, Schema
@@ -47,9 +50,22 @@ class PkGeneratingImportSource(ImportSource):
         }
     }
 
+    During a re-import, if similarity_detection_limit is set to some X > 0, and the import results in a number
+    of inserts + deletes [1] that is less than X, then these inserts and deletes are searched to see if we can
+    find some inserts that are similar (but not identical) to the deletes. Each such new feature is reassigned
+    the primary key of its deleted counterpart, connecting the two features together so that the insert + delete
+    shows up in the log as a single edit.
+    Currently, two features are considered "similar" if they differ in only one field and each has exactly one
+    such counterpart. For example, a new feature that matches a deleted feature in every field except its name
+    is given that deleted feature's primary key, so the change shows up as an edit rather than a delete + insert.
+    Searching these inserts and deletes is computationally costly - every insert must be checked against every
+    delete, so each doubling of the limit can result in four times the processing time.
+
+    [1] An insert is a feature which is not present in the previous import and so could not be reassigned an existing
+    primary key, and a delete is a feature that was present in the previous import but not in the current one.
     """
 
-    GENERATED_PKS_PATH = ".sno-dataset/meta/generated-pks.json"
+    GENERATED_PKS_ITEM = "generated-pks.json"
+    GENERATED_PKS_PATH = ".sno-dataset/meta/" + GENERATED_PKS_ITEM
 
     DEFAULT_PK_COL = {
         "id": ColumnSchema.new_id(),
@@ -60,45 +76,49 @@ class PkGeneratingImportSource(ImportSource):
     }
 
     @classmethod
-    def wrap_if_needed(cls, source_or_sources, repo):
-        """
-        Wraps an ImportSource in a PkGeneratingImportSource if the original data lacks a primary key.
-        If multiple ImportSources are supplied, wraps those that lack a primary key and returns a new list.
-        """
-        if isinstance(source_or_sources, Iterable):
-            sources = source_or_sources
-            return [cls.wrap_if_needed(s, repo) for s in sources]
-
-        source = source_or_sources
-        if not source.schema.pk_columns:
-            return PkGeneratingImportSource(source, repo)
-        else:
-            return source
+    def wrap_source_if_needed(cls, source, repo, **kwargs):
+        """Wraps an ImportSource in a PkGeneratingImportSource if the original data lacks a primary key."""
+        return (
+            source
+            if source.schema.pk_columns
+            else PkGeneratingImportSource(source, repo, **kwargs)
+        )
+
+    @classmethod
+    def wrap_sources_if_needed(cls, sources, repo, **kwargs):
+        """Wraps any of the given ImportSources that lack a primary key and returns the result as a new list."""
+        return [cls.wrap_source_if_needed(source, repo, **kwargs) for source in sources]
 
-    def __init__(self, delegate, repo):
+    def __init__(self, delegate, repo, *, dest_path=None, similarity_detection_limit=0):
         self.delegate = delegate
+        if dest_path is not None:
+            self.dest_path = dest_path
+
+        # Similarity detection limit - the maximum number of (inserts + deletes) we will look through
+        # to see if some of them can be paired up to make edits.
+        self.similarity_detection_limit = similarity_detection_limit
+
         self.load_data_from_repo(repo)
 
     def load_data_from_repo(self, repo):
-        tree = repo.head_tree
-        generated_pks_blob = None
+        if repo.version != 2:
+            raise NotYetImplemented("PK generation only supported for dataset 2")
 
-        if tree is not None:
-            try:
-                generated_pks_blob = tree / self.dest_path / self.GENERATED_PKS_PATH
-            except KeyError:
-                pass
+        self.prev_dest_tree = self._prev_import_dest_tree(repo)
 
-        if not generated_pks_blob:
+        if not self.prev_dest_tree:
             self.pk_col = self.DEFAULT_PK_COL
             self.primary_key = self.pk_col["name"]
             self.pk_to_hash = {}
-            self.hash_to_pks = {}
-            self.hash_to_unassigned_pks = {}
-            self.next_new_pk = 1
+            self.first_new_pk = 1
+
+            self.similarity_detection_limit = 0
+            self.similarity_detection_insert_limit = 0
             return
 
-        data = json_unpack(generated_pks_blob.data)
+        self.prev_dest_dataset = Dataset2(self.prev_dest_tree, self.dest_path)
+
+        data = self.prev_dest_dataset.get_meta_item(self.GENERATED_PKS_ITEM)
 
         self.pk_col = data["primaryKeySchema"]
         self.primary_key = self.pk_col["name"]
@@ -109,16 +129,47 @@ def load_data_from_repo(self, repo):
             for pk, feature_hash in data["generatedPrimaryKeys"].items()
         }
 
-        # Hash of feature contents -> primary key(s) of imported feature(s), for every feature ever imported.
-        self.hash_to_pks = self._invert_pk_map(self.pk_to_hash)
-
-        # Subset of hash_to_pks - only contains primary keys that have not yet been assigned during the current import.
-        # Meaning, if we need a primary key for a feature, we should first check this dict to find a historical one that
-        # hasn't yet been assigned to a feature during this import.
-        self.hash_to_unassigned_pks = self._invert_pk_map(self.pk_to_hash)
-
-        # Next primary key to use if we can't find a historical but unassigned one in hash_to_unassigned_pks.
-        self.next_new_pk = max(self.pk_to_hash) + 1 if self.pk_to_hash else 1
+        # First primary key to use if we can't find a historical but unassigned one.
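+        # Illustrative example: if PKs 1..100 were generated by earlier imports, then
+        # max(self.pk_to_hash) + 1 == 101, so any brand-new features are assigned 101, 102, and so on.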
+        self.first_new_pk = max(self.pk_to_hash) + 1 if self.pk_to_hash else 1
+
+        # The numbers of inserts and deletes and the previous and current feature counts are related by:
+        #     prev-FC + inserts - deletes = curr-FC
+        # Since we already know prev-FC and curr-FC, we also know (inserts - deletes), so we can work out in
+        # advance the largest number of inserts that still allows (inserts + deletes) to stay within the
+        # similarity_detection_limit. Once more inserts than that have been encountered, we give up on
+        # similarity detection.
+        feature_count_delta = self.feature_count - self.prev_dest_dataset.feature_count
+        if abs(feature_count_delta) > self.similarity_detection_limit:
+            self.similarity_detection_insert_limit = 0
+        else:
+            self.similarity_detection_insert_limit = (
+                max(self.similarity_detection_limit + feature_count_delta, 0) // 2
+            )
+
+    def _prev_import_dest_tree(self, repo):
+        """Returns the dataset tree that was created the last time this data source was imported."""
+        if repo.is_empty:
+            return None
+
+        current_pks_tree = self._get_generated_pks_tree(repo.head_tree)
+        if current_pks_tree is None:
+            return None
+
+        prev_import_commit = None
+        for commit in repo.walk(repo.head_commit.id):
+            if self._get_generated_pks_tree(commit) == current_pks_tree:
+                prev_import_commit = commit
+            else:
+                # We've reached the commit before the previous import.
+                break
+
+        return prev_import_commit.peel(pygit2.Tree) / self.dest_path
+
+    def _get_generated_pks_tree(self, commit_or_tree):
+        root_tree = commit_or_tree.peel(pygit2.Tree)
+        try:
+            return root_tree / self.dest_path / self.GENERATED_PKS_PATH
+        except KeyError:
+            return None
 
     def encode_generated_pk_data(self, relative=False):
         path = self.GENERATED_PKS_PATH
@@ -144,25 +195,138 @@ def _init_schema(self):
         return Schema.from_column_dicts([self.pk_col] + cols)
 
     def features(self):
-        schema = self.schema
-        for feature in self.delegate.features():
-            feature[self.primary_key] = None
-            feature_hash = schema.hash_feature(feature, without_pk=True)
-            feature[self.primary_key] = self.generate_pk(feature_hash)
-            yield feature
-
-    def generate_pk(self, feature_hash):
-        unused_pks = self.hash_to_unassigned_pks.get(feature_hash)
-        if unused_pks:
-            return unused_pks.pop(0)
-
-        pk = self.next_new_pk
-        self.next_new_pk += 1
-
-        self.hash_to_pks.setdefault(feature_hash, [])
-        self.hash_to_pks[feature_hash].append(pk)
+        # Next primary key to use if we can't find a historical but unassigned one in hash_to_unassigned_pks.
+        next_new_pk = self.first_new_pk
+
+        # Maps hash of feature contents -> primary keys that were assigned to features with that hash in a
+        # previous import, but have not yet been reassigned during the current import. If we need a primary
+        # key for a feature, we first check this dict for a historical one that hasn't yet been assigned to a
+        # feature during this import, and reassign it to the current feature.
+        hash_to_unassigned_pks = self._invert_pk_map(self.pk_to_hash)
+
+        # Features that we couldn't reassign PKs to - so far they are inserts. But if, once we know the full
+        # list of inserts and deletes, we can find some deletes that are similar to them, we can reassign the
+        # deleted features' PKs to them so that they become edits instead.
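+        # Illustrative walk-through, assuming an insert limit of 5: up to 5 unmatched features are buffered
+        # below; as soon as a 6th arrives, they are all flushed with brand-new PKs and buffering is switched
+        # off for the rest of the import.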
+        buffered_inserts = []
+        buffered_insert_limit = self.similarity_detection_insert_limit
+
+        for orig_feature in self.delegate.features():
+            feature = {self.primary_key: None, **orig_feature}
+            feature_hash = self.schema.hash_feature(feature, without_pk=True)
+
+            pks = hash_to_unassigned_pks.get(feature_hash)
+            reassigned_pk = pks.pop(0) if pks else None
+
+            if reassigned_pk is not None:
+                # This feature is exactly the same as a historical one that had a PK,
+                # and that PK has not yet been assigned this import. We re-assign it now.
+                feature[self.primary_key] = reassigned_pk
+                yield feature
+
+            elif buffered_insert_limit > 0:
+                # New feature, but we don't assign it a PK just yet.
+                # We buffer this feature for now - maybe we'll find a similar one among the deleted features
+                # later, whose PK we can reuse, making this an edit.
+                # We can do this once we have the full list of new and deleted features.
+                buffered_inserts.append(feature)
+
+                if len(buffered_inserts) > buffered_insert_limit:
+                    # Too many inserts - give up on finding similar ones from among the deletes.
+                    yield from self._assign_pk_range(buffered_inserts, next_new_pk)
+                    next_new_pk += len(buffered_inserts)
+                    buffered_inserts = []
+                    buffered_insert_limit = 0
+
+            else:
+                # New feature. Assign it a new PK and yield it.
+                yield self._assign_pk(feature, next_new_pk, feature_hash=feature_hash)
+                next_new_pk += 1
+
+        if buffered_inserts:
+            # Look for matching insert-delete pairs - reassign the PK from the delete, treating it as an edit:
+            yield from self._match_similar_features_and_remove(
+                self._find_deleted_features(hash_to_unassigned_pks), buffered_inserts
+            )
+            # Just assign new PKs to those we couldn't find a match for.
+            yield from self._assign_pk_range(buffered_inserts, next_new_pk)
+
+    def _assign_pk_range(self, features, pk):
+        for feature in features:
+            yield self._assign_pk(feature, pk)
+            pk += 1
+
+    def _assign_pk(self, feature, pk, feature_hash=None):
+        if feature_hash is None:
+            feature_hash = self.schema.hash_feature(feature, without_pk=True)
+
+        feature[self.primary_key] = pk
         self.pk_to_hash[pk] = feature_hash
-        return pk
+        return feature
+
+    def _match_similar_features_and_remove(self, old_features, new_features):
+        orig_old_features_len = len(old_features)
+        orig_new_features_len = len(new_features)
+        similar_count = 0
+
+        for old_feature, new_feature in self._pop_similar_pairs(
+            old_features, new_features
+        ):
+            pk = old_feature[self.primary_key]
+            yield self._assign_pk(new_feature, pk)
+            similar_count += 1
+
+        assert len(old_features) == orig_old_features_len - similar_count
+        assert len(new_features) == orig_new_features_len - similar_count
+
+    def _pop_similar_pairs(self, old_features, new_features):
+        # Copy old_features so we can remove from it while iterating over it:
+        for old_feature in old_features.copy():
+            new_feature = self._find_sole_similar(old_feature, new_features)
+            if (
+                new_feature is not None
+                and self._find_sole_similar(new_feature, old_features) is not None
+            ):
+                old_features.remove(old_feature)
+                new_features.remove(new_feature)
+                yield old_feature, new_feature
+
+    def _find_sole_similar(self, target, source_list):
+        match_count = 0
+        for s in source_list:
+            if self._is_similar(s, target):
+                match = s
+                match_count += 1
+                if match_count > 1:
+                    break
+
+        return match if match_count == 1 else None
+
+    def _is_similar(self, lhs, rhs):
+        # NOTE: This is one of several possible similarity metrics.
+        # TODO: Add more and make them configurable, if this proves useful.
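+        # Illustrative example: a buffered insert that matches a deleted feature in every value except, say,
+        # survey_reference differs in the PK plus one other field, so it counts as similar; a feature with two
+        # or more differing non-PK fields does not.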
+
+        dissimilar_count = 0
+
+        for l, r in zip_longest(lhs.values(), rhs.values()):
+            if l != r:
+                dissimilar_count += 1
+                if dissimilar_count > 2:
+                    # Different primary key + two other different fields -> dissimilar.
+                    return False
+
+        # Different primary key + one other different field (or fewer) -> similar.
+        return True
+
+    def _find_deleted_features(self, hash_to_unassigned_pks):
+        unassigned_pks = set()
+        for pks in hash_to_unassigned_pks.values():
+            unassigned_pks.update(pks)
+
+        def pk_filter(pk):
+            return pk in unassigned_pks
+
+        filtered_ds = FilteredDataset(self.prev_dest_tree, self.dest_path, pk_filter)
+        return list(filtered_ds.features())
 
     def check_fully_specified(self):
         self.delegate.check_fully_specified()
@@ -209,3 +373,17 @@ def import_source_desc(self):
 
     def aggregate_import_source_desc(self, import_sources):
         return self.delegate.aggregate_import_source_desc(import_sources)
+
+
+class FilteredDataset(Dataset2):
+    """A dataset that only yields features whose primary key pk satisfies `pk_filter(pk)`."""
+
+    def __init__(self, tree, path, pk_filter):
+        super().__init__(tree, path)
+        self.pk_filter = pk_filter
+
+    def feature_blobs(self):
+        for blob in super().feature_blobs():
+            pk = self.decode_path_to_1pk(blob.name)
+            if self.pk_filter(pk):
+                yield blob
diff --git a/tests/test_structure.py b/tests/test_structure.py
index 785d259a3..c3b83887c 100644
--- a/tests/test_structure.py
+++ b/tests/test_structure.py
@@ -672,7 +672,6 @@ def test_postgis_import_from_view_no_pk(
         "gpkg-polygons", "nz-waca-adjustments.gpkg", "nz_waca_adjustments"
     ):
         c = postgis_db.cursor()
-
         c.execute(
             """
                 CREATE VIEW nz_waca_adjustments_view AS (
@@ -716,6 +715,7 @@ def test_postgis_import_from_view_no_pk(
                 os.environ["SNO_POSTGRES_URL"],
                 "nz_waca_adjustments_view",
                 "--replace-existing",
+                "--similarity-detection-limit=10",
             ]
         )
         assert r.exit_code == 0, r.stderr
@@ -730,6 +730,57 @@ def test_postgis_import_from_view_no_pk(
         # Means 92 features are in both, and should be imported with the same PK both times
         # 159 + 161 is 320, which is 92 more features than the actual total of 228
 
+        # This is similar enough to be detected as an edit - only one field is different.
+        c.execute(
+            "UPDATE nz_waca_adjustments SET survey_reference='foo' WHERE id=1424927;"
+        )
+        # This is similar enough to be detected as an edit - only one field is different.
+        c.execute(
+            "UPDATE nz_waca_adjustments SET adjusted_nodes=12345678 WHERE id=1443053;"
+        )
+        # This will not be detected as an edit - two fields are different,
+        # so it looks like one feature is deleted and a different one is inserted.
+ c.execute( + "UPDATE nz_waca_adjustments SET survey_reference='bar', adjusted_nodes=87654321 WHERE id=1452332;" + ) + + r = cli_runner.invoke( + [ + "--repo", + str(repo_path.resolve()), + "import", + os.environ["SNO_POSTGRES_URL"], + "nz_waca_adjustments_view", + "--replace-existing", + "--similarity-detection-limit=10", + ] + ) + assert r.exit_code == 0, r.stderr + r = cli_runner.invoke(["--repo", str(repo_path.resolve()), "show"]) + assert r.exit_code == 0, r.stderr + # Two edits and one insert + delete: + assert r.stdout.splitlines()[-19:] == [ + "", + "--- nz_waca_adjustments_view:feature:1", + "+++ nz_waca_adjustments_view:feature:1", + "- survey_reference = ␀", + "+ survey_reference = foo", + "--- nz_waca_adjustments_view:feature:2", + "+++ nz_waca_adjustments_view:feature:2", + "- adjusted_nodes = 1238", + "+ adjusted_nodes = 12345678", + "--- nz_waca_adjustments_view:feature:3", + "- geom = MULTIPOLYGON(...)", + "- date_adjusted = 2011-06-07T15:22:58Z", + "- survey_reference = ␀", + "- adjusted_nodes = 558", + "+++ nz_waca_adjustments_view:feature:229", + "+ geom = MULTIPOLYGON(...)", + "+ date_adjusted = 2011-06-07T15:22:58Z", + "+ survey_reference = bar", + "+ adjusted_nodes = 87654321", + ] + def test_pk_encoding(): ds = Dataset1(None, "mytable")