Add Final DynamoDB Table (#353)
laspsandoval authored Oct 8, 2024
1 parent 3de23fa commit 027ad8c
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 32 deletions.
59 changes: 54 additions & 5 deletions sds_data_manager/constructs/ialirt_ingest_lambda_construct.py
@@ -40,17 +40,18 @@ def __init__(
super().__init__(scope, construct_id, **kwargs)

# Create DynamoDB Table
self.packet_data_table = self.create_dynamodb_table()
self.packet_data_table = self.create_ingest_dynamodb_table()
self.algorithm_data_table = self.create_algorithm_dynamodb_table()

# Create Lambda Function
self.ialirt_ingest_lambda = self.create_lambda_function(
ialirt_bucket, self.packet_data_table
ialirt_bucket, self.packet_data_table, self.algorithm_data_table
)

# Create Event Rule
self.create_event_rule(ialirt_bucket, self.ialirt_ingest_lambda)

def create_dynamodb_table(self) -> aws_dynamodb.Table:
def create_ingest_dynamodb_table(self) -> aws_dynamodb.Table:
"""Create and return the DynamoDB table."""
table = ddb.Table(
self,
@@ -91,8 +92,54 @@ def create_dynamodb_table(self) -> aws_dynamodb.Table:
)
return table

def create_algorithm_dynamodb_table(self) -> aws_dynamodb.Table:
"""Create and return the algorithm data product table."""
table = ddb.Table(
self,
"IalirtAlgorithmDataTable",
table_name="ialirt-algorithm-table",
# Change to RemovalPolicy.RETAIN to keep the table after stack deletion.
# TODO: change to RETAIN in production.
removal_policy=RemovalPolicy.DESTROY,
# Restore data to any point in time within the last 35 days.
# TODO: change to True in production.
point_in_time_recovery=False,
# Partition key (PK) = instrument product_name.
partition_key=ddb.Attribute(
name="product_name",
type=ddb.AttributeType.STRING,
),
# Sort key (SK) = Mission Elapsed Time (MET).
sort_key=ddb.Attribute(
name="met",
type=ddb.AttributeType.NUMBER,
),
# Define the read and write capacity units.
# TODO: change to provisioned capacity mode in production.
billing_mode=ddb.BillingMode.PAY_PER_REQUEST, # On-Demand capacity mode.
)

# Add a GSI for ingest time.
table.add_global_secondary_index(
index_name="insert_time",
# Partition key (PK) = instrument product_name.
partition_key=ddb.Attribute(
name="product_name", type=ddb.AttributeType.STRING
),
# Sort key (SK) = Insert Time (ISO).
sort_key=ddb.Attribute(
name="insert_time",
type=ddb.AttributeType.STRING,
),
projection_type=ddb.ProjectionType.ALL,
)
return table

def create_lambda_function(
self, ialirt_bucket: aws_s3.Bucket, packet_data_table: aws_dynamodb.Table
self,
ialirt_bucket: aws_s3.Bucket,
packet_data_table: aws_dynamodb.Table,
algorithm_data_table: aws_dynamodb.Table,
) -> lambda_alpha_.PythonFunction:
"""Create and return the Lambda function."""
lambda_role = iam.Role(
@@ -135,12 +182,14 @@ def create_lambda_function(
memory_size=1000,
role=lambda_role,
environment={
"TABLE_NAME": packet_data_table.table_name,
"INGEST_TABLE": packet_data_table.table_name,
"ALGORITHM_TABLE": algorithm_data_table.table_name,
"S3_BUCKET": ialirt_bucket.bucket_name,
},
)

packet_data_table.grant_read_write_data(ialirt_ingest_lambda)
algorithm_data_table.grant_read_write_data(ialirt_ingest_lambda)

# The resource is deleted when the stack is deleted.
ialirt_ingest_lambda.apply_removal_policy(cdk.RemovalPolicy.DESTROY)
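The TODO comments in the construct above flag three production follow-ups: retaining the table on stack deletion, enabling point-in-time recovery, and moving to provisioned capacity. A minimal sketch of those settings, assuming the same ddb / RemovalPolicy imports used in this construct (the capacity figures are placeholders, not values from this commit):

table = ddb.Table(
    self,
    "IalirtAlgorithmDataTable",
    table_name="ialirt-algorithm-table",
    # Sketch only: production settings suggested by the TODOs above.
    removal_policy=RemovalPolicy.RETAIN,  # keep the table when the stack is deleted
    point_in_time_recovery=True,  # allow restores within the last 35 days
    partition_key=ddb.Attribute(name="product_name", type=ddb.AttributeType.STRING),
    sort_key=ddb.Attribute(name="met", type=ddb.AttributeType.NUMBER),
    billing_mode=ddb.BillingMode.PROVISIONED,
    read_capacity=5,  # placeholder throughput, to be sized for the real workload
    write_capacity=5,
)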
29 changes: 25 additions & 4 deletions sds_data_manager/lambda_code/IAlirtCode/ialirt_ingest.py
@@ -5,6 +5,7 @@
import os

import boto3
from boto3.dynamodb.conditions import Key

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@@ -27,23 +28,43 @@ def lambda_handler(event, context):
and runtime environment.
"""
# TODO: these steps will be put into different functions.
logger.info("Received event: %s", json.dumps(event))

table_name = os.environ.get("TABLE_NAME")
ingest_table_name = os.environ.get("INGEST_TABLE")
algorithm_table_name = os.environ.get("ALGORITHM_TABLE")
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(table_name)
ingest_table = dynamodb.Table(ingest_table_name)
algorithm_table = dynamodb.Table(algorithm_table_name)

s3_filepath = event["detail"]["object"]["key"]
filename = os.path.basename(s3_filepath)
logger.info("Retrieved filename: %s", filename)

# TODO: item is temporary and will be replaced with actual packet data.
# TODO: Each of these steps is temporary, but provides an idea
# of how the lambda will be used.
# 1. Ingest Data to Ingest Table.
item = {
"apid": 478,
"met": 123,
"ingest_time": "2021-01-01T00:00:00Z",
"packet_blob": b"binary_data_string",
}

table.put_item(Item=item)
ingest_table.put_item(Item=item)
logger.info("Successfully wrote item to DynamoDB: %s", item)

# 2. Query Ingest Table for previous times as required by instrument.
response = ingest_table.query(KeyConditionExpression=Key("apid").eq(478))
items = response["Items"]
logger.info("Scan successful. Retrieved items: %s", items)

# 3. After processing insert data into Algorithm Table.
item = {
"product_name": "hit_product_1",
"met": 123,
"insert_time": "2021-01-01T00:00:00Z",
"data_product_1": str(1234.56),
}
algorithm_table.put_item(Item=item)
logger.info("Successfully wrote item to DynamoDB: %s", item)
49 changes: 39 additions & 10 deletions tests/conftest.py
@@ -8,15 +8,18 @@


@pytest.fixture()
def table():
def setup_dynamodb():
"""Initialize DynamoDB resource and create table."""
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
os.environ["TABLE_NAME"] = "imap-data-table"
os.environ["INGEST_TABLE"] = "imap-ingest-table"
os.environ["ALGORITHM_TABLE"] = "imap-algorithm-table"

with mock_dynamodb():
# Initialize DynamoDB resource
dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
table = dynamodb.create_table(
TableName="imap-data-table",

ingest_table = dynamodb.create_table(
TableName=os.environ["INGEST_TABLE"],
KeySchema=[
# Partition key
{"AttributeName": "apid", "KeyType": "HASH"},
@@ -33,15 +36,41 @@ def table():
"IndexName": "ingest_time",
"KeySchema": [
{"AttributeName": "apid", "KeyType": "HASH"},
{
"AttributeName": "ingest_time",
"KeyType": "RANGE",
},
{"AttributeName": "ingest_time", "KeyType": "RANGE"},
],
"Projection": {"ProjectionType": "ALL"},
},
],
BillingMode="PAY_PER_REQUEST",
)

algorithm_table = dynamodb.create_table(
TableName=os.environ["ALGORITHM_TABLE"],
KeySchema=[
# Partition key
{"AttributeName": "product_name", "KeyType": "HASH"},
# Sort key
{"AttributeName": "met", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "product_name", "AttributeType": "S"},
{"AttributeName": "met", "AttributeType": "N"},
{"AttributeName": "insert_time", "AttributeType": "S"},
],
GlobalSecondaryIndexes=[
{
"IndexName": "insert_time", # Unique index name
"KeySchema": [
{"AttributeName": "product_name", "KeyType": "HASH"},
{"AttributeName": "insert_time", "KeyType": "RANGE"},
],
"Projection": {"ProjectionType": "ALL"},
},
],
BillingMode="PAY_PER_REQUEST",
)
yield table
table.delete()

yield {
"ingest_table": ingest_table,
"algorithm_table": algorithm_table,
}
83 changes: 74 additions & 9 deletions tests/infrastructure/test_ialirt_ingest_lambda_construct.py
@@ -5,8 +5,9 @@


@pytest.fixture()
def populate_table(table):
def populate_ingest_table(setup_dynamodb):
"""Populate DynamoDB table."""
ingest_table = setup_dynamodb["ingest_table"]
items = [
{
"apid": 478,
@@ -22,39 +23,103 @@ def populate_table(table):
},
]
for item in items:
table.put_item(Item=item)
ingest_table.put_item(Item=item)

return items


def test_query_by_met(table, populate_table):
@pytest.fixture()
def populate_algorithm_table(setup_dynamodb):
"""Populate DynamoDB table."""
algorithm_table = setup_dynamodb["algorithm_table"]
items = [
{
"product_name": "hit_product_1",
"met": 123,
"insert_time": "2021-01-01T00:00:00Z",
"data_product_1": str(1234.56),
},
{
"product_name": "hit_product_1",
"met": 124,
"insert_time": "2021-02-01T00:00:00Z",
"data_product_2": str(101.3),
},
]
for item in items:
algorithm_table.put_item(Item=item)

return items


def test_ingest_query_by_met(setup_dynamodb, populate_ingest_table):
"""Test to query by met."""
expected_items = populate_table
ingest_table = setup_dynamodb["ingest_table"]
expected_items = populate_ingest_table

response = table.query(KeyConditionExpression=Key("apid").eq(478))
response = ingest_table.query(KeyConditionExpression=Key("apid").eq(478))

items = response["Items"]

for item in range(len(items)):
assert items[item] == expected_items[item]

response = table.query(
response = ingest_table.query(
KeyConditionExpression=Key("apid").eq(478) & Key("met").between(100, 123)
)
items = response["Items"]
assert len(items) == 1
assert items[0]["met"] == expected_items[0]["met"]


def test_query_by_date(table, populate_table):
def test_ingest_query_by_date(setup_dynamodb, populate_ingest_table):
"""Test to query by date."""
expected_items = populate_table
ingest_table = setup_dynamodb["ingest_table"]
expected_items = populate_ingest_table

response = table.query(
response = ingest_table.query(
IndexName="ingest_time",
KeyConditionExpression=Key("apid").eq(478)
& Key("ingest_time").begins_with("2021-01"),
)
items = response["Items"]
assert len(items) == 1
assert items[0] == expected_items[0]


def test_algorithm_query_by_met(setup_dynamodb, populate_algorithm_table):
"""Test to query by met."""
algorithm_table = setup_dynamodb["algorithm_table"]
expected_items = populate_algorithm_table

response = algorithm_table.query(
KeyConditionExpression=Key("product_name").eq("hit_product_1")
)

items = response["Items"]

for item in range(len(items)):
assert items[item] == expected_items[item]

response = algorithm_table.query(
KeyConditionExpression=Key("product_name").eq("hit_product_1")
& Key("met").between(100, 123)
)
items = response["Items"]
assert len(items) == 1
assert items[0]["met"] == expected_items[0]["met"]


def test_algorithm_query_by_date(setup_dynamodb, populate_algorithm_table):
"""Test to query by date."""
algorithm_table = setup_dynamodb["algorithm_table"]
expected_items = populate_algorithm_table

response = algorithm_table.query(
IndexName="insert_time",
KeyConditionExpression=Key("product_name").eq("hit_product_1")
& Key("insert_time").begins_with("2021-01"),
)
items = response["Items"]
assert len(items) == 1
assert items[0] == expected_items[0]
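The tests above cover prefix matches on the insert_time index. A related pattern a downstream reader of the algorithm table might use, sketched here and not part of this commit, is pulling the most recently inserted item for a product by querying the GSI newest-first:

import boto3
from boto3.dynamodb.conditions import Key

algorithm_table = boto3.resource("dynamodb").Table("ialirt-algorithm-table")

# Query the insert_time GSI in descending order of its sort key and keep
# only the newest item for the given product.
response = algorithm_table.query(
    IndexName="insert_time",
    KeyConditionExpression=Key("product_name").eq("hit_product_1"),
    ScanIndexForward=False,
    Limit=1,
)
latest_item = response["Items"][0] if response["Items"] else None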
25 changes: 21 additions & 4 deletions tests/lambda_endpoints/test_ialirt_ingest.py
@@ -6,8 +6,10 @@


@pytest.fixture()
def populate_table(table):
def populate_table(setup_dynamodb):
"""Populate DynamoDB table."""
ingest_table = setup_dynamodb["ingest_table"]

items = [
{
"apid": 478,
@@ -23,19 +25,22 @@ def populate_table(table):
},
]
for item in items:
table.put_item(Item=item)
ingest_table.put_item(Item=item)

return items


def test_lambda_handler(table):
def test_lambda_handler(setup_dynamodb):
"""Test the lambda_handler function."""
# Mock event data
ingest_table = setup_dynamodb["ingest_table"]
algorithm_table = setup_dynamodb["algorithm_table"]

event = {"detail": {"object": {"key": "packets/file.txt"}}}

lambda_handler(event, {})

response = table.get_item(
response = ingest_table.get_item(
Key={
"apid": 478,
"met": 123,
@@ -46,3 +51,15 @@ def test_lambda_handler(table):
assert item is not None
assert item["met"] == 123
assert item["packet_blob"] == b"binary_data_string"

response = algorithm_table.get_item(
Key={
"product_name": "hit_product_1",
"met": 123,
}
)
item = response.get("Item")

assert item is not None
assert item["met"] == 123
assert item["data_product_1"] == str(1234.56)
