Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: record sampling bug and add test #18

Merged
merged 2 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).

## Settings

| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:------------|
| aws_access_key_id | False | None | The access key for your AWS account. |
| aws_secret_access_key| False | None | The secret key for your AWS account. |
| aws_session_token | False | None | The session key for your AWS account. This is only needed when you are using temporary credentials. |
| aws_profile | False | None | The AWS credentials profile name to use. The profile must be configured and accessible. |
| aws_default_region | False | None | The default AWS region name (e.g. us-east-1) |
| aws_endpoint_url | False | None | The complete URL to use for the constructed client. |
| aws_assume_role_arn | False | None | The role ARN to assume. |
| use_aws_env_vars | False | 0 | Whether to retrieve aws credentials from environment variables. |
| tables | False | None | An array of table names to extract from. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| Setting | Required | Default | Description |
|:------------------------|:--------:|:-------:|:------------|
| tables | False | None | An array of table names to extract from. |
| infer_schema_sample_size| False    | 100     | The number of records to sample when inferring the schema. |
| aws_access_key_id | False | None | The access key for your AWS account. |
| aws_secret_access_key | False | None | The secret key for your AWS account. |
| aws_session_token | False | None | The session key for your AWS account. This is only needed when you are using temporary credentials. |
| aws_profile | False | None | The AWS credentials profile name to use. The profile must be configured and accessible. |
| aws_default_region | False | None | The default AWS region name (e.g. us-east-1) |
| aws_endpoint_url | False | None | The complete URL to use for the constructed client. |
| aws_assume_role_arn | False | None | The role ARN to assume. |
| use_aws_env_vars | False | 0 | Whether to retrieve aws credentials from environment variables. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `tap-dynamodb --about`

Expand Down
21 changes: 15 additions & 6 deletions tap_dynamodb/dynamodb_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,21 @@ def get_items_iter(
)
raise

def get_table_json_schema(self, table_name: str, strategy: str = "infer") -> dict:
sample_records = list(
self.get_items_iter(
table_name, scan_kwargs={"Limit": 100, "ConsistentRead": True}
)
)[0]
def _get_sample_records(self, table_name: str, sample_size: int) -> list:
sample_records = []
for batch in self.get_items_iter(
table_name, scan_kwargs={"Limit": sample_size, "ConsistentRead": True}
):
sample_records.extend(batch)
if len(sample_records) >= sample_size:
break
return sample_records

def get_table_json_schema(
self, table_name: str, sample_size, strategy: str = "infer"
) -> dict:
sample_records = self._get_sample_records(table_name, sample_size)

if not sample_records:
raise EmptyTableException()
if strategy == "infer":
Expand Down
9 changes: 8 additions & 1 deletion tap_dynamodb/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(
tap: TapBaseClass,
name: str,
dynamodb_conn: DynamoDbConnector,
infer_schema_sample_size,
):
"""
Initialize a new TableStream object.
Expand All @@ -26,10 +27,13 @@ def __init__(
tap: The parent tap object.
name: The name of the stream.
dynamodb_conn: The DynamoDbConnector object.
infer_schema_sample_size: The amount of records to sample when
inferring the schema.
"""
self._dynamodb_conn: DynamoDbConnector = dynamodb_conn
self._table_name: str = name
self._schema: dict = {}
self._infer_schema_sample_size = infer_schema_sample_size
super().__init__(
tap=tap,
schema=self.schema,
Expand All @@ -51,7 +55,10 @@ def schema(self) -> dict:
"""
# TODO: SDC columns
if not self._schema:
self._schema = self._dynamodb_conn.get_table_json_schema(self._table_name)
self._schema = self._dynamodb_conn.get_table_json_schema(
self._table_name,
self._infer_schema_sample_size,
)
self.primary_keys = self._dynamodb_conn.get_table_key_properties(
self._table_name
)
Expand Down
9 changes: 9 additions & 0 deletions tap_dynamodb/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ class TapDynamoDB(Tap):
th.ArrayType(th.StringType),
description="An array of table names to extract from.",
),
th.Property(
"infer_schema_sample_size",
th.IntegerType,
description="The amount of records to sample when inferring the schema.",
default=100,
),
).to_dict()

def discover_streams(self) -> list[streams.TableStream]:
Expand All @@ -41,6 +47,9 @@ def discover_streams(self) -> list[streams.TableStream]:
tap=self,
name=table_name,
dynamodb_conn=dynamodb_conn,
infer_schema_sample_size=self.config.get(
"infer_schema_sample_size"
),
)
discovered_streams.append(stream)
except EmptyTableException:
Expand Down
18 changes: 17 additions & 1 deletion tests/test_dynamodb_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_get_table_json_schema():
# END PREP

db_obj = DynamoDbConnector(SAMPLE_CONFIG)
schema = db_obj.get_table_json_schema("table")
schema = db_obj.get_table_json_schema("table", 5)
assert schema == {
"type": "object",
"properties": {
Expand Down Expand Up @@ -145,3 +145,19 @@ def test_coerce_types():
db_obj = DynamoDbConnector(SAMPLE_CONFIG)
coerced = db_obj._coerce_types({"foo": decimal.Decimal("1.23")})
assert coerced == {"foo": "1.23"}


@mock_dynamodb
def test_get_sample_records():
    """Sampling stops once the requested number of records is reached."""
    # PREP: stand up a mocked table holding five items.
    moto_conn = boto3.resource("dynamodb", region_name="us-west-2")
    table = create_table(moto_conn, "table")
    items = (
        {"year": 2023, "title": f"foo_{idx}", "info": {"plot": "bar"}}
        for idx in range(5)
    )
    for item in items:
        table.put_item(Item=item)
    # END PREP

    connector = DynamoDbConnector(SAMPLE_CONFIG)
    sampled = connector._get_sample_records("table", 2)
    assert len(sampled) == 2