WIP: adding query_latest_table method to ArchiveQuerier.
ABPLMC committed May 9, 2024
1 parent 8eb2e08 commit 5368ebf
Showing 3 changed files with 80 additions and 13 deletions.
11 changes: 11 additions & 0 deletions api/datalake_api/querier.py
@@ -14,6 +14,7 @@

from memoized_property import memoized_property
from datalake.common import DatalakeRecord
from boto3.dynamodb.conditions import Key
import base64
import json
import time
@@ -365,3 +366,13 @@ def _get_all_records_in_bucket(self, bucket, **kwargs):
                break
            kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
        return records

    def query_latest_table(self, what, where):
        response = self._table.query(
            KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
        )
        items = response.get('Items', [])
        if not items:
            return None
        latest_item = items[0]
        return dict(url=latest_item['url'], metadata=latest_item['metadata'])
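
As a usage sketch (hypothetical setup; the ArchiveQuerier(table_name, dynamodb=...) construction follows the test fixtures below, and 'foo'/'boo' are arbitrary what/where values):

    import boto3
    from datalake_api.querier import ArchiveQuerier

    dynamodb = boto3.resource('dynamodb')
    querier = ArchiveQuerier('test_latest', dynamodb=dynamodb)
    latest = querier.query_latest_table('foo', 'boo')
    if latest is not None:
        print(latest['url'], latest['metadata'])

Because the latest table is keyed only by what_where_key, each what:where pair holds at most one item, so taking Items[0] returns the single stored record.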
61 changes: 50 additions & 11 deletions api/tests/conftest.py
@@ -107,6 +107,20 @@ def tear_down():
    }
]

latest_attribute_definitions = [
    {
        'AttributeName': 'what_where_key',
        'AttributeType': 'S'
    }
]

latest_key_schema = [
    {
        'AttributeName': 'what_where_key',
        'KeyType': 'HASH'
    }
]
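
# Note: a HASH-only key schema means the table holds at most one item per
# what_where_key; a put with the same key replaces the previous item.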

global_secondary = [{
    'IndexName': 'work-id-index',
    'KeySchema': [
@@ -140,55 +154,80 @@ def _delete_table(table):
        raise e


-def _create_table(dynamodb, table_name):
def _create_table(dynamodb,
                  table_name,
                  attribute_definitions,
                  key_schema,
                  global_secondary=None):
    table = dynamodb.Table(table_name)
    _delete_table(table)
    kwargs = dict(
        TableName=table_name,
        AttributeDefinitions=attribute_definitions,
        KeySchema=key_schema,
-        GlobalSecondaryIndexes=global_secondary,
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )
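    # DynamoDB rejects GlobalSecondaryIndexes when it is None or an empty
    # list, so the parameter is only included when indexes are supplied.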
    if global_secondary:
        kwargs['GlobalSecondaryIndexes'] = global_secondary
    dynamodb.create_table(**kwargs)
    return dynamodb.Table(table_name)


def _populate_table(table, records):
    print(f'attempting to populate {table}')
    with table.batch_writer() as batch:
        for r in records:
            batch.put_item(Item=r)


# Latest-table support: when requested, table_maker also creates a "latest"
# table keyed by what_where_key and populates it with the same records. Once
# the API can query it, finding the latest file becomes a single what:where
# lookup with no bucket logic.
@pytest.fixture
def table_maker(request, dynamodb):

-    def maker(records):
-        table_name = 'test'
-        table = _create_table(dynamodb, table_name)
-        _populate_table(table, records)
    def maker(records, include_latest_key=False):
        old_table_name = 'test'
        latest_table_name = 'test_latest'
        latest_table = None

        old_table = _create_table(dynamodb, old_table_name,
                                  attribute_definitions, key_schema,
                                  global_secondary)
        _populate_table(old_table, records)

        if include_latest_key:
            latest_table = _create_table(dynamodb, latest_table_name,
                                         latest_attribute_definitions,
                                         latest_key_schema)
            _populate_table(latest_table, records)

        def tear_down():
-            _delete_table(table)
            _delete_table(old_table)
            if include_latest_key:
                _delete_table(latest_table)

        request.addfinalizer(tear_down)

-        return table
        return old_table, latest_table

    return maker


@pytest.fixture
def record_maker(s3_file_from_metadata):

-    def maker(**kwargs):
    def maker(include_latest_key=False, **kwargs):
        m = generate_random_metadata()
        m.update(**kwargs)
        key = '/'.join([str(v) for v in kwargs.values()])
        url = 's3://datalake-test/' + key
        s3_file_from_metadata(url, m)
-        return DatalakeRecord.list_from_metadata(url, m)
        records = DatalakeRecord.list_from_metadata(url, m)

        if include_latest_key:
            what = kwargs.get('what')
            where = kwargs.get('where')
            for record in records:
                record['what_where_key'] = f"{what}:{where}"

        return records

    return maker
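
For reference, a record produced with include_latest_key=True carries the composite partition key used by the latest table (sketch; 'foo' and 'boo' are arbitrary what/where values):

    records = record_maker(include_latest_key=True, what='foo', where='boo')
    assert all(r['what_where_key'] == 'foo:boo' for r in records)
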
21 changes: 19 additions & 2 deletions api/tests/test_archive_querier.py
@@ -125,8 +125,10 @@ def query_latest(self, what, where):

@pytest.fixture(params=[ArchiveQuerier, HttpQuerier],
                ids=['archive_querier', 'http'])
-def querier(request, dynamodb):
-    return request.param('test', dynamodb=dynamodb)
def querier(request):
    def create_querier(dynamodb, table_name):
        return request.param(table_name, dynamodb=dynamodb)
    return create_querier
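
With this change the querier fixture returns a factory rather than a querier, so tests construct one explicitly (sketch; 'test' is the main table name used by table_maker):

    querier_instance = querier(dynamodb, 'test')

Existing tests below still pass the fixture around directly (e.g. querier.query_by_time) and will need updating, consistent with the WIP commit message.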


def in_url(result, part):
@@ -528,3 +530,18 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker):
    pages = get_all_pages(querier.query_by_time, [start, end, 'boo'])
    results = consolidate_pages(pages)
    assert len(results) == MAX_RESULTS * 2

"""
Will have to go through all of the tests associated with
latest and correctly query from
the latest table that was created.
"""

def test_latest_table_query(table_maker, querier, record_maker, dynamodb):
    records = record_maker(include_latest_key=True, what='foo', where='boo')
    _, latest_table = table_maker(records, include_latest_key=True)

    querier_instance = querier(dynamodb=dynamodb,
                               table_name=latest_table.table_name)
    result = querier_instance.query_latest_table('foo', 'boo')
    _validate_latest_result(result, what='foo', where='boo')
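
_validate_latest_result is not defined in this diff; a minimal sketch of what it might assert, given that query_latest_table returns dict(url=..., metadata=...) (hypothetical helper, names assumed):

    def _validate_latest_result(result, what, where):
        assert result is not None
        assert result['metadata']['what'] == what
        assert result['metadata']['where'] == where
        assert result['url'].startswith('s3://')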
