diff --git a/README.md b/README.md index 0d9a90c..148d316 100644 --- a/README.md +++ b/README.md @@ -15,21 +15,22 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com). ## Settings -| Setting | Required | Default | Description | -|:---------------------|:--------:|:-------:|:------------| -| aws_access_key_id | False | None | The access key for your AWS account. | -| aws_secret_access_key| False | None | The secret key for your AWS account. | -| aws_session_token | False | None | The session key for your AWS account. This is only needed when you are using temporary credentials. | -| aws_profile | False | None | The AWS credentials profile name to use. The profile must be configured and accessible. | -| aws_default_region | False | None | The default AWS region name (e.g. us-east-1) | -| aws_endpoint_url | False | None | The complete URL to use for the constructed client. | -| aws_assume_role_arn | False | None | The role ARN to assume. | -| use_aws_env_vars | False | 0 | Whether to retrieve aws credentials from environment variables. | -| tables | False | None | An array of table names to extract from. | -| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | -| stream_map_config | False | None | User-defined config values to be used within map expressions. | -| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | -| flattening_max_depth | False | None | The max depth to flatten schemas. | +| Setting | Required | Default | Description | +|:------------------------|:--------:|:-------:|:------------| +| tables | False | None | An array of table names to extract from. | +| infer_schema_sample_size| False | 100 | The amount of records to sample when inferring the schema. | +| aws_access_key_id | False | None | The access key for your AWS account. | +| aws_secret_access_key | False | None | The secret key for your AWS account. | +| aws_session_token | False | None | The session key for your AWS account. This is only needed when you are using temporary credentials. | +| aws_profile | False | None | The AWS credentials profile name to use. The profile must be configured and accessible. | +| aws_default_region | False | None | The default AWS region name (e.g. us-east-1) | +| aws_endpoint_url | False | None | The complete URL to use for the constructed client. | +| aws_assume_role_arn | False | None | The role ARN to assume. | +| use_aws_env_vars | False | 0 | Whether to retrieve aws credentials from environment variables. | +| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | +| stream_map_config | False | None | User-defined config values to be used within map expressions. | +| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | +| flattening_max_depth | False | None | The max depth to flatten schemas. | A full list of supported settings and capabilities is available by running: `tap-dynamodb --about` diff --git a/tap_dynamodb/dynamodb_connector.py b/tap_dynamodb/dynamodb_connector.py index d39ac30..db08a78 100644 --- a/tap_dynamodb/dynamodb_connector.py +++ b/tap_dynamodb/dynamodb_connector.py @@ -85,12 +85,21 @@ def get_items_iter( ) raise - def get_table_json_schema(self, table_name: str, strategy: str = "infer") -> dict: - sample_records = list( - self.get_items_iter( - table_name, scan_kwargs={"Limit": 100, "ConsistentRead": True} - ) - )[0] + def _get_sample_records(self, table_name: str, sample_size: int) -> list: + sample_records = [] + for batch in self.get_items_iter( + table_name, scan_kwargs={"Limit": sample_size, "ConsistentRead": True} + ): + sample_records.extend(batch) + if len(sample_records) >= sample_size: + break + return sample_records + + def get_table_json_schema( + self, table_name: str, sample_size, strategy: str = "infer" + ) -> dict: + sample_records = self._get_sample_records(table_name, sample_size) + if not sample_records: raise EmptyTableException() if strategy == "infer": diff --git a/tap_dynamodb/streams.py b/tap_dynamodb/streams.py index cf1a8a7..c414f65 100644 --- a/tap_dynamodb/streams.py +++ b/tap_dynamodb/streams.py @@ -18,6 +18,7 @@ def __init__( tap: TapBaseClass, name: str, dynamodb_conn: DynamoDbConnector, + infer_schema_sample_size, ): """ Initialize a new TableStream object. @@ -26,10 +27,13 @@ def __init__( tap: The parent tap object. name: The name of the stream. dynamodb_conn: The DynamoDbConnector object. + infer_schema_sample_size: The amount of records to sample when + inferring the schema. """ self._dynamodb_conn: DynamoDbConnector = dynamodb_conn self._table_name: str = name self._schema: dict = {} + self._infer_schema_sample_size = infer_schema_sample_size super().__init__( tap=tap, schema=self.schema, @@ -51,7 +55,10 @@ def schema(self) -> dict: """ # TODO: SDC columns if not self._schema: - self._schema = self._dynamodb_conn.get_table_json_schema(self._table_name) + self._schema = self._dynamodb_conn.get_table_json_schema( + self._table_name, + self._infer_schema_sample_size, + ) self.primary_keys = self._dynamodb_conn.get_table_key_properties( self._table_name ) diff --git a/tap_dynamodb/tap.py b/tap_dynamodb/tap.py index db3c3d9..12b43c0 100644 --- a/tap_dynamodb/tap.py +++ b/tap_dynamodb/tap.py @@ -23,6 +23,12 @@ class TapDynamoDB(Tap): th.ArrayType(th.StringType), description="An array of table names to extract from.", ), + th.Property( + "infer_schema_sample_size", + th.IntegerType, + description="The amount of records to sample when inferring the schema.", + default=100, + ), ).to_dict() def discover_streams(self) -> list[streams.TableStream]: @@ -41,6 +47,9 @@ def discover_streams(self) -> list[streams.TableStream]: tap=self, name=table_name, dynamodb_conn=dynamodb_conn, + infer_schema_sample_size=self.config.get( + "infer_schema_sample_size" + ), ) discovered_streams.append(stream) except EmptyTableException: diff --git a/tests/test_dynamodb_connector.py b/tests/test_dynamodb_connector.py index c4dce6c..232fa38 100644 --- a/tests/test_dynamodb_connector.py +++ b/tests/test_dynamodb_connector.py @@ -113,7 +113,7 @@ def test_get_table_json_schema(): # END PREP db_obj = DynamoDbConnector(SAMPLE_CONFIG) - schema = db_obj.get_table_json_schema("table") + schema = db_obj.get_table_json_schema("table", 5) assert schema == { "type": "object", "properties": { @@ -145,3 +145,19 @@ def test_coerce_types(): db_obj = DynamoDbConnector(SAMPLE_CONFIG) coerced = db_obj._coerce_types({"foo": decimal.Decimal("1.23")}) assert coerced == {"foo": "1.23"} + + +@mock_dynamodb +def test_get_sample_records(): + # PREP + moto_conn = boto3.resource("dynamodb", region_name="us-west-2") + table = create_table(moto_conn, "table") + for num in range(5): + table.put_item( + Item={"year": 2023, "title": f"foo_{num}", "info": {"plot": "bar"}} + ) + # END PREP + + db_obj = DynamoDbConnector(SAMPLE_CONFIG) + records = db_obj._get_sample_records("table", 2) + assert len(records) == 2