Skip to content

Commit

Permalink
Ingester: Fixing imports, calling store_latest after default store me…
Browse files Browse the repository at this point in the history
…thod, adding --latest-table to cli.
  • Loading branch information
ABPLMC committed Jun 27, 2024
1 parent e0f34b0 commit 03de50a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
10 changes: 7 additions & 3 deletions ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ def store_latest(self, record):
expression_attribute_values = {
':new_start': {'N': str(record['metadata']['start'])}
}

if record['metadata']['work_id'] is None:
work_id_value = {'BOOL': False}
else:
work_id_value = {'S': str(record['metadata']['work_id'])}

record = {
'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']},
'time_index_key': {"S": record['time_index_key']},
Expand Down Expand Up @@ -108,9 +114,7 @@ def store_latest(self, record):
'where': {
'S': str(record['metadata']['where'])
},
'work_id': {
'S': str(record['metadata']['work_id'])
}
'work_id': work_id_value
}
},
'url': {"S": record['url']},
Expand Down
12 changes: 5 additions & 7 deletions ingester/tests/test_ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,17 @@ def test_ingest_random(storage, dynamodb_records_table, random_s3_file_maker):
def test_ingest_random_latest(storage, dynamodb_latest_table, random_s3_file_maker):
storage.latest_table_name = 'latest'
url, metadata = random_s3_file_maker()
if metadata['work_id'] == 'None':
metadata['work_id'] = None
ingester = Ingester(storage)
ingester.ingest(url)
records = [dict(r) for r in dynamodb_latest_table.scan()]
def convert_metadata(metadata):
return {k: (decimal.Decimal(str(v)) if isinstance(v, (int, float)) else v) for k, v in metadata.items()}

converted_metadata = convert_metadata(metadata)

def convert_records(records):
return {k: (decimal.Decimal(str(v)) if isinstance(v, (int, float)) else v) for k, v in records[0].items()}

converted_records = convert_records(records)
assert len(records) >= 1
for r in records:
assert r['metadata'] == converted_metadata
assert r['metadata'] == converted_records['metadata']


def test_ingest_no_end(storage, dynamodb_records_table, s3_file_from_metadata,
Expand Down

0 comments on commit 03de50a

Please sign in to comment.