From 4f3b2e6d98a03a642d7cdaae5197cb1d621223d7 Mon Sep 17 00:00:00 2001 From: sh4sh <6833405+sh4sh@users.noreply.github.com> Date: Tue, 28 Feb 2023 17:45:00 -0500 Subject: [PATCH] =?UTF-8?q?Revert=20"Revert=20"=F0=9F=90=9B=20Source=20Dyn?= =?UTF-8?q?amodb:=20Fix=20reserved=20words=20in=20expression=20(#20172)"?= =?UTF-8?q?=20(#23515)"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit a5cd384a40227e929996097ed3352ed30b780841. --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 9 +++++- .../connectors/source-dynamodb/Dockerfile | 2 +- .../source/dynamodb/DynamodbConfig.java | 10 +++++-- .../source/dynamodb/DynamodbOperations.java | 30 +++++++++++++++++-- .../source/dynamodb/DynamodbUtils.java | 4 +-- .../src/main/resources/spec.json | 7 +++++ .../source/dynamodb/DynamodbDataFactory.java | 1 + .../dynamodb/DynamodbOperationsTest.java | 10 +++---- connectors.md | 2 +- docs/integrations/sources/dynamodb.md | 21 +++++++------ 11 files changed, 74 insertions(+), 24 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index aa0c7030ac49..defba39e8277 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -474,7 +474,7 @@ - name: DynamoDB sourceDefinitionId: 50401137-8871-4c5a-abb7-1f5fda35545a dockerRepository: airbyte/source-dynamodb - dockerImageTag: 0.1.1 + dockerImageTag: 0.1.2 documentationUrl: https://docs.airbyte.com/integrations/sources/dynamodb icon: dynamodb.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 3b2b0b0f707f..0c0c303f04c1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ 
b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3399,7 +3399,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-dynamodb:0.1.1" +- dockerImage: "airbyte/source-dynamodb:0.1.2" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/dynamodb" connectionSpecification: @@ -3465,6 +3465,13 @@ airbyte_secret: true examples: - "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" + reserved_attribute_names: + title: "Reserved attribute names" + type: "string" + description: "Comma separated reserved attribute names present in your tables" + airbyte_secret: true + examples: + - "name, field_name, field-name" supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] diff --git a/airbyte-integrations/connectors/source-dynamodb/Dockerfile b/airbyte-integrations/connectors/source-dynamodb/Dockerfile index 18c6ff785fd9..17a0fecfa26c 100644 --- a/airbyte-integrations/connectors/source-dynamodb/Dockerfile +++ b/airbyte-integrations/connectors/source-dynamodb/Dockerfile @@ -17,5 +17,5 @@ ENV APPLICATION source-dynamodb COPY --from=build /airbyte /airbyte # Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile. 
-LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-dynamodb diff --git a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java index 912ab3ce9961..463d8f42085f 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java @@ -6,6 +6,8 @@ import com.fasterxml.jackson.databind.JsonNode; import java.net.URI; +import java.util.Arrays; +import java.util.List; import software.amazon.awssdk.regions.Region; public record DynamodbConfig( @@ -16,18 +18,22 @@ public record DynamodbConfig( String accessKey, - String secretKey + String secretKey, + + List reservedAttributeNames ) { public static DynamodbConfig createDynamodbConfig(JsonNode jsonNode) { JsonNode endpoint = jsonNode.get("endpoint"); JsonNode region = jsonNode.get("region"); + JsonNode attributeNames = jsonNode.get("reserved_attribute_names"); return new DynamodbConfig( endpoint != null && !endpoint.asText().isBlank() ? URI.create(endpoint.asText()) : null, region != null && !region.asText().isBlank() ? Region.of(region.asText()) : null, jsonNode.get("access_key_id").asText(), - jsonNode.get("secret_access_key").asText()); + jsonNode.get("secret_access_key").asText(), + attributeNames != null ? 
Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of()); } } diff --git a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbOperations.java b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbOperations.java index b9cbd0caa680..77f0a8b44155 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbOperations.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbOperations.java @@ -13,9 +13,11 @@ import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -30,7 +32,10 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable { private ObjectMapper schemaObjectMapper; + private DynamodbConfig dynamodbConfig; + public DynamodbOperations(DynamodbConfig dynamodbConfig) { + this.dynamodbConfig = dynamodbConfig; this.dynamoDbClient = DynamodbUtils.createDynamoDbClient(dynamodbConfig); initMappers(); } @@ -105,12 +110,31 @@ public JsonNode inferSchema(String tableName, int sampleSize) { public List scanTable(String tableName, Set attributes, FilterAttribute filterAttribute) { List items = new ArrayList<>(); - var projectionAttributes = String.join(", ", attributes); + String prefix = "dyndb"; + // remove and replace reserved attribute names + Set copyAttributes = new HashSet<>(attributes); + dynamodbConfig.reservedAttributeNames().forEach(copyAttributes::remove); + 
dynamodbConfig.reservedAttributeNames().stream() + .filter(attributes::contains) + .map(str -> str.replaceAll("[-.]", "")) + .forEach(attr -> copyAttributes.add("#" + prefix + "_" + attr)); + + Map mappingAttributes = dynamodbConfig.reservedAttributeNames().stream() + .filter(attributes::contains) + .collect(Collectors.toUnmodifiableMap(k -> "#" + prefix + "_" + k.replaceAll("[-.]", ""), k -> k)); + + var projectionAttributes = String.join(", ", copyAttributes); + ScanRequest.Builder scanRequestBuilder = ScanRequest.builder() .tableName(tableName) .projectionExpression(projectionAttributes); + if (!mappingAttributes.isEmpty()) { + scanRequestBuilder + .expressionAttributeNames(mappingAttributes); + } + if (filterAttribute != null && filterAttribute.name() != null && filterAttribute.value() != null && filterAttribute.type() != null) { @@ -134,8 +158,10 @@ public List scanTable(String tableName, Set attributes, Filter comparator = ">"; } + String filterPlaceholder = dynamodbConfig.reservedAttributeNames().contains(filterName) ? 
+ "#" + prefix + "_" + filterName.replaceAll("[-.]", "") : filterName; scanRequestBuilder - .filterExpression(filterName + " " + comparator + " :timestamp") + .filterExpression(filterPlaceholder + " " + comparator + " :timestamp") .expressionAttributeValues(Map.of(":timestamp", attributeValue)); } diff --git a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbUtils.java b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbUtils.java index d39da58511a7..9db8f7388379 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbUtils.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbUtils.java @@ -86,9 +86,9 @@ private static AirbyteStateMessage convertStateMessage(final io.airbyte.protocol record StreamState( - AirbyteStateMessage.AirbyteStateType airbyteStateType, + AirbyteStateMessage.AirbyteStateType airbyteStateType, - List airbyteStateMessages) { + List airbyteStateMessages) { } diff --git a/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json b/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json index c6e6d129c878..6b6a09f075fb 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json @@ -61,6 +61,13 @@ "description": "The corresponding secret to the access key id.", "airbyte_secret": true, "examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"] + }, + "reserved_attribute_names": { + "title": "Reserved attribute names", + "type": "string", + "description": "Comma separated reserved attribute names present in your tables", + "airbyte_secret": true, + "examples": ["name, field_name, field-name"] } } } diff --git 
a/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbDataFactory.java b/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbDataFactory.java index df7fb6978435..1d71070afb51 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbDataFactory.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbDataFactory.java @@ -80,6 +80,7 @@ public static JsonNode createJsonConfig(DynamodbContainer dynamodbContainer) { .put("region", dynamodbContainer.getRegion()) .put("access_key_id", dynamodbContainer.getAccessKey()) .put("secret_access_key", dynamodbContainer.getSecretKey()) + .put("reserved_attribute_names", "name, field.name, field-name") .build()); } diff --git a/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbOperationsTest.java b/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbOperationsTest.java index a00b97a5ed32..d2784fbeddce 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbOperationsTest.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbOperationsTest.java @@ -153,7 +153,7 @@ void testScanTable() throws JsonProcessingException, JSONException { PutItemRequest putItemRequest1 = DynamodbDataFactory.putItemRequest(tableName, Map.of( "attr_1", AttributeValue.builder().s("str_4").build(), "attr_2", AttributeValue.builder().s("str_5").build(), - "attr_3", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(), + "name", 
AttributeValue.builder().s("2017-12-21T17:42:34Z").build(), "attr_4", AttributeValue.builder().ns("12.5", "74.5").build())); dynamoDbClient.putItem(putItemRequest1); @@ -161,13 +161,13 @@ void testScanTable() throws JsonProcessingException, JSONException { PutItemRequest putItemRequest2 = DynamodbDataFactory.putItemRequest(tableName, Map.of( "attr_1", AttributeValue.builder().s("str_6").build(), "attr_2", AttributeValue.builder().s("str_7").build(), - "attr_3", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(), + "name", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(), "attr_6", AttributeValue.builder().ss("str_1", "str_2").build())); dynamoDbClient.putItem(putItemRequest2); - var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "attr_3"), - new DynamodbOperations.FilterAttribute("attr_3", "2018-12-21T17:42:34Z", + var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "name"), + new DynamodbOperations.FilterAttribute("name", "2018-12-21T17:42:34Z", DynamodbOperations.FilterAttribute.FilterType.S)); assertThat(response) @@ -175,7 +175,7 @@ void testScanTable() throws JsonProcessingException, JSONException { JSONAssert.assertEquals(objectMapper.writeValueAsString(response.get(0)), """ { - "attr_3": "2019-12-21T17:42:34Z", + "name": "2019-12-21T17:42:34Z", "attr_2": "str_7", "attr_1": "str_6" } diff --git a/connectors.md b/connectors.md index 80516aae7c2b..e3c2be02809a 100644 --- a/connectors.md +++ b/connectors.md @@ -62,7 +62,7 @@ | **Dockerhub** | Dockerhub icon | Source | airbyte/source-dockerhub:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/dockerhub) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dockerhub) | `72d405a3-56d8-499f-a571-667c03406e43` | | **Dremio** | Dremio icon | Source | airbyte/source-dremio:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/dremio) | 
[code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dremio) | `d99e9ace-8621-46c2-9144-76ae4751d64b` | | **Drift** | Drift icon | Source | airbyte/source-drift:0.2.5 | alpha | [link](https://docs.airbyte.com/integrations/sources/drift) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-drift) | `445831eb-78db-4b1f-8f1f-0d96ad8739e2` | -| **DynamoDB** | DynamoDB icon | Source | airbyte/source-dynamodb:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/dynamodb) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dynamodb) | `50401137-8871-4c5a-abb7-1f5fda35545a` | +| **DynamoDB** | DynamoDB icon | Source | airbyte/source-dynamodb:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/dynamodb) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dynamodb) | `50401137-8871-4c5a-abb7-1f5fda35545a` | | **E2E Testing** | E2E Testing icon | Source | airbyte/source-e2e-test:2.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/e2e-test) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-e2e-test) | `d53f9084-fa6b-4a5a-976c-5b8392f4ad8a` | | **Elasticsearch** | Elasticsearch icon | Source | airbyte/source-elasticsearch:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/elasticsearch) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-elasticsearch) | `7cf88806-25f5-4e1a-b422-b2fa9e1b0090` | | **EmailOctopus** | EmailOctopus icon | Source | airbyte/source-emailoctopus:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/emailoctopus) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-emailoctopus) | `46b25e70-c980-4590-a811-8deaf50ee09f` | diff --git 
a/docs/integrations/sources/dynamodb.md b/docs/integrations/sources/dynamodb.md index adcb6e4158d3..9ffddd23ac7a 100644 --- a/docs/integrations/sources/dynamodb.md +++ b/docs/integrations/sources/dynamodb.md @@ -51,15 +51,18 @@ This guide describes in details how you can configure the connector to connect w ### Сonfiguration Parameters -* endpoint: aws endpoint of the dynamodb instance -* region: the region code of the dynamodb instance -* access_key_id: the access key for the IAM user with the required permissions -* secret_access_key: the secret key for the IAM user with the required permissions - +* **_endpoint_**: aws endpoint of the dynamodb instance +* **_region_**: the region code of the dynamodb instance +* **_access_key_id_**: the access key for the IAM user with the required permissions +* **_secret_access_key_**: the secret key for the IAM user with the required permissions +* **_reserved_attribute_names_**: comma-separated list of attribute names present in the replicated tables that are DynamoDB reserved words or contain special characters. See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.ExpressionAttributeNames.html ## Changelog -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:-------------|:----------------| -| 0.1.1 | 02-09-2023 | https://github.com/airbytehq/airbyte/pull/22682 | Fix build | -| 0.1.0 | 11-14-2022 | https://github.com/airbytehq/airbyte/pull/18750 | Initial version | + +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:------------------------------------------------|:---------------------------------------------------------------------| +| 0.1.2 | 01-19-2023 | https://github.com/airbytehq/airbyte/pull/20172 | Fix reserved words in projection expression & make them configurable | +| 0.1.1 | 02-09-2023 | https://github.com/airbytehq/airbyte/pull/22682 | Fix build | +| 0.1.0 | 11-14-2022 | https://github.com/airbytehq/airbyte/pull/18750 | Initial version | +