Stop using S3 Select in indexer (#4212)
Co-authored-by: Alexei Mochalov <nl_0@quiltdata.io>
sir-sigurd and nl0 authored Nov 19, 2024
1 parent 6d70cc8 commit 40db3b4
Showing 5 changed files with 85 additions and 88 deletions.
21 changes: 21 additions & 0 deletions lambdas/indexer/CHANGELOG.md
@@ -0,0 +1,21 @@
<!-- markdownlint-disable line-length -->
# Changelog

Changes are listed in reverse chronological order (newer entries at the top).
The entry format is

```markdown
- [Verb] Change description ([#<PR-number>](https://github.com/quiltdata/quilt/pull/<PR-number>))
```

where the verb is one of

- Removed
- Added
- Fixed
- Changed

## Changes

- [Changed] Stop using S3 select ([#4212](https://github.com/quiltdata/quilt/pull/4212))
- [Added] Bootstrap the change log ([#4212](https://github.com/quiltdata/quilt/pull/4212))
97 changes: 58 additions & 39 deletions lambdas/indexer/index.py
@@ -47,6 +47,7 @@


import datetime
import functools
import json
import os
import pathlib
@@ -92,7 +93,6 @@
POINTER_PREFIX_V1,
get_available_memory,
get_quilt_logger,
query_manifest_content,
separated_env_to_iter,
)

@@ -168,12 +168,7 @@
# currently only affects .parquet, TODO: extend to other extensions
assert 'SKIP_ROWS_EXTS' in os.environ
SKIP_ROWS_EXTS = separated_env_to_iter('SKIP_ROWS_EXTS')
SELECT_PACKAGE_META = "SELECT * from S3Object o WHERE o.version IS NOT MISSING LIMIT 1"
# No WHERE clause needed for aggregations since S3 Select skips missing fields for aggs
SELECT_PACKAGE_STATS = (
"SELECT COALESCE(SUM(obj['size']), 0) as total_bytes,"
" COUNT(obj['size']) as total_files from S3Object obj"
)
DUCKDB_SELECT_LAMBDA_ARN = os.environ["DUCKDB_SELECT_LAMBDA_ARN"]
TEST_EVENT = "s3:TestEvent"
# we need to filter out GetObject and HeadObject calls generated by the present
# lambda in order to display accurate analytics in the Quilt catalog
@@ -182,6 +177,7 @@


logger = get_quilt_logger()
s3_client = boto3.client("s3", config=botocore.config.Config(user_agent_extra=USER_AGENT_EXTRA))


def now_like_boto3():
@@ -247,13 +243,10 @@ def select_manifest_meta(s3_client, bucket: str, key: str):
wrapper for retry and returning a dict
"""
try:
raw = query_manifest_content(
s3_client,
bucket=bucket,
key=key,
sql_stmt=SELECT_PACKAGE_META
)
return json.load(raw)
body = s3_client.get_object(Bucket=bucket, Key=key)["Body"]
with body: # this *might* be needed to close the stream ASAP
for line in body.iter_lines():
return json.loads(line)
except (botocore.exceptions.ClientError, json.JSONDecodeError) as cle:
print(f"Unable to S3 select manifest: {cle}")

@@ -439,7 +432,7 @@ def get_pkg_data():
first = select_manifest_meta(s3_client, bucket, manifest_key)
if not first:
return
stats = select_package_stats(s3_client, bucket, manifest_key)
stats = select_package_stats(bucket, manifest_key)
if not stats:
return

@@ -472,33 +465,54 @@ def get_pkg_data():
return True


def select_package_stats(s3_client, bucket, manifest_key) -> str:
@functools.lru_cache(maxsize=None)
def get_bucket_region(bucket: str) -> str:
resp = s3_client.head_bucket(Bucket=bucket)
return resp["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"]


@functools.lru_cache(maxsize=None)
def get_presigner_client(bucket: str):
return boto3.client(
"s3",
region_name=get_bucket_region(bucket),
config=botocore.config.Config(signature_version="s3v4"),
)


def select_package_stats(bucket, manifest_key) -> Optional[dict]:
"""use s3 select to generate file stats for package"""
logger_ = get_quilt_logger()
try:
raw_stats = query_manifest_content(
s3_client,
bucket=bucket,
key=manifest_key,
sql_stmt=SELECT_PACKAGE_STATS
).read()

if raw_stats:
stats = json.loads(raw_stats)
assert isinstance(stats['total_bytes'], int)
assert isinstance(stats['total_files'], int)

return stats

except (
AssertionError,
botocore.exceptions.ClientError,
json.JSONDecodeError,
KeyError,
) as err:
logger_.exception("Unable to compute package stats via S3 select")
presigner_client = get_presigner_client(bucket)
url = presigner_client.generate_presigned_url(
ClientMethod="get_object",
Params={
"Bucket": bucket,
"Key": manifest_key,
},
)
lambda_ = make_lambda_client()
q = f"""
SELECT
COALESCE(SUM(size), 0) AS total_bytes,
COUNT(size) AS total_files FROM read_ndjson('{url}', columns={{size: 'UBIGINT'}}) obj
"""
resp = lambda_.invoke(
FunctionName=DUCKDB_SELECT_LAMBDA_ARN,
Payload=json.dumps({"query": q, "user_agent": f"DuckDB Select {USER_AGENT_EXTRA}"}),
)

return None
payload = resp["Payload"].read()
if "FunctionError" in resp:
logger_.error("DuckDB select unhandled error: %s", payload)
return None
parsed = json.loads(payload)
if "error" in parsed:
logger_.error("DuckDB select error: %s", parsed["error"])
return None

rows = parsed["rows"]
return rows[0] if rows else None


def extract_pptx(fileobj, max_size: int) -> str:
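The new `select_package_stats` above replaces the `SELECT_PACKAGE_STATS` aggregation with a presigned manifest URL handed to a DuckDB select Lambda. A sketch of the invocation half of that flow, with the payload and response shapes inferred from the calling code (the Lambda's own contract is not part of this diff) and a placeholder ARN:

```python
import json

import boto3

# Placeholder; the indexer reads this from DUCKDB_SELECT_LAMBDA_ARN.
LAMBDA_ARN = "arn:aws:lambda:us-west-2:123456789012:function:select-lambda"

lambda_client = boto3.client("lambda")


def run_duckdb_select(query: str) -> list:
    """Invoke the DuckDB select Lambda and return its result rows."""
    resp = lambda_client.invoke(
        FunctionName=LAMBDA_ARN,
        Payload=json.dumps({"query": query, "user_agent": "DuckDB Select example"}),
    )
    payload = resp["Payload"].read()
    # "FunctionError" in the response marks an unhandled exception in the Lambda.
    if "FunctionError" in resp:
        raise RuntimeError(f"unhandled Lambda error: {payload!r}")
    parsed = json.loads(payload)
    # Handled errors are assumed to be reported under an "error" key.
    if "error" in parsed:
        raise RuntimeError(f"query error: {parsed['error']}")
    return parsed["rows"]
```

Note that the presigned URL must be signed with SigV4 in the bucket's own region (hence `get_bucket_region` and the memoized per-bucket presigner client), since DuckDB's `read_ndjson` fetches the manifest over plain HTTPS with no AWS credentials of its own.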
@@ -732,6 +746,11 @@ def make_s3_client():
return boto3.client("s3", config=configuration)


@functools.lru_cache(maxsize=None)
def make_lambda_client():
return boto3.client("lambda")


def map_event_name(event: dict):
"""transform eventbridge names into S3-like ones"""
input_ = event["eventName"]
4 changes: 3 additions & 1 deletion lambdas/indexer/pytest.ini
@@ -1,4 +1,6 @@
[pytest]
env =
DUCKDB_SELECT_LAMBDA_ARN = "arn:aws:lambda:us-west-2:123456789012:function:select-lambda"
log_cli = True
# This is set above critical to prevent logger events from confusing output in CI
log_level = 51
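The new pytest-env entry matters because `index.py` reads `DUCKDB_SELECT_LAMBDA_ARN` from the environment at import time, so without it the test session would fail with a `KeyError` before any test runs. A minimal illustration of that import-time behavior:

```python
import os

# index.py resolves this at module import, not at call time, which is
# why pytest-env must set it before the tests import the module:
DUCKDB_SELECT_LAMBDA_ARN = os.environ["DUCKDB_SELECT_LAMBDA_ARN"]  # KeyError if unset
```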
1 change: 1 addition & 0 deletions lambdas/indexer/test-requirements.txt
@@ -5,4 +5,5 @@ pluggy==0.9
py==1.10.0
pytest==4.4.0
pytest-cov==2.6.1
pytest-env==0.6.2
responses==0.10.14
50 changes: 2 additions & 48 deletions lambdas/indexer/test/test_index.py
@@ -23,7 +23,6 @@
import responses
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ParamValidationError
from botocore.stub import Stubber
from dateutil.tz import tzutc
from document_queue import EVENT_PREFIX, RetryError
@@ -979,7 +978,7 @@ def test_index_if_package_select_stats_fail(self, append_mock, select_meta_mock,
)

select_meta_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(bucket, manifest_key)
append_mock.assert_called_once_with({
"_index": bucket + PACKAGE_INDEX_SUFFIX,
"_id": key,
@@ -1023,7 +1022,7 @@ def test_index_if_package(self, append_mock, select_meta_mock, select_stats_mock
)

select_meta_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(bucket, manifest_key)
append_mock.assert_called_once_with({
"_index": bucket + PACKAGE_INDEX_SUFFIX,
"_id": key,
@@ -1182,51 +1181,6 @@ def test_extension_overrides(self):
assert self._get_contents('foo.txt', '.txt') == ""
assert self._get_contents('foo.ipynb', '.ipynb') == ""

@pytest.mark.xfail(
raises=ParamValidationError,
reason="boto bug https://github.com/boto/botocore/issues/1621",
strict=True,
)
def test_stub_select_object_content(self):
"""Demonstrate that mocking S3 select with boto3 is broken"""
sha_hash = "50f4d0fc2c22a70893a7f356a4929046ce529b53c1ef87e28378d92b884691a5"
manifest_key = f"{MANIFEST_PREFIX_V1}{sha_hash}"
# this SHOULD work, but due to botocore bugs it does not
self.s3_stubber.add_response(
method="select_object_content",
service_response={
"ResponseMetadata": ANY,
# it is sadly not possible to mock S3 select responses because
# boto incorrectly believes "Payload"'s value should be a dict
but it's really an iterable in real-world code
# see https://github.com/boto/botocore/issues/1621
"Payload": [
{
"Stats": {}
},
{
"Records": {
"Payload": json.dumps(MANIFEST_DATA).encode(),
},
},
{
"End": {}
},
]
},
expected_params={
"Bucket": "test-bucket",
"Key": manifest_key,
"Expression": index.SELECT_PACKAGE_META,
"ExpressionType": "SQL",
"InputSerialization": {
'JSON': {'Type': 'LINES'},
'CompressionType': 'NONE'
},
"OutputSerialization": {'JSON': {'RecordDelimiter': '\n'}}
}
)

def test_synthetic_copy_event(self):
"""check synthetic ObjectCreated:Copy event vs organic obtained on 26-May-2020
(bucket versioning on)
