Adds an experimental byod-point-cloud-import command #906
Merged
Commits (6, all by olsen232):
263f32e  Adds an experimental byod-point-cloud-import command
88b8fad  Add boto3 to requirements
b267d10  Add byod as python package
7145291  experimental byod-import: address review comments
7f33931  Fix test config to work for arbiter as well as boto3
2195933  Fix S3 tests
(One file in this diff is empty.)
kart/byod/importer.py (new file):
@@ -0,0 +1,84 @@
import logging

import click

from kart.fast_import import (
    write_blob_to_stream,
)
from kart.lfs_util import dict_to_pointer_file_bytes
from kart.progress_util import progress_bar
from kart.s3_util import expand_s3_glob

L = logging.getLogger(__name__)


class ByodTileImporter:
    """
    Subclassable logic for importing the metadata from tile-based datasets,
    while leaving the data in-place on existing hosted storage.
    """

    EXTRACT_TILE_METADATA_STEP = "Fetching tile metadata"

    def sanity_check_sources(self, sources):
        for source in sources:
            if not source.startswith("s3://"):
                raise click.UsageError(f"SOURCE {source} should be an S3 url")

        for source in list(sources):
            if "*" in source:
                sources.remove(source)
                sources += expand_s3_glob(source)

    def extract_tile_metadata(self, tile_location):
        raise NotImplementedError()

    def get_conversion_func(self, source_metadata):
        return None

    def import_tiles_to_stream(self, stream, sources):
        progress = progress_bar(
            total=len(sources), unit="tile", desc="Writing tile metadata"
        )
        with progress as p:
            for source in sources:
                tilename = self.DATASET_CLASS.tilename_from_path(source)
                rel_blob_path = self.DATASET_CLASS.tilename_to_blob_path(
                    tilename, relative=True
                )
                blob_path = f"{self.dataset_inner_path}/{rel_blob_path}"

                # Check if tile has already been imported previously:
                if self.existing_dataset is not None:
                    existing_summary = self.existing_dataset.get_tile_summary(
                        tilename, missing_ok=True
                    )
                    if existing_summary:
                        source_oid = self.source_to_hash_and_size[source][0]
                        if self.existing_tile_matches_source(
                            source_oid, existing_summary
                        ):
                            # This tile has already been imported before. Reuse it rather than re-importing it.
                            # Re-importing it could cause it to be re-converted, which is a waste of time,
                            # and it may not convert the same the second time, which is then a waste of space
                            # and shows up as a pointless diff.
                            write_blob_to_stream(
                                stream,
                                blob_path,
                                (self.existing_dataset.inner_tree / rel_blob_path).data,
                            )
                            self.include_existing_metadata = True
                            continue

                # Tile hasn't been imported previously.
                pointer_data = dict_to_pointer_file_bytes(
                    self.source_to_metadata[source]["tile"]
                )
                write_blob_to_stream(stream, blob_path, pointer_data)

                p.update(1)

        self.source_to_imported_metadata = self.source_to_metadata

    def prompt_for_convert_to_cloud_optimized(self):
        return False
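For a sense of what import_tiles_to_stream writes for each tile: only a small pointer blob, never the point-cloud data itself. The sketch below is purely illustrative - the helper name, the fields and their ordering are assumptions, not kart's actual dict_to_pointer_file_bytes format:

# Illustration only: a minimal Git-LFS-style pointer serializer for a tile dict
# holding an oid, a size and (for BYOD imports) a remote URL. Kart's real
# pointer-file layout may differ.
def tile_dict_to_pointer_bytes(tile):
    lines = [
        "version https://git-lfs.github.com/spec/v1",
        f"oid sha256:{tile['oid']}",
        f"size {tile['size']}",
    ]
    if "url" in tile:
        lines.append(f"url {tile['url']}")
    return ("\n".join(lines) + "\n").encode("utf-8")

# A few hundred bytes are committed in place of a potentially multi-gigabyte LAZ tile:
pointer = tile_dict_to_pointer_bytes(
    {"oid": "4f6e...", "size": 144466, "url": "s3://example-bucket/tiles/tile_1.laz"}
)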
New file implementing the byod-point-cloud-import command:
@@ -0,0 +1,132 @@
import logging

import click

from kart.byod.importer import ByodTileImporter
from kart.cli_util import StringFromFile, MutexOption, KartCommand
from kart.point_cloud.import_ import PointCloudImporter
from kart.point_cloud.metadata_util import extract_pc_tile_metadata
from kart.s3_util import get_hash_and_size_of_s3_object, fetch_from_s3

L = logging.getLogger(__name__)


@click.command("byod-point-cloud-import", hidden=True, cls=KartCommand)
@click.pass_context
@click.option(
    "--message",
    "-m",
    type=StringFromFile(encoding="utf-8"),
    help="Commit message. By default this is auto-generated.",
)
@click.option(
    "--checkout/--no-checkout",
    "do_checkout",
    is_flag=True,
    default=True,
    help="Whether to create a working copy once the import is finished, if no working copy exists yet.",
)
@click.option(
    "--replace-existing",
    is_flag=True,
    cls=MutexOption,
    exclusive_with=["--delete", "--update-existing"],
    help="Replace existing dataset at the same path.",
)
@click.option(
    "--update-existing",
    is_flag=True,
    cls=MutexOption,
    exclusive_with=["--replace-existing"],
    help=(
        "Update existing dataset at the same path. "
        "Tiles will be replaced by source tiles with the same name. "
        "Tiles in the existing dataset which are not present in SOURCES will remain untouched."
    ),
)
@click.option(
    "--delete",
    type=StringFromFile(encoding="utf-8"),
    cls=MutexOption,
    exclusive_with=["--replace-existing"],
    multiple=True,
    help=("Deletes the given tile. Can be used multiple times."),
)
@click.option(
    "--amend",
    default=False,
    is_flag=True,
    help="Amend the previous commit instead of adding a new commit",
)
@click.option(
    "--allow-empty",
    is_flag=True,
    default=False,
    help=(
        "Usually recording a commit that has the exact same tree as its sole "
        "parent commit is a mistake, and the command prevents you from making "
        "such a commit. This option bypasses the safety"
    ),
)
@click.option(
    "--num-workers",
    "--num-processes",
    type=click.INT,
    help="How many import workers to run in parallel. Defaults to the number of available CPU cores.",
    default=None,
    hidden=True,
)
@click.option("--dataset-path", "--dataset", help="The dataset's path once imported")
@click.argument(
    "sources",
    nargs=-1,
    metavar="SOURCE [SOURCES...]",
)
def byod_point_cloud_import(
    ctx,
    message,
    do_checkout,
    replace_existing,
    update_existing,
    delete,
    amend,
    allow_empty,
    num_workers,
    dataset_path,
    sources,
):
    """
    Experimental. Import a dataset of point-cloud tiles from S3. Doesn't fetch the tiles, but does store the tiles' original location.

    SOURCES should be one or more LAZ or LAS files (or wildcards that match multiple LAZ or LAS files).
    """
    repo = ctx.obj.repo

    ByodPointCloudImporter(repo, ctx).import_tiles(
        convert_to_cloud_optimized=False,
        dataset_path=dataset_path,
        message=message,
        do_checkout=do_checkout,
        replace_existing=replace_existing,
        update_existing=update_existing,
        delete=delete,
        amend=amend,
        allow_empty=allow_empty,
        sources=list(sources),
        num_workers=num_workers,
    )


class ByodPointCloudImporter(ByodTileImporter, PointCloudImporter):
    def extract_tile_metadata(self, tile_location):
        oid_and_size = get_hash_and_size_of_s3_object(tile_location)
        # TODO - download only certain ranges of the file, and extract metadata from those.
        tmp_downloaded_tile = fetch_from_s3(tile_location)
        result = extract_pc_tile_metadata(
            tmp_downloaded_tile, oid_and_size=oid_and_size
        )
        tmp_downloaded_tile.unlink()
        # TODO - format still not definite, we might not put the whole URL in here.
        result["tile"]["url"] = tile_location
        return result
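On the ranged-download TODO above: S3 supports HTTP range requests via the Range parameter of get_object, so the fixed-size LAS header and the VLRs near the start of the tile could in principle be read without fetching the whole object. A rough sketch of that idea, reusing the get_s3_client helper added in kart.s3_util below - the function name and byte count are placeholders, not part of this PR:

from urllib.parse import urlparse

from kart.s3_util import get_s3_client

# Sketch only: fetch just the first num_bytes of a tile via an S3 range request,
# instead of downloading the entire object as fetch_from_s3 does.
def fetch_tile_prefix_from_s3(s3_url, num_bytes=64 * 1024):
    parsed = urlparse(s3_url)
    response = get_s3_client().get_object(
        Bucket=parsed.netloc,
        Key=parsed.path.lstrip("/"),
        Range=f"bytes=0-{num_bytes - 1}",
    )
    return response["Body"].read()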
kart/s3_util.py (new file):
@@ -0,0 +1,106 @@
from base64 import standard_b64decode
import functools
import os
from pathlib import Path
import tempfile
from urllib.parse import urlparse

import boto3
import click

from kart.exceptions import NotFound, NO_IMPORT_SOURCE

# Utility functions for dealing with S3 - not yet launched.


@functools.lru_cache(maxsize=1)
def get_s3_config():
    # TODO - add an option --s3-region to commands where it would be useful.
    return None


@functools.lru_cache(maxsize=1)
def get_s3_client():
    client = boto3.client("s3", config=get_s3_config())
    if "AWS_NO_SIGN_REQUEST" in os.environ:
        client._request_signer.sign = lambda *args, **kwargs: None
    return client


@functools.lru_cache(maxsize=1)
def get_s3_resource():
    resource = boto3.resource("s3", config=get_s3_config())
    if "AWS_NO_SIGN_REQUEST" in os.environ:
        resource.meta.client._request_signer.sign = lambda *args, **kwargs: None
    return resource


@functools.lru_cache()
def get_bucket(name):
    return get_s3_resource().Bucket(name)


def fetch_from_s3(s3_url, output_path=None):
    """
    Downloads the object at s3_url to output_path.
    If output_path is not set, creates a temporary file using tempfile.mkstemp()
    """
    # TODO: handle failure.
    parsed = urlparse(s3_url)
    bucket = get_bucket(parsed.netloc)
    if output_path is None:
        fd, path = tempfile.mkstemp()
        # If we keep it open, boto3 won't be able to write to it (on Windows):
        os.close(fd)
        output_path = Path(path)
    bucket.download_file(parsed.path.lstrip("/"), str(output_path.resolve()))
    return output_path


def expand_s3_glob(source_spec):
    """
    Given an S3 path containing a '*' wildcard, uses prefix and suffix matching to find all S3 objects that match.
    Subdirectories (or the S3 equivalent - S3 is not exactly a directory hierarchy) are not matched -
    that is, s3://bucket/path/*.txt matches s3://bucket/path/example.txt but not s3://bucket/path/subpath/example.txt
    """
    # TODO: handle any kind of failure, sanity check to make sure we don't match a million objects.
[Review comment] meh; a million objects is actually fine at this level, not really our job to try to guess if that's the user's intent 🤷
    if "*" not in source_spec:
        return [source_spec]

    parsed = urlparse(source_spec)
    prefix, suffix = parsed.path.split("*", maxsplit=1)
    if "*" in suffix:
        raise click.UsageError(
            f"Two wildcards '*' found in {source_spec} - only one wildcard is supported"
        )
    prefix = prefix.lstrip("/")
    prefix_len = len(prefix)

    bucket = get_bucket(parsed.netloc)
    matches = bucket.objects.filter(Prefix=prefix)
    result = []
    for match in matches:
        assert match.key.startswith(prefix)
        match_suffix = match.key[prefix_len:]
        if match_suffix.endswith(suffix) and "/" not in match_suffix:
            result.append(f"s3://{match.bucket_name}/{match.key}")

    if not result:
        raise NotFound(
            f"No S3 objects found at {source_spec}", exit_code=NO_IMPORT_SOURCE
        )
    return result


def get_hash_and_size_of_s3_object(s3_url):
    """Returns the (SHA256-hash-in-hex, filesize) of an S3 object."""
    parsed = urlparse(s3_url)
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    response = get_s3_client().head_object(
        Bucket=bucket, Key=key, ChecksumMode="ENABLED"
    )
    # TODO - handle failure (eg missing SHA256 checksum), which is extremely likely.
[Review comment] this seems important
    sha256 = standard_b64decode(response["ChecksumSHA256"]).hex()
    size = response["ContentLength"]
    return sha256, size
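On that TODO and the review comment above: head_object only returns ChecksumSHA256 for objects that were uploaded with SHA256 checksums enabled, so a fallback will often be needed. One possible shape, sketched here rather than taken from this PR (the helper name and chunk size are assumptions, it reuses the imports and helpers of the module above, and multipart-uploaded objects would still need extra handling since their stored checksum is a checksum-of-checksums rather than a plain SHA256 of the contents):

import hashlib

# Sketch only: fall back to downloading and hashing the object locally when S3
# has no stored SHA256 checksum for it.
def get_hash_and_size_with_fallback(s3_url):
    parsed = urlparse(s3_url)
    bucket, key = parsed.netloc, parsed.path.lstrip("/")
    client = get_s3_client()
    response = client.head_object(Bucket=bucket, Key=key, ChecksumMode="ENABLED")
    size = response["ContentLength"]
    stored = response.get("ChecksumSHA256")
    if stored:
        return standard_b64decode(stored).hex(), size
    # No stored checksum: stream the object and hash it ourselves.
    body = client.get_object(Bucket=bucket, Key=key)["Body"]
    hasher = hashlib.sha256()
    for chunk in iter(lambda: body.read(1024 * 1024), b""):
        hasher.update(chunk)
    return hasher.hexdigest(), size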
[Review comment] is there a more efficient way of doing this? https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_objects_v2.html supports an OptionalObjectAttributes thing, which might let us grab filesizes and checksums in the list call, to avoid doing GetObject requests for each object. If we can get PDAL to do the PC tile metadata bit, and we can move the checksum/filesize into the list call, then we don't need to do any per-object calls ourselves, which should make things quite a bit faster for big layers.

[Reply] Not done - we don't necessarily do a list call if the user didn't supply us with a wildcard operator to expand. We could do a list call regardless, using whatever is the common prefix of all the supplied tiles... or multiple list calls if there are multiple distinct prefixes... but this sounds like a whole other thing, it doesn't need to be a part of this PR.

[Reply] Turns out this thing doesn't actually work anyway because RestoreStatus is the only thing you can put in OptionalObjectAttributes - you can't ask for checksums as part of that. Dumb but can't do much about it 👍
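Worth noting that object sizes do come back in the list call regardless: each ListObjectsV2 entry includes a Size field, so sizes could be gathered while expanding a glob even though checksums cannot. A small sketch of that, as a possible follow-up rather than anything in this PR (the helper name is hypothetical):

from kart.s3_util import get_s3_client

# Sketch only: collect object sizes from the listing itself, so that only the
# SHA256 checksum still needs a per-object request.
def list_keys_and_sizes(bucket_name, prefix):
    paginator = get_s3_client().get_paginator("list_objects_v2")
    keys_to_sizes = {}
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get("Contents", []):
            keys_to_sizes[f"s3://{bucket_name}/{obj['Key']}"] = obj["Size"]
    return keys_to_sizes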