Skip to content

Commit

Permalink
Merge pull request #18 from MeltanoLabs/bugfix-limit-sampling
Browse files Browse the repository at this point in the history
fix: record sampling bug and add test
  • Loading branch information
pnadolny13 authored Apr 17, 2023
2 parents 6d4cde7 + 9c525b4 commit 442a431
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 23 deletions.
31 changes: 16 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).

## Settings

| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:------------|
| aws_access_key_id | False | None | The access key for your AWS account. |
| aws_secret_access_key| False | None | The secret key for your AWS account. |
| aws_session_token | False | None | The session key for your AWS account. This is only needed when you are using temporary credentials. |
| aws_profile | False | None | The AWS credentials profile name to use. The profile must be configured and accessible. |
| aws_default_region | False | None | The default AWS region name (e.g. us-east-1) |
| aws_endpoint_url | False | None | The complete URL to use for the constructed client. |
| aws_assume_role_arn | False | None | The role ARN to assume. |
| use_aws_env_vars | False | 0 | Whether to retrieve aws credentials from environment variables. |
| tables | False | None | An array of table names to extract from. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| Setting | Required | Default | Description |
|:------------------------|:--------:|:-------:|:------------|
| tables | False | None | An array of table names to extract from. |
| infer_schema_sample_size| False    | 100     | The number of records to sample when inferring the schema. |
| aws_access_key_id | False | None | The access key for your AWS account. |
| aws_secret_access_key | False | None | The secret key for your AWS account. |
| aws_session_token | False | None | The session key for your AWS account. This is only needed when you are using temporary credentials. |
| aws_profile | False | None | The AWS credentials profile name to use. The profile must be configured and accessible. |
| aws_default_region | False | None | The default AWS region name (e.g. us-east-1) |
| aws_endpoint_url | False | None | The complete URL to use for the constructed client. |
| aws_assume_role_arn | False | None | The role ARN to assume. |
| use_aws_env_vars | False | 0 | Whether to retrieve aws credentials from environment variables. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `tap-dynamodb --about`

Expand Down
21 changes: 15 additions & 6 deletions tap_dynamodb/dynamodb_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,21 @@ def get_items_iter(
)
raise

def get_table_json_schema(self, table_name: str, strategy: str = "infer") -> dict:
sample_records = list(
self.get_items_iter(
table_name, scan_kwargs={"Limit": 100, "ConsistentRead": True}
)
)[0]
def _get_sample_records(self, table_name: str, sample_size: int) -> list:
sample_records = []
for batch in self.get_items_iter(
table_name, scan_kwargs={"Limit": sample_size, "ConsistentRead": True}
):
sample_records.extend(batch)
if len(sample_records) >= sample_size:
break
return sample_records

def get_table_json_schema(
self, table_name: str, sample_size, strategy: str = "infer"
) -> dict:
sample_records = self._get_sample_records(table_name, sample_size)

if not sample_records:
raise EmptyTableException()
if strategy == "infer":
Expand Down
9 changes: 8 additions & 1 deletion tap_dynamodb/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(
tap: TapBaseClass,
name: str,
dynamodb_conn: DynamoDbConnector,
infer_schema_sample_size,
):
"""
Initialize a new TableStream object.
Expand All @@ -26,10 +27,13 @@ def __init__(
tap: The parent tap object.
name: The name of the stream.
dynamodb_conn: The DynamoDbConnector object.
infer_schema_sample_size: The amount of records to sample when
inferring the schema.
"""
self._dynamodb_conn: DynamoDbConnector = dynamodb_conn
self._table_name: str = name
self._schema: dict = {}
self._infer_schema_sample_size = infer_schema_sample_size
super().__init__(
tap=tap,
schema=self.schema,
Expand All @@ -51,7 +55,10 @@ def schema(self) -> dict:
"""
# TODO: SDC columns
if not self._schema:
self._schema = self._dynamodb_conn.get_table_json_schema(self._table_name)
self._schema = self._dynamodb_conn.get_table_json_schema(
self._table_name,
self._infer_schema_sample_size,
)
self.primary_keys = self._dynamodb_conn.get_table_key_properties(
self._table_name
)
Expand Down
9 changes: 9 additions & 0 deletions tap_dynamodb/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ class TapDynamoDB(Tap):
th.ArrayType(th.StringType),
description="An array of table names to extract from.",
),
th.Property(
"infer_schema_sample_size",
th.IntegerType,
description="The amount of records to sample when inferring the schema.",
default=100,
),
).to_dict()

def discover_streams(self) -> list[streams.TableStream]:
Expand All @@ -41,6 +47,9 @@ def discover_streams(self) -> list[streams.TableStream]:
tap=self,
name=table_name,
dynamodb_conn=dynamodb_conn,
infer_schema_sample_size=self.config.get(
"infer_schema_sample_size"
),
)
discovered_streams.append(stream)
except EmptyTableException:
Expand Down
18 changes: 17 additions & 1 deletion tests/test_dynamodb_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_get_table_json_schema():
# END PREP

db_obj = DynamoDbConnector(SAMPLE_CONFIG)
schema = db_obj.get_table_json_schema("table")
schema = db_obj.get_table_json_schema("table", 5)
assert schema == {
"type": "object",
"properties": {
Expand Down Expand Up @@ -145,3 +145,19 @@ def test_coerce_types():
db_obj = DynamoDbConnector(SAMPLE_CONFIG)
coerced = db_obj._coerce_types({"foo": decimal.Decimal("1.23")})
assert coerced == {"foo": "1.23"}


@mock_dynamodb
def test_get_sample_records():
    """Sampling returns only the requested number of records."""
    # PREP
    resource = boto3.resource("dynamodb", region_name="us-west-2")
    seeded_table = create_table(resource, "table")
    items = [
        {"year": 2023, "title": f"foo_{idx}", "info": {"plot": "bar"}}
        for idx in range(5)
    ]
    for item in items:
        seeded_table.put_item(Item=item)
    # END PREP

    connector = DynamoDbConnector(SAMPLE_CONFIG)
    sampled = connector._get_sample_records("table", 2)
    assert len(sampled) == 2

0 comments on commit 442a431

Please sign in to comment.