Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "🐛 Source Dynamodb: Fix reserved words in expression"" #23598

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -474,7 +474,7 @@
- name: DynamoDB
sourceDefinitionId: 50401137-8871-4c5a-abb7-1f5fda35545a
dockerRepository: airbyte/source-dynamodb
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/sources/dynamodb
icon: dynamodb.svg
sourceType: api
Original file line number Diff line number Diff line change
@@ -3399,7 +3399,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-dynamodb:0.1.1"
- dockerImage: "airbyte/source-dynamodb:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/dynamodb"
connectionSpecification:
@@ -3465,6 +3465,13 @@
airbyte_secret: true
examples:
- "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"
reserved_attribute_names:
title: "Reserved attribute names"
type: "string"
description: "Comma separated reserved attribute names present in your tables"
airbyte_secret: true
examples:
- "name, field_name, field-name"
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-dynamodb/Dockerfile
Original file line number Diff line number Diff line change
@@ -17,5 +17,5 @@ ENV APPLICATION source-dynamodb
COPY --from=build /airbyte /airbyte

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-dynamodb
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import software.amazon.awssdk.regions.Region;

public record DynamodbConfig(
@@ -16,18 +18,22 @@ public record DynamodbConfig(

String accessKey,

String secretKey
String secretKey,

List<String> reservedAttributeNames

) {

public static DynamodbConfig createDynamodbConfig(JsonNode jsonNode) {
JsonNode endpoint = jsonNode.get("endpoint");
JsonNode region = jsonNode.get("region");
JsonNode attributeNames = jsonNode.get("reserved_attribute_names");
return new DynamodbConfig(
endpoint != null && !endpoint.asText().isBlank() ? URI.create(endpoint.asText()) : null,
region != null && !region.asText().isBlank() ? Region.of(region.asText()) : null,
jsonNode.get("access_key_id").asText(),
jsonNode.get("secret_access_key").asText());
jsonNode.get("secret_access_key").asText(),
attributeNames != null ? Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of());
}

}
Original file line number Diff line number Diff line change
@@ -13,9 +13,11 @@
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -30,7 +32,10 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable {

private ObjectMapper schemaObjectMapper;

private DynamodbConfig dynamodbConfig;

public DynamodbOperations(DynamodbConfig dynamodbConfig) {
this.dynamodbConfig = dynamodbConfig;
this.dynamoDbClient = DynamodbUtils.createDynamoDbClient(dynamodbConfig);
initMappers();
}
@@ -105,12 +110,31 @@ public JsonNode inferSchema(String tableName, int sampleSize) {
public List<JsonNode> scanTable(String tableName, Set<String> attributes, FilterAttribute filterAttribute) {
List<JsonNode> items = new ArrayList<>();

var projectionAttributes = String.join(", ", attributes);
String prefix = "dyndb";
// remove and replace reserved attribute names
Set<String> copyAttributes = new HashSet<>(attributes);
dynamodbConfig.reservedAttributeNames().forEach(copyAttributes::remove);
dynamodbConfig.reservedAttributeNames().stream()
.filter(attributes::contains)
.map(str -> str.replaceAll("[-.]", ""))
.forEach(attr -> copyAttributes.add("#" + prefix + "_" + attr));

Map<String, String> mappingAttributes = dynamodbConfig.reservedAttributeNames().stream()
.filter(attributes::contains)
.collect(Collectors.toUnmodifiableMap(k -> "#" + prefix + "_" + k.replaceAll("[-.]", ""), k -> k));

var projectionAttributes = String.join(", ", copyAttributes);


ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
.tableName(tableName)
.projectionExpression(projectionAttributes);

if (!mappingAttributes.isEmpty()) {
scanRequestBuilder
.expressionAttributeNames(mappingAttributes);
}

if (filterAttribute != null && filterAttribute.name() != null &&
filterAttribute.value() != null && filterAttribute.type() != null) {

@@ -134,8 +158,10 @@ public List<JsonNode> scanTable(String tableName, Set<String> attributes, Filter
comparator = ">";
}

String filterPlaceholder = dynamodbConfig.reservedAttributeNames().contains(filterName) ?
"#" + prefix + "_" + filterName.replaceAll("[-.]", "") : filterName;
scanRequestBuilder
.filterExpression(filterName + " " + comparator + " :timestamp")
.filterExpression(filterPlaceholder + " " + comparator + " :timestamp")
.expressionAttributeValues(Map.of(":timestamp", attributeValue));

}
Original file line number Diff line number Diff line change
@@ -86,9 +86,9 @@ private static AirbyteStateMessage convertStateMessage(final io.airbyte.protocol

record StreamState(

AirbyteStateMessage.AirbyteStateType airbyteStateType,
AirbyteStateMessage.AirbyteStateType airbyteStateType,

List<AirbyteStateMessage> airbyteStateMessages) {
List<AirbyteStateMessage> airbyteStateMessages) {

}

Original file line number Diff line number Diff line change
@@ -61,6 +61,13 @@
"description": "The corresponding secret to the access key id.",
"airbyte_secret": true,
"examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"]
},
"reserved_attribute_names": {
"title": "Reserved attribute names",
"type": "string",
"description": "Comma separated reserved attribute names present in your tables",
"airbyte_secret": true,
"examples": ["name, field_name, field-name"]
}
}
}
Original file line number Diff line number Diff line change
@@ -80,6 +80,7 @@ public static JsonNode createJsonConfig(DynamodbContainer dynamodbContainer) {
.put("region", dynamodbContainer.getRegion())
.put("access_key_id", dynamodbContainer.getAccessKey())
.put("secret_access_key", dynamodbContainer.getSecretKey())
.put("reserved_attribute_names", "name, field.name, field-name")
.build());
}

Original file line number Diff line number Diff line change
@@ -153,29 +153,29 @@ void testScanTable() throws JsonProcessingException, JSONException {
PutItemRequest putItemRequest1 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_3", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
"name", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
"attr_4", AttributeValue.builder().ns("12.5", "74.5").build()));

dynamoDbClient.putItem(putItemRequest1);

PutItemRequest putItemRequest2 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_6").build(),
"attr_2", AttributeValue.builder().s("str_7").build(),
"attr_3", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(),
"name", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(),
"attr_6", AttributeValue.builder().ss("str_1", "str_2").build()));

dynamoDbClient.putItem(putItemRequest2);

var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "attr_3"),
new DynamodbOperations.FilterAttribute("attr_3", "2018-12-21T17:42:34Z",
var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "name"),
new DynamodbOperations.FilterAttribute("name", "2018-12-21T17:42:34Z",
DynamodbOperations.FilterAttribute.FilterType.S));

assertThat(response)
.hasSize(1);

JSONAssert.assertEquals(objectMapper.writeValueAsString(response.get(0)), """
{
"attr_3": "2019-12-21T17:42:34Z",
"name": "2019-12-21T17:42:34Z",
"attr_2": "str_7",
"attr_1": "str_6"
}
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@
| **Dockerhub** | <img alt="Dockerhub icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/dockerhub.svg" height="30" height="30"/> | Source | airbyte/source-dockerhub:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/dockerhub) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dockerhub) | <small>`72d405a3-56d8-499f-a571-667c03406e43`</small> |
| **Dremio** | <img alt="Dremio icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/dremio.svg" height="30" height="30"/> | Source | airbyte/source-dremio:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/dremio) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dremio) | <small>`d99e9ace-8621-46c2-9144-76ae4751d64b`</small> |
| **Drift** | <img alt="Drift icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/drift.svg" height="30" height="30"/> | Source | airbyte/source-drift:0.2.5 | alpha | [link](https://docs.airbyte.com/integrations/sources/drift) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-drift) | <small>`445831eb-78db-4b1f-8f1f-0d96ad8739e2`</small> |
| **DynamoDB** | <img alt="DynamoDB icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/dynamodb.svg" height="30" height="30"/> | Source | airbyte/source-dynamodb:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/dynamodb) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dynamodb) | <small>`50401137-8871-4c5a-abb7-1f5fda35545a`</small> |
| **DynamoDB** | <img alt="DynamoDB icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/dynamodb.svg" height="30" height="30"/> | Source | airbyte/source-dynamodb:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/dynamodb) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-dynamodb) | <small>`50401137-8871-4c5a-abb7-1f5fda35545a`</small> |
| **E2E Testing** | <img alt="E2E Testing icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/airbyte.svg" height="30" height="30"/> | Source | airbyte/source-e2e-test:2.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/e2e-test) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-e2e-test) | <small>`d53f9084-fa6b-4a5a-976c-5b8392f4ad8a`</small> |
| **Elasticsearch** | <img alt="Elasticsearch icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/elasticsearch.svg" height="30" height="30"/> | Source | airbyte/source-elasticsearch:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/elasticsearch) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-elasticsearch) | <small>`7cf88806-25f5-4e1a-b422-b2fa9e1b0090`</small> |
| **EmailOctopus** | <img alt="EmailOctopus icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/emailoctopus.svg" height="30" height="30"/> | Source | airbyte/source-emailoctopus:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/emailoctopus) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-emailoctopus) | <small>`46b25e70-c980-4590-a811-8deaf50ee09f`</small> |
21 changes: 12 additions & 9 deletions docs/integrations/sources/dynamodb.md
Original file line number Diff line number Diff line change
@@ -51,15 +51,18 @@ This guide describes in details how you can configure the connector to connect w

### Configuration Parameters

* endpoint: aws endpoint of the dynamodb instance
* region: the region code of the dynamodb instance
* access_key_id: the access key for the IAM user with the required permissions
* secret_access_key: the secret key for the IAM user with the required permissions

* **_endpoint_**: AWS endpoint of the DynamoDB instance
* **_region_**: the region code of the DynamoDB instance
* **_access_key_id_**: the access key for the IAM user with the required permissions
* **_secret_access_key_**: the secret key for the IAM user with the required permissions
* **_reserved_attribute_names_**: comma-separated list of attribute names, present in the tables being replicated, that are DynamoDB reserved words or contain special characters. See [Expression attribute names](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.ExpressionAttributeNames.html).

## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------|:----------------|
| 0.1.1 | 02-09-2023 | https://github.com/airbytehq/airbyte/pull/22682 | Fix build |
| 0.1.0 | 11-14-2022 | https://github.com/airbytehq/airbyte/pull/18750 | Initial version |

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------|:---------------------------------------------------------------------|
| 0.1.2 | 01-19-2023 | https://github.com/airbytehq/airbyte/pull/20172 | Fix reserved words in projection expression & make them configurable |
| 0.1.1 | 02-09-2023 | https://github.com/airbytehq/airbyte/pull/22682 | Fix build |
| 0.1.0 | 11-14-2022 | https://github.com/airbytehq/airbyte/pull/18750 | Initial version |