Fix for kart upgrade so it handles complicated repos #449

Merged: 3 commits, Jun 24, 2021
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,14 @@ Please note that compatibility for 0.x releases (software or repositories) isn't

_When adding new entries to the changelog, please include issue/PR numbers wherever possible._

## 0.10.1

#### Fix for `kart upgrade`
Fixed `kart upgrade` so that it preserves more complicated (or yet-to-be-released) features of V2 repos as they are upgraded to V3. [#448](https://github.com/koordinates/kart/issues/448)

Specifically:
* `generated-pks.json` metadata: extra metadata found in datasets that have an automatically generated primary key and that are maintained by repeatedly importing from a primary-key-less datasource.
* attachments (not yet fully supported by Kart): arbitrary files kept alongside datasets, such as license or readme files.

## 0.10.0

3 changes: 3 additions & 0 deletions kart/dataset3.py
@@ -332,6 +332,9 @@ def import_iter_meta_blobs(self, repo, source):

yield full_path, content

for rel_path, content in source.attachment_items():
yield self.full_attachment_path(rel_path), content

def iter_legend_blob_data(self):
"""
Generates (full_path, blob_data) tuples for each legend in this dataset
99 changes: 60 additions & 39 deletions kart/fast_import.py
@@ -42,15 +42,15 @@ class _CommitMissing(Exception):
pass


def _safe_walk_repo(repo):
def _safe_walk_repo(repo, from_commit):
"""
Contextmanager. Walk the repo log, yielding each commit.
If a commit isn't present, raises _CommitMissing.
Avoids catching any other KeyErrors raised by pygit2 or the contextmanager body
"""
do_raise = False
try:
for commit in repo.walk(repo.head.target):
for commit in repo.walk(from_commit.id):
try:
yield commit
except KeyError:
@@ -65,7 +65,10 @@ def _safe_walk_repo(repo):


def should_compare_imported_features_against_old_features(
repo, source, replacing_dataset
repo,
source,
replacing_dataset,
from_commit,
):
"""
Returns True iff we should compare feature blobs to the previous feature blobs
@@ -91,7 +94,7 @@

# Walk the log until we encounter a relevant schema change
try:
for commit in _safe_walk_repo(repo):
for commit in _safe_walk_repo(repo, from_commit):
datasets = repo.datasets(commit.oid)
try:
old_dataset = datasets[replacing_dataset.path]
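For orientation, the change above threads an explicit `from_commit` through to the history walk instead of always starting at `repo.head.target`. A minimal standalone sketch of that walk-from-a-given-commit pattern in pygit2 (the repository path and revision below are placeholders, and the error handling that `_safe_walk_repo` adds is only summarised in the comment):

```python
import pygit2

def walk_from(repo, from_commit):
    # Yield commits reachable from from_commit, newest first.
    # kart's _safe_walk_repo additionally turns KeyErrors from pygit2
    # (e.g. objects missing in a shallow clone) into _CommitMissing.
    for commit in repo.walk(from_commit.id, pygit2.GIT_SORT_TOPOLOGICAL):
        yield commit

# Hypothetical usage: walk from a commit other than the current HEAD.
repo = pygit2.Repository("/path/to/repo")  # placeholder path
start = repo.revparse_single("main~3").peel(pygit2.Commit)
for commit in walk_from(repo, start):
    print(commit.short_id, commit.message.splitlines()[0])
```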
@@ -149,14 +152,22 @@ def fast_import_clear_trees(*, procs, replace_ids, replacing_dataset, source):
if replacing_dataset is None:
# nothing to do
return
dest_inner_path = f"{source.dest_path}/{replacing_dataset.DATASET_DIRNAME}"
dest_path = source.dest_path
dest_inner_path = f"{dest_path}/{replacing_dataset.DATASET_DIRNAME}"
for i, proc in enumerate(procs):
if replace_ids is None:
# Delete the existing dataset, before we re-import it.
proc.stdin.write(f"D {source.dest_path}\n".encode("utf8"))
else:
# delete and reimport meta/
# Delete and reimport any attachments at dest_path
attachment_names = [
obj.name for obj in replacing_dataset.tree if obj.type_str == "blob"
]
Comment on lines +163 to +165

Member: should this look deeper? it only looks for blobs which are immediate children of the dataset tree. I feel like we're going to miss something here if we ever add more complex hierarchies of 'attachments'.

Collaborator (author): Right now these are the only attachments which exist in V2 repos, and they really only exist in repos which we created (not user repos), and the set of all V2 repos is not supposed to grow very much (or at least, not with new types of V2 repos which we haven't yet seen).

There is another type of hypothetical attachment which won't currently be preserved and which we can't really do anything about, at least not with the current structure - attachments which are not attached to any particular dataset, but are at some other path. But again, these don't exist either. So, not too motivated to make this more complicated for no real benefit.

Member: okay 👍

for name in attachment_names:
proc.stdin.write(f"D {dest_path}/{name}\n".encode("utf8"))
# Delete and reimport <inner_path>/meta/
proc.stdin.write(f"D {dest_inner_path}/meta\n".encode("utf8"))

# delete all features not pertaining to this process.
# we also delete the features that *do*, but we do it further down
# so that we don't have to iterate the IDs more than once.
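The review thread above asks whether attachment discovery should look deeper than the dataset tree's immediate children. A rough sketch of both behaviours, assuming pygit2 >= 1.0 (where iterating a `Tree` yields the child objects themselves); the recursive variant is hypothetical and deliberately not part of this PR:

```python
def immediate_attachment_names(dataset_tree):
    # What this PR does: only blobs that are direct children of the dataset tree.
    return [obj.name for obj in dataset_tree if obj.type_str == "blob"]

def nested_attachment_paths(tree, prefix=""):
    # Hypothetical deeper walk discussed (and declined) in review:
    # recurse into subtrees and return the relative path of every blob.
    paths = []
    for obj in tree:
        if obj.type_str == "blob":
            paths.append(f"{prefix}{obj.name}")
        elif obj.type_str == "tree":
            paths.extend(nested_attachment_paths(obj, prefix=f"{prefix}{obj.name}/"))
    return paths
```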
@@ -174,20 +185,25 @@
pass


UNSPECIFIED = object()


def fast_import_tables(
repo,
sources,
*,
verbosity=1,
num_processes=4,
header=None,
message=None,
replace_existing=ReplaceExisting.DONT_REPLACE,
from_commit=UNSPECIFIED,
replace_ids=None,
allow_empty=False,
limit=None,
max_pack_size="2G",
max_delta_depth=0,
# Advanced use - used by kart upgrade.
header=None,
extra_cmd_args=(),
):
"""
@@ -200,13 +216,16 @@
1: basic status information
2: full output of `git-fast-import --stats ...`
num_processes: how many import processes to run in parallel
header - the commit-header to supply git-fast-import. Generated if not supplied - see generate_header.
message - the commit-message used when generating the header. Generated if not supplied - see generate_message.
replace_existing - See ReplaceExisting enum
from_commit - the commit to be used as a starting point before beginning the import.
replace_ids - list of PK values to replace, or None
limit - maximum number of features to import per source.
max_pack_size - maximum size of pack files. Affects performance.
max_delta_depth - maximum depth of delta-compression chains. Affects performance.

The following extra options are used by kart upgrade.
header - the commit-header to supply git-fast-import. Generated if not supplied - see generate_header.
extra_cmd_args - any extra args for the git-fast-import command.
"""

@@ -220,28 +239,26 @@
# too many processes it won't be very even.
raise ValueError(f"Can't import with more than {MAX_PROCESSES} processes")

# The tree this repo was at before this function was called.
# May be None (repo is empty)
orig_tree = repo.head_tree

# The tree we look at for considering what datasets already exist
# depends what we want to replace.
if replace_existing == ReplaceExisting.ALL:
starting_tree = None
# The commit that this import is using as the basis for the new commit.
# If we are replacing everything, we start from scratch, so from_commit is None.
if replace_existing is ReplaceExisting.ALL:
from_commit = None
else:
starting_tree = repo.head_tree
if from_commit is UNSPECIFIED:
raise RuntimeError(
"Caller should specify from_commit when requesting an import that doesn't start from scratch"
)

if not starting_tree:
replace_existing = ReplaceExisting.ALL
from_tree = from_commit.peel(pygit2.Tree) if from_commit else repo.empty_tree

assert repo.version in SUPPORTED_REPO_VERSIONS
extra_blobs = extra_blobs_for_version(repo.version) if not starting_tree else []
extra_blobs = extra_blobs_for_version(repo.version) if not from_commit else []

ImportSource.check_valid(sources)

if replace_existing == ReplaceExisting.DONT_REPLACE:
for source in sources:
if source.dest_path in starting_tree:
if source.dest_path in from_tree:
raise InvalidOperation(
f"Cannot import to {source.dest_path}/ - already exists in repository"
)
@@ -260,7 +277,6 @@
for arg in extra_cmd_args:
cmd.append(arg)

orig_commit = repo.head_commit
import_refs = []

if verbosity >= 1:
@@ -291,22 +307,21 @@
import_ref = f"refs/kart-import/{uuid.uuid4()}"
import_refs.append(import_ref)

# may be None, if head is detached
# orig_branch may be None, if head is detached
# FIXME - this code relies upon the fact that we always either a) import at HEAD (import flow)
# or b) Fix up the branch heads later (upgrade flow).
orig_branch = repo.head_branch
proc_header = generate_header(repo, sources, message, import_ref)
if replace_existing != ReplaceExisting.ALL:
proc_header += f"from {orig_commit.oid}\n"
proc_header = generate_header(
repo, sources, message, import_ref, from_commit
)

proc = stack.enter_context(_git_fast_import(repo, *cmd))
procs.append(proc)
proc.stdin.write(proc_header.encode("utf8"))

# Write the extra blob that records the repo's version:
for i, blob_path in write_blobs_to_stream(procs[0].stdin, extra_blobs):
if (
replace_existing != ReplaceExisting.ALL
and blob_path in starting_tree
):
if replace_existing != ReplaceExisting.ALL and blob_path in from_tree:
raise ValueError(f"{blob_path} already exists")

if num_processes == 1:
@@ -326,6 +341,7 @@ def proc_for_feature_path(path):
repo,
source,
replace_existing,
from_commit,
procs,
proc_for_feature_path,
replace_ids,
@@ -363,7 +379,7 @@ def proc_for_feature_path(path):
else:
new_tree = trees[0]
if not allow_empty:
if new_tree == orig_tree:
if new_tree == from_tree:
raise NotFound("No changes to commit", exit_code=NO_CHANGES)

# use the existing commit details we already imported, but use the new tree
@@ -380,16 +396,14 @@ def proc_for_feature_path(path):
# remove the import branches
for b in import_refs:
if b in repo.references:
try:
repo.references.delete(b)
except KeyError:
pass # Nothing to delete, probably due to some earlier failure.
repo.references.delete(b)


def _import_single_source(
repo,
source,
replace_existing,
from_commit,
procs,
proc_for_feature_path,
replace_ids,
@@ -400,6 +414,7 @@
repo - the Kart repo to import into.
source - an individual ImportSource
replace_existing - See ReplaceExisting enum
from_commit - the commit to be used as a starting point before beginning the import.
procs - all the processes to be used (for parallel imports)
proc_for_feature_path - function, given a feature path returns the process to use to import it
replace_ids - list of PK values to replace, or None
@@ -412,7 +427,7 @@
replacing_dataset = None
if replace_existing == ReplaceExisting.GIVEN:
try:
replacing_dataset = repo.datasets()[source.dest_path]
replacing_dataset = repo.datasets(refish=from_commit)[source.dest_path]
except KeyError:
# no such dataset; no problem
replacing_dataset = None
@@ -478,7 +493,10 @@ def _ids():
)

elif should_compare_imported_features_against_old_features(
repo, source, replacing_dataset
repo,
source,
replacing_dataset,
from_commit,
):
feature_blob_iter = dataset.import_iter_feature_blobs(
repo,
@@ -536,18 +554,21 @@ def copy_existing_blob_to_stream(stream, blob_path, blob_sha):
stream.write(f"M 644 {blob_sha} {blob_path}\n".encode("utf8"))


def generate_header(repo, sources, message, branch):
def generate_header(repo, sources, message, branch, from_commit):
if message is None:
message = generate_message(sources)

author = repo.author_signature()
committer = repo.committer_signature()
return (
result = (
f"commit {branch}\n"
f"author {author.name} <{author.email}> {author.time} {minutes_to_tz_offset(author.offset)}\n"
f"committer {committer.name} <{committer.email}> {committer.time} {minutes_to_tz_offset(committer.offset)}\n"
f"data {len(message.encode('utf8'))}\n{message}\n"
)
if from_commit:
result += f"from {from_commit.oid}\n"
return result


def generate_message(sources):
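To make the `generate_header` change concrete, here is an illustrative rendering of the git-fast-import commit header it now produces; the ref, author, timestamp, message, and OID values below are placeholders, not output from the PR:

```python
branch = "refs/kart-import/123e4567-e89b-12d3-a456-426614174000"  # placeholder import ref
message = "Import from example.gpkg"                              # placeholder message
from_commit_oid = "0123456789abcdef0123456789abcdef01234567"      # placeholder OID

header = (
    f"commit {branch}\n"
    f"author Alice <alice@example.com> 1624500000 +1200\n"
    f"committer Alice <alice@example.com> 1624500000 +1200\n"
    f"data {len(message.encode('utf8'))}\n{message}\n"
)
# After this PR the "from" line is only appended when a from_commit is supplied,
# so an import into an empty repository simply omits it.
header += f"from {from_commit_oid}\n"
print(header)
```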
8 changes: 8 additions & 0 deletions kart/import_source.py
@@ -101,6 +101,14 @@ def meta_items(self):
"""
raise NotImplementedError()

def attachment_items(self):
"""
Returns a dict of all the attachment items that need to be imported.
These are files that will be imported verbatim to dest_path, but not hidden inside the dataset.
This could be a license or a readme.
"""
return {}

def crs_definitions(self):
"""
Returns an {identifier: definition} dict containing every CRS definition.
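As a sketch of how the new hook might be used (this subclass is hypothetical and not part of the PR), an import source can override `attachment_items()` to supply verbatim files keyed by path relative to `dest_path`; `Dataset3.import_iter_meta_blobs` then yields each one at its full attachment path, alongside the dataset rather than inside it:

```python
from kart.import_source import ImportSource

class LicensedImportSource(ImportSource):
    """Hypothetical source whose dataset ships with a license attachment."""

    def attachment_items(self):
        # Keys are paths relative to dest_path; values are the raw file bytes.
        return {"LICENSE.txt": b"Example license text\n"}
```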
9 changes: 6 additions & 3 deletions kart/init.py
@@ -326,15 +326,17 @@ def import_(
import_sources.append(import_source)

ImportSource.check_valid(import_sources, param_hint="tables")
replace_existing_enum = (
ReplaceExisting.GIVEN if replace_existing else ReplaceExisting.DONT_REPLACE
)
fast_import_tables(
repo,
import_sources,
verbosity=ctx.obj.verbosity + 1,
message=message,
max_delta_depth=max_delta_depth,
replace_existing=ReplaceExisting.GIVEN
if replace_existing
else ReplaceExisting.DONT_REPLACE,
replace_existing=replace_existing_enum,
from_commit=repo.head_commit,
replace_ids=replace_ids,
allow_empty=allow_empty,
num_processes=num_processes or get_default_num_processes(),
@@ -458,6 +460,7 @@ def init(
fast_import_tables(
repo,
sources,
from_commit=None,
message=message,
max_delta_depth=max_delta_depth,
num_processes=num_processes or get_default_num_processes(),
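A condensed sketch of the two calling conventions visible above, with the other arguments omitted (the simplification is an assumption, not the full call sites): an import on top of existing history anchors the new commit on `repo.head_commit`, while `kart init` passes `from_commit=None` because there is nothing to build on.

```python
from kart.fast_import import fast_import_tables

def import_on_top_of_head(repo, sources):
    # Import flow: the new commit is based on the current head commit.
    fast_import_tables(repo, sources, from_commit=repo.head_commit)

def import_into_new_repo(repo, sources):
    # Init flow: a brand-new repository has no prior commit to start from.
    fast_import_tables(repo, sources, from_commit=None)
```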
2 changes: 1 addition & 1 deletion kart/pk_generation.py
@@ -3,7 +3,7 @@
from .import_source import ImportSource
from .dataset2 import Dataset2
from .dataset3 import Dataset3
from .schema import ColumnSchema, Schema
from .schema import ColumnSchema


class PkGeneratingImportSource(ImportSource):
4 changes: 2 additions & 2 deletions kart/rich_base_dataset.py
@@ -201,9 +201,9 @@ def diff_feature(self, other, feature_filter=UNFILTERED, reverse=False):
"""
feature_filter = feature_filter or UNFILTERED

params = {}
params = {"flags": pygit2.GIT_DIFF_SKIP_BINARY_CHECK}
if reverse:
params = {"swap": True}
params["swap"] = True

if other is None:
diff_index = self.inner_tree.diff_to_tree(**params)
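Finally, the `diff_feature` change adds `GIT_DIFF_SKIP_BINARY_CHECK`, and now that a flag is always present, `reverse` updates the params dict instead of replacing it. A minimal sketch of the resulting pygit2 call, with placeholder names:

```python
import pygit2

def diff_trees(old_tree, new_tree, reverse=False):
    # Mirror of diff_feature's params handling: always skip git's binary-content
    # heuristics, and optionally swap the old/new sides of the diff.
    params = {"flags": pygit2.GIT_DIFF_SKIP_BINARY_CHECK}
    if reverse:
        params["swap"] = True
    return old_tree.diff_to_tree(new_tree, **params)
```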