From 893338b75018b196443f4865b4d9ea2e4908376f Mon Sep 17 00:00:00 2001 From: Jinni Gu Date: Fri, 20 Aug 2021 02:26:48 -0700 Subject: [PATCH 1/9] Added the DynamoDB destination connector. Implemented getConsumer and check methods. Signed-off-by: Jinni Gu --- .../destination/dynamodb/DynamodbChecker.java | 98 +++++++++++++ .../dynamodb/DynamodbConsumer.java | 134 ++++++++++++++++++ .../dynamodb/DynamodbDestination.java | 71 ++++++++++ 3 files changed, 303 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java new file mode 100644 index 000000000000..503ae276b7f3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java @@ -0,0 +1,98 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this 
permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.document.*; +import com.amazonaws.services.dynamodbv2.model.*; +import io.airbyte.integrations.base.JavaBaseConstants; + +import java.util.Arrays; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbChecker { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbChecker.class); + + public static void attemptDynamodbWriteAndDelete(DynamodbDestinationConfig dynamodbDestinationConfig) throws Exception { + var prefix = dynamodbDestinationConfig.getTableName(); + final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); + attemptWriteAndDeleteDynamodbItem(dynamodbDestinationConfig, outputTableName); + } + + private static void attemptWriteAndDeleteDynamodbItem(DynamodbDestinationConfig dynamodbDestinationConfig, String outputTableName) throws Exception { + DynamoDB dynamoDB = new 
DynamoDB(getAmazonDynamoDB(dynamodbDestinationConfig)); + Table table = dynamoDB.createTable(outputTableName, //create table + Arrays.asList(new KeySchemaElement(JavaBaseConstants.COLUMN_NAME_AB_ID, KeyType.HASH), new KeySchemaElement("sync_time", KeyType.RANGE)), + Arrays.asList(new AttributeDefinition(JavaBaseConstants.COLUMN_NAME_AB_ID, ScalarAttributeType.S), new AttributeDefinition("sync_time", ScalarAttributeType.N)), + new ProvisionedThroughput(1L, 1L)); + table.waitForActive(); + + try { + PutItemOutcome outcome = table + .putItem(new Item().withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), "sync_time", System.currentTimeMillis())); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + + table.delete(); //delete table + table.waitForDelete(); + } + + public static AmazonDynamoDB getAmazonDynamoDB(DynamodbDestinationConfig dynamodbDestinationConfig) { + var endpoint = dynamodbDestinationConfig.getEndpoint(); + var region = dynamodbDestinationConfig.getRegion(); + var accessKeyId = dynamodbDestinationConfig.getAccessKeyId(); + var secretAccessKey = dynamodbDestinationConfig.getSecretAccessKey(); + + var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + + if (endpoint.isEmpty()) { + return AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(dynamodbDestinationConfig.getRegion()) + .build(); + + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + + return AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + } +} diff --git 
a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java new file mode 100644 index 000000000000..33f180458e3a --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java @@ -0,0 +1,134 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.protocol.models.*; + +import java.util.*; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbConsumer extends FailureTrackingAirbyteMessageConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbConsumer.class); + + private final DynamodbDestinationConfig dynamodbDestinationConfig; + private final ConfiguredAirbyteCatalog configuredCatalog; + private final Consumer outputRecordCollector; + private final Map streamNameAndNamespaceToWriters; + + private AirbyteMessage lastStateMessage = null; + + public DynamodbConsumer(DynamodbDestinationConfig dynamodbDestinationConfig, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + this.dynamodbDestinationConfig = dynamodbDestinationConfig; + this.configuredCatalog = configuredCatalog; + this.outputRecordCollector = outputRecordCollector; + this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); + } + + @Override + protected void startTracked() throws Exception { + + var endpoint = dynamodbDestinationConfig.getEndpoint(); + AWSCredentials awsCreds = new BasicAWSCredentials(dynamodbDestinationConfig.getAccessKeyId(), dynamodbDestinationConfig.getSecretAccessKey()); + AmazonDynamoDB amazonDynamodb = null; + + if (endpoint.isEmpty()) 
{ + amazonDynamodb = AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(dynamodbDestinationConfig.getRegion()) + .build(); + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + + amazonDynamodb = AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, dynamodbDestinationConfig.getRegion())) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + + var uploadTimestamp = System.currentTimeMillis(); + + for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + var writer = new DynamodbWriter(dynamodbDestinationConfig, amazonDynamodb, configuredStream, uploadTimestamp); + + AirbyteStream stream = configuredStream.getStream(); + AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair + .fromAirbyteSteam(stream); + streamNameAndNamespaceToWriters.put(streamNamePair, writer); + } + } + + @Override + protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception { + if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) { + this.lastStateMessage = airbyteMessage; + return; + } else if (airbyteMessage.getType() != AirbyteMessage.Type.RECORD) { + return; + } + + AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); + AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair + .fromRecordMessage(recordMessage); + + if (!streamNameAndNamespaceToWriters.containsKey(pair)) { + throw new IllegalArgumentException( + String.format( + "Message contained record from a stream that was not in the catalog. 
\ncatalog: %s , \nmessage: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); + } + + streamNameAndNamespaceToWriters.get(pair).write(UUID.randomUUID(), recordMessage); + } + + @Override + protected void close(boolean hasFailed) throws Exception { + for (DynamodbWriter handler : streamNameAndNamespaceToWriters.values()) { + handler.close(hasFailed); + } + // DynamoDB stream uploader is all or nothing if a failure happens in the destination. + if (!hasFailed) { + outputRecordCollector.accept(lastStateMessage); + } + } +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java new file mode 100644 index 000000000000..b85ad6321190 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; + +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamodbDestination extends BaseConnector implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestination.class); + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new DynamodbDestination()).run(args); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + try { + DynamodbChecker.attemptDynamodbWriteAndDelete(DynamodbDestinationConfig.getDynamodbDestinationConfig(config)); + return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Exception attempting to access the DynamoDB table: ", e); + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage("Could not connect to the DynamoDB table with the provided configuration. 
\n" + e + .getMessage()); + } + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + // TODO + return new DynamodbConsumer(DynamodbDestinationConfig.getDynamodbDestinationConfig(config), configuredCatalog, outputRecordCollector); + } + +} From 1974e662af870f128a8873fe3170ce986979dbcf Mon Sep 17 00:00:00 2001 From: Yiqing Wang Date: Fri, 20 Aug 2021 02:43:31 -0700 Subject: [PATCH 2/9] Added auto-generated project files. Signed-off-by: Yiqing Wang --- .../destination-dynamodb/.dockerignore | 3 + .../destination-dynamodb/Dockerfile | 11 +++ .../connectors/destination-dynamodb/README.md | 68 +++++++++++++++++++ .../destination-dynamodb/build.gradle | 21 ++++++ 4 files changed, 103 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-dynamodb/.dockerignore create mode 100644 airbyte-integrations/connectors/destination-dynamodb/Dockerfile create mode 100644 airbyte-integrations/connectors/destination-dynamodb/README.md create mode 100644 airbyte-integrations/connectors/destination-dynamodb/build.gradle diff --git a/airbyte-integrations/connectors/destination-dynamodb/.dockerignore b/airbyte-integrations/connectors/destination-dynamodb/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-dynamodb/Dockerfile b/airbyte-integrations/connectors/destination-dynamodb/Dockerfile new file mode 100644 index 000000000000..319c38ea3133 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-dynamodb + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL 
io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-dynamodb diff --git a/airbyte-integrations/connectors/destination-dynamodb/README.md b/airbyte-integrations/connectors/destination-dynamodb/README.md new file mode 100644 index 000000000000..3677fb5347ee --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/README.md @@ -0,0 +1,68 @@ +# Destination Dynamodb + +This is the repository for the Dynamodb destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/dynamodb). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. 
+ +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-dynamodb:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-dynamodb:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-dynamodb:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-dynamodb:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/io/airbyte/integrations/destinations/dynamodb`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destinations/dynamodbDestinationAcceptanceTest.java`. + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-dynamodb:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. 
diff --git a/airbyte-integrations/connectors/destination-dynamodb/build.gradle b/airbyte-integrations/connectors/destination-dynamodb/build.gradle new file mode 100644 index 000000000000..b33317f137c7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/build.gradle @@ -0,0 +1,21 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.dynamodb.DynamodbDestination' +} + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.12.47' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-dynamodb') +} From b7a19d1de2d6de9a7f08795d1c7edf3e4689b79d Mon Sep 17 00:00:00 2001 From: Yiqing Wang Date: Fri, 20 Aug 2021 02:46:00 -0700 Subject: [PATCH 3/9] Added config related files and output table helper. 
Signed-off-by: Yiqing Wang --- .../8ccd8909-4e99-4141-b48d-4984b70b2d89.json | 7 ++ .../seed/destination_definitions.yaml | 5 ++ .../sample_secrets/config.json | 6 ++ .../dynamodb/DynamodbDestinationConfig.java | 77 +++++++++++++++++ .../dynamodb/DynamodbOutputTableHelper.java | 54 ++++++++++++ .../src/main/resources/spec.json | 82 +++++++++++++++++++ 6 files changed, 231 insertions(+) create mode 100644 airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json create mode 100644 airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json new file mode 100644 index 000000000000..1b2728fb3c10 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8ccd8909-4e99-4141-b48d-4984b70b2d89.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "8ccd8909-4e99-4141-b48d-4984b70b2d89", + "name": "DynamoDB", + "dockerRepository": "airbyte/destination-dynamodb", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/dynamodb" +} diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index c892a1c695c4..eb4de6972261 100644 --- 
a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -75,3 +75,8 @@ dockerRepository: airbyte/destination-kafka dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka +- destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89 + name: DynamoDB + dockerRepository: airbyte/destination-dynamodb + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/dynamodb \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json b/airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json new file mode 100644 index 000000000000..580e6fad531b --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/sample_secrets/config.json @@ -0,0 +1,6 @@ +{ + "dynamodb_table_name": "paste-table-name-here", + "dynamodb_region": "paste-dynamodb-region-here", + "access_key_id": "paste-access-key-id-here", + "secret_access_key": "paste-secret-access-key-here" +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java new file mode 100644 index 000000000000..ee9ed5a9938a --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java @@ -0,0 +1,77 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, 
publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import com.fasterxml.jackson.databind.JsonNode; + +public class DynamodbDestinationConfig { + private final String endpoint; + private final String tableName; + private final String accessKeyId; + private final String secretAccessKey; + private final String region; + + public DynamodbDestinationConfig( + String endpoint, + String tableName, + String region, + String accessKeyId, + String secretAccessKey) { + this.endpoint = endpoint; + this.tableName = tableName; + this.region = region; + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + } + + public static DynamodbDestinationConfig getDynamodbDestinationConfig(JsonNode config) { + return new DynamodbDestinationConfig( + config.get("dynamodb_endpoint") == null ? 
"" : config.get("dynamodb_endpoint").asText(), + config.get("dynamodb_table_name").asText(), + config.get("dynamodb_region").asText(), + config.get("access_key_id").asText(), + config.get("secret_access_key").asText()); + } + + public String getEndpoint() { + return endpoint; + } + + public String getAccessKeyId() { + return accessKeyId; + } + + public String getSecretAccessKey() { + return secretAccessKey; + } + + public String getRegion() { + return region; + } + + public String getTableName() { + return tableName; + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java new file mode 100644 index 000000000000..7cc3762f3afd --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; + +import io.airbyte.protocol.models.AirbyteStream; + +import java.util.LinkedList; +import java.util.List; + +public class DynamodbOutputTableHelper { + public static String getOutputTableName(String tableName, AirbyteStream stream) { + return getOutputTableName(tableName, stream.getNamespace(), stream.getName()); + } + + public static String getOutputTableName(String tableName, String namespace, String streamName) { + List paths = new LinkedList<>(); + + if (tableName != null) { + paths.add(tableName); + } + if (namespace != null) { + paths.add(new ExtendedNameTransformer().convertStreamName(namespace)); + } + if (streamName != null) { + paths.add(new ExtendedNameTransformer().convertStreamName(streamName)); + } + + return String.join("_", paths); + } +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json new file mode 100644 index 000000000000..87e9218a7260 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/resources/spec.json @@ -0,0 +1,82 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/dynamodb", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DynamoDB Destination Spec", + "type": "object", + "required": [ + "dynamodb_table_name", + "dynamodb_region", + "access_key_id", + 
"secret_access_key" + ], + "additionalProperties": false, + "properties": { + "dynamodb_endpoint": { + "title": "Endpoint", + "type": "string", + "default": "", + "description": "This is your DynamoDB endpoint url.(if you are working with AWS DynamoDB, just leave empty).", + "examples": ["http://localhost:9000"] + }, + "dynamodb_table_name": { + "title": "DynamoDB Table Name", + "type": "string", + "description": "The name of the DynamoDB table.", + "examples": ["airbyte_sync"] + }, + "dynamodb_region": { + "title": "DynamoDB Region", + "type": "string", + "default": "", + "description": "The region of the DynamoDB.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1", + "us-gov-east-1", + "us-gov-west-1" + ] + }, + "access_key_id": { + "type": "string", + "description": "The access key id to access the DynamoDB. Airbyte requires Read and Write permissions to the DynamoDB.", + "title": "DynamoDB Key Id", + "airbyte_secret": true, + "examples": ["A012345678910EXAMPLE"] + }, + "secret_access_key": { + "type": "string", + "description": "The corresponding secret to the access key id.", + "title": "DynamoDB Access Key", + "airbyte_secret": true, + "examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"] + } + } + } +} From 2cf22e8bbfd0288ac0340b4a7d5ab4b45bb2b0a4 Mon Sep 17 00:00:00 2001 From: Jinni Gu Date: Fri, 20 Aug 2021 02:50:20 -0700 Subject: [PATCH 4/9] Added document for DynamoDB destination. 
Signed-off-by: Jinni Gu --- docs/integrations/destinations/dynamodb.md | 54 ++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 docs/integrations/destinations/dynamodb.md diff --git a/docs/integrations/destinations/dynamodb.md b/docs/integrations/destinations/dynamodb.md new file mode 100644 index 000000000000..584e7f289909 --- /dev/null +++ b/docs/integrations/destinations/dynamodb.md @@ -0,0 +1,54 @@ +# DynamoDB + +This destination writes data to AWS DynamoDB. + +The Airbyte DynamoDB destination allows you to sync data to AWS DynamoDB. Each stream is written to its own table in DynamoDB. + +## Sync overview + +### Output schema + +Each stream will be output into its own DynamoDB table. Each table will contain a collection of `json` objects with 4 fields: + +* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. +* `_airbyte_data`: a json blob containing the extracted data. +* `sync_time`: a timestamp representing when the sync task was triggered. + +### Features + +| Feature | Support | Notes | +| :--- | :---: | :--- | +| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the configured DynamoDB table. | +| Incremental - Append Sync | ✅ | | +| Namespaces | ✅ | Namespace will be used as part of the table name. | + +### Performance considerations + +This connector by default uses 10 capacity units for both Read and Write in DynamoDB tables. Please provision more capacity units in the DynamoDB console when there are performance constraints. + +## Getting started + +### Requirements + +1. Allow connections from Airbyte server to your AWS DynamoDB tables \(if they exist in separate VPCs\). +2. The credentials for AWS DynamoDB \(access key id and secret access key\).
+ +### Setup guide + +* Fill in DynamoDB info + * **DynamoDB Endpoint** + * Leave empty if using AWS DynamoDB, fill in endpoint URL if using customized endpoint. + * **DynamoDB Table Name** + * The name prefix of the DynamoDB table to store the extracted data. The table name is \<table name prefix\>\_\<namespace\>\_\<stream name\>. + * **DynamoDB Region** + * The region of the DynamoDB. + * **Access Key Id** + * See [this](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) on how to generate an access key. + * We recommend creating an Airbyte-specific user. This user will require [read and write permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_examples_dynamodb_specific-table.html) to the DynamoDB table. + * **Secret Access Key** + * Corresponding key to the above key id. +* Make sure your DynamoDB tables are accessible from the machine running Airbyte. + * This depends on your networking setup. + * You can check AWS DynamoDB documentation with a tutorial on how to properly configure your DynamoDB's access [here](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-overview.html). + * The easiest way to verify if Airbyte is able to connect to your DynamoDB tables is via the check connection tool in the UI. \ No newline at end of file From dd8d5822fab857927cf7a3cb17a41af3355e57d5 Mon Sep 17 00:00:00 2001 From: qtz123 Date: Fri, 20 Aug 2021 23:15:11 -0700 Subject: [PATCH 5/9] Implemented DynamodbWriter. Added integration tests and unit tests.
Signed-off-by: qtz123 --- .../destination/dynamodb/DynamodbWriter.java | 188 +++++++++++++++++ .../DynamodbDestinationAcceptanceTest.java | 189 ++++++++++++++++++ .../dynamodb/DynamodbDestinationTest.java | 68 +++++++ 3 files changed, 445 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java new file mode 100644 index 000000000000..becb6c148c51 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java @@ -0,0 +1,188 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.destination.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.document.*;
+import com.amazonaws.services.dynamodbv2.model.*;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import io.airbyte.commons.jackson.MoreMappers;
+import io.airbyte.integrations.base.JavaBaseConstants;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.DestinationSyncMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+public class DynamodbWriter {
+
+    protected static final Logger LOGGER = LoggerFactory.getLogger(DynamodbWriter.class);
+
+    private static final ObjectMapper MAPPER = MoreMappers.initMapper();
+    private static final ObjectWriter WRITER = MAPPER.writer();
+
+    private final DynamodbDestinationConfig config;
+    private final DynamoDB dynamodb;
+    private final ConfiguredAirbyteStream configuredStream;
+    private final long uploadTimestamp;
+    private TableWriteItems tableWriteItems;
+    private final String outputTableName;
+
+    public DynamodbWriter(DynamodbDestinationConfig config,
+                          AmazonDynamoDB amazonDynamodb,
+                          ConfiguredAirbyteStream configuredStream,
+                          long uploadTimestamp) {
+
+        this.config = config;
+        this.dynamodb = new DynamoDB(amazonDynamodb);
+        this.configuredStream = configuredStream;
+        this.uploadTimestamp = uploadTimestamp;
+        this.outputTableName = DynamodbOutputTableHelper.getOutputTableName(config.getTableName(), configuredStream.getStream());
+
+        final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
+        if (syncMode == null) {
+            throw new IllegalStateException("Undefined destination sync mode");
+        }
+
+        final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
+        // OVERWRITE: drop an existing table first so it is re-created empty below.
+
+        try {
+            if (!isAppendMode) {
+                Table table = dynamodb.getTable(outputTableName);
+
+                if (isTableExist(table)) {
+                    table.delete();
+                    table.waitForDelete();
+                }
+            }
+
+            var table = createTableIfNotExists(amazonDynamodb, outputTableName);
+            table.waitForActive();
+        } catch (Exception e) {
+            LOGGER.error("Failed to prepare DynamoDB table " + outputTableName, e);
+        }
+
+        this.tableWriteItems = new TableWriteItems(outputTableName);
+    }
+
+    private static boolean isTableExist(Table table) {
+        try {
+            table.describe();
+        } catch (ResourceNotFoundException e) {
+            return false;
+        }
+        return true;
+    }
+
+    private Table createTableIfNotExists(AmazonDynamoDB amazonDynamodb, String tableName) throws Exception {
+        AttributeDefinition partitionKeyDefinition = new AttributeDefinition()
+                .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID)
+                .withAttributeType(ScalarAttributeType.S);
+        AttributeDefinition sortKeyDefinition = new AttributeDefinition()
+                .withAttributeName("sync_time")
+                .withAttributeType(ScalarAttributeType.N);
+        KeySchemaElement partitionKeySchema = new KeySchemaElement()
+                .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID)
+                .withKeyType(KeyType.HASH);
+        KeySchemaElement sortKeySchema = new KeySchemaElement()
+                .withAttributeName("sync_time")
+                .withKeyType(KeyType.RANGE);
+        ProvisionedThroughput throughput = new ProvisionedThroughput()
+                .withReadCapacityUnits(10L)
+                .withWriteCapacityUnits(10L);
+
+        TableUtils.createTableIfNotExists(amazonDynamodb, new CreateTableRequest()
+                .withTableName(tableName)
+                .withAttributeDefinitions(partitionKeyDefinition)
+                .withKeySchema(partitionKeySchema)
+                .withAttributeDefinitions(sortKeyDefinition)
+                .withKeySchema(sortKeySchema)
+                .withProvisionedThroughput(throughput));
+        return new DynamoDB(amazonDynamodb).getTable(tableName);
+    }
+
+    public void write(UUID id, AirbyteRecordMessage recordMessage) {
+        // Buffers the record as a put request; once 25 are pending the batch is flushed.
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, Object> dataMap = mapper.convertValue(recordMessage.getData(), new TypeReference<Map<String, Object>>() {
+        });
+
+        var item = new Item()
+                .withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, (id != null ? id : UUID.randomUUID()).toString(), "sync_time", uploadTimestamp)
+                .withMap(JavaBaseConstants.COLUMN_NAME_DATA, dataMap)
+                .withLong(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
+        tableWriteItems.addItemToPut(item);
+        BatchWriteItemOutcome outcome;
+        if (tableWriteItems.getItemsToPut().size() >= 25) {
+            try {
+                int maxRetries = 5;
+                final TableWriteItems batch = tableWriteItems;
+                tableWriteItems = new TableWriteItems(this.outputTableName);
+                outcome = dynamodb.batchWriteItem(batch);
+                while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) {
+                    outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems());
+                    maxRetries--;
+                }
+
+                if (outcome.getUnprocessedItems().size() > 0) {
+                    LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size())));
+                }
+            } catch (Exception e) {
+                LOGGER.error("Failed to write a batch to DynamoDB table " + outputTableName, e);
+            }
+        }
+    }
+
+    public void close(boolean hasFailed) throws IOException {
+        if (hasFailed) {
+            LOGGER.warn("Failure in writing data to DynamoDB. Aborting...");
+        } else {
+            try {
+                int maxRetries = 5;
+                if (tableWriteItems.getItemsToPut() != null && !tableWriteItems.getItemsToPut().isEmpty()) {
+                    var outcome = dynamodb.batchWriteItem(tableWriteItems);
+                    while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) {
+                        outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems());
+                        maxRetries--;
+                    }
+                    if (outcome.getUnprocessedItems().size() > 0) {
+                        LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size())));
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.error("Failed to flush the final batch to DynamoDB table " + outputTableName, e);
+            }
+            LOGGER.info("Data writing completed for DynamoDB.");
+        }
+    }
+}
diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java
new file mode 100644
index 000000000000..f53e962232ee
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java
@@ -0,0 +1,189 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.destination.dynamodb;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.document.*;
+import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.airbyte.commons.io.IOs;
+import io.airbyte.commons.jackson.MoreMappers;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Path;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DynamodbDestinationAcceptanceTest extends DestinationAcceptanceTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestinationAcceptanceTest.class);
+    protected static final ObjectMapper MAPPER = MoreMappers.initMapper();
+
+    protected final String secretFilePath = "secrets/config.json";
+    protected JsonNode configJson;
+    protected DynamodbDestinationConfig config;
+    protected AmazonDynamoDB client;
+
+    protected JsonNode getBaseConfigJson() {
+        return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath)));
+    }
+
+    @Override
+    protected String getImageName() {
+        return "airbyte/destination-dynamodb:dev";
+    }
+
+    @Override
+    protected JsonNode getConfig() {
+        return configJson;
+    }
+
+    @Override
+    protected JsonNode getFailCheckConfig() {
+        JsonNode baseJson = getBaseConfigJson();
+        JsonNode failCheckJson = Jsons.clone(baseJson);
+        // invalid credential
+        ((ObjectNode) failCheckJson).put("access_key_id", "fake-key");
+        ((ObjectNode) failCheckJson).put("secret_access_key", "fake-secret");
+        return failCheckJson;
+    }
+
+    /**
+     * Helper method to retrieve all items synced to the stream's DynamoDB table, sorted by emitted-at time.
+     */
+    protected List<Item> getAllSyncedObjects(String streamName, String namespace) {
+        var dynamodb = new DynamoDB(this.client);
+        // Argument order must match DynamodbOutputTableHelper: table prefix, namespace, stream name.
+        var tableName = DynamodbOutputTableHelper.getOutputTableName(this.config.getTableName(), namespace, streamName);
+        var table = dynamodb.getTable(tableName);
+        List<Item> items = new ArrayList<Item>();
+        Long maxSyncTime = 0L;
+
+        try {
+            ItemCollection<ScanOutcome> scanItems = table.scan(new ScanSpec());
+
+            Iterator<Item> iter = scanItems.iterator();
+            while (iter.hasNext()) {
+
+                Item item = iter.next();
+                items.add(item);
+                maxSyncTime = Math.max(maxSyncTime, ((BigDecimal) item.get("sync_time")).longValue());
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to scan DynamoDB table " + tableName, e);
+        }
+
+        Long finalMaxSyncTime = maxSyncTime;
+        LOGGER.info("Latest sync_time found: {}", finalMaxSyncTime);
+        items.sort(Comparator.comparingLong(o -> ((BigDecimal) o.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)).longValue()));
+
+        return items;
+    }
+
+    @Override
+    protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
+                                             String streamName,
+                                             String namespace,
+                                             JsonNode streamSchema)
+            throws IOException {
+        List<Item> items = getAllSyncedObjects(streamName, namespace);
+        List<JsonNode> jsonRecords = new LinkedList<>();
+
+        for (var item : items) {
+            var itemJson = item.toJSON();
+            LOGGER.debug(itemJson);
+            jsonRecords.add(Jsons.deserialize(itemJson).get(JavaBaseConstants.COLUMN_NAME_DATA));
+        }
+
+        return jsonRecords;
+    }
+
+    @Override
+    protected void setup(TestDestinationEnv testEnv) {
+        JsonNode baseConfigJson = getBaseConfigJson();
+        // Use a clone of the base config read from the secrets file
+        JsonNode configJson = Jsons.clone(baseConfigJson);
+        this.configJson = configJson;
+        this.config = DynamodbDestinationConfig.getDynamodbDestinationConfig(configJson);
+
+        var endpoint = config.getEndpoint();
+        var region = config.getRegion();
+        var accessKeyId = config.getAccessKeyId();
+        var secretAccessKey = config.getSecretAccessKey();
+
+        var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
+
+        if (endpoint.isEmpty()) {
+            this.client = AmazonDynamoDBClientBuilder.standard()
+                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withRegion(config.getRegion())
+                    .build();
+
+        } else {
+            ClientConfiguration clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setSignerOverride("AWSDynamodbSignerType");
+
+            this.client = AmazonDynamoDBClientBuilder
+                    .standard()
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
+                    .withClientConfiguration(clientConfiguration)
+                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .build();
+        }
+    }
+
+    @Override
+    protected void tearDown(TestDestinationEnv testEnv) {
+        var dynamodb = new DynamoDB(this.client);
+        List<String> tables = new ArrayList<String>();
+        dynamodb.listTables().forEach(o -> {
+            if (o.getTableName().startsWith(this.config.getTableName())) tables.add(o.getTableName());
+        });
+
+        try {
+            for (var tableName : tables) {
+                Table table = dynamodb.getTable(tableName);
+                table.delete();
+                table.waitForDelete();
+                LOGGER.info(String.format("Delete table %s", tableName));
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to delete test tables", e);
+        }
+    }
+}
diff
--git a/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java new file mode 100644 index 000000000000..1030dcb4686a --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */
+
+package io.airbyte.integrations.destination.dynamodb;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.protocol.models.*;
+
+import org.junit.jupiter.api.Test;
+
+class DynamodbDestinationTest {
+
+    @Test
+    void testGetOutputTableNameWithString() throws Exception {
+        var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", "test_namespace", "test_stream");
+        assertEquals("test_table_test_namespace_test_stream", actual);
+    }
+
+    @Test
+    void testGetOutputTableNameWithStream() throws Exception {
+        var stream = new AirbyteStream();
+        stream.setName("test_stream");
+        stream.setNamespace("test_namespace");
+        var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", stream);
+        assertEquals("test_table_test_namespace_test_stream", actual);
+    }
+
+    @Test
+    void testGetDynamodbDestinationdbConfig() throws Exception {
+        JsonNode json = Jsons.deserialize("{\n" +
+                "  \"dynamodb_table_name\": \"test_table\",\n" +
+                "  \"dynamodb_region\": \"test_region\",\n" +
+                "  \"access_key_id\": \"test_key_id\",\n" +
+                "  \"secret_access_key\": \"test_access_key\"\n" +
+                "}"
+        );
+        var config = DynamodbDestinationConfig.getDynamodbDestinationConfig(json);
+
+        assertEquals("test_table", config.getTableName());
+        assertEquals("test_region", config.getRegion());
+        assertEquals("test_key_id", config.getAccessKeyId());
+        assertEquals("test_access_key", config.getSecretAccessKey());
+    }
+}
From d8b4cef13bc21450561bc5834dd78a9e1c8d96cf Mon Sep 17 00:00:00 2001
From: qtz123
Date: Fri, 20 Aug 2021 23:50:44 -0700
Subject: [PATCH 6/9] Added DynamoDB in the SUMMARY.md.
Signed-off-by: qtz123 --- docs/SUMMARY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 7e772f71d069..8f95f61d22cb 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -122,6 +122,7 @@ * [Zuora](integrations/sources/zuora.md) * [Destinations](integrations/destinations/README.md) * [BigQuery](integrations/destinations/bigquery.md) + * [DynamoDB](integrations/destinations/dynamodb.md) * [Google Cloud Storage (GCS)](integrations/destinations/gcs.md) * [Google PubSub](integrations/destinations/pubsub.md) * [Kafka](integrations/destinations/kafka.md) From 14954d55bd97cda9af505561ca96ee8a396e9f36 Mon Sep 17 00:00:00 2001 From: Jinni Gu Date: Sat, 21 Aug 2021 02:24:57 -0700 Subject: [PATCH 7/9] Formatted code using ./gradlew format. Signed-off-by: Jinni Gu --- .../destination/dynamodb/DynamodbChecker.java | 92 ++++--- .../dynamodb/DynamodbConsumer.java | 159 ++++++----- .../dynamodb/DynamodbDestination.java | 48 ++-- .../dynamodb/DynamodbDestinationConfig.java | 84 +++--- .../dynamodb/DynamodbOutputTableHelper.java | 34 +-- .../destination/dynamodb/DynamodbWriter.java | 247 +++++++++-------- .../DynamodbDestinationAcceptanceTest.java | 260 +++++++++--------- .../dynamodb/DynamodbDestinationTest.java | 57 ++-- 8 files changed, 490 insertions(+), 491 deletions(-) diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java index 503ae276b7f3..8fa49a436704 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java @@ -33,66 +33,68 @@ import com.amazonaws.services.dynamodbv2.document.*; import 
com.amazonaws.services.dynamodbv2.model.*; import io.airbyte.integrations.base.JavaBaseConstants; - import java.util.Arrays; import java.util.UUID; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DynamodbChecker { - private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbChecker.class); - - public static void attemptDynamodbWriteAndDelete(DynamodbDestinationConfig dynamodbDestinationConfig) throws Exception { - var prefix = dynamodbDestinationConfig.getTableName(); - final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); - attemptWriteAndDeleteDynamodbItem(dynamodbDestinationConfig, outputTableName); - } + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbChecker.class); - private static void attemptWriteAndDeleteDynamodbItem(DynamodbDestinationConfig dynamodbDestinationConfig, String outputTableName) throws Exception { - DynamoDB dynamoDB = new DynamoDB(getAmazonDynamoDB(dynamodbDestinationConfig)); - Table table = dynamoDB.createTable(outputTableName, //create table - Arrays.asList(new KeySchemaElement(JavaBaseConstants.COLUMN_NAME_AB_ID, KeyType.HASH), new KeySchemaElement("sync_time", KeyType.RANGE)), - Arrays.asList(new AttributeDefinition(JavaBaseConstants.COLUMN_NAME_AB_ID, ScalarAttributeType.S), new AttributeDefinition("sync_time", ScalarAttributeType.N)), - new ProvisionedThroughput(1L, 1L)); - table.waitForActive(); + public static void attemptDynamodbWriteAndDelete(DynamodbDestinationConfig dynamodbDestinationConfig) throws Exception { + var prefix = dynamodbDestinationConfig.getTableName(); + final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); + attemptWriteAndDeleteDynamodbItem(dynamodbDestinationConfig, outputTableName); + } - try { - PutItemOutcome outcome = table - .putItem(new Item().withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, 
UUID.randomUUID().toString(), "sync_time", System.currentTimeMillis())); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } + private static void attemptWriteAndDeleteDynamodbItem(DynamodbDestinationConfig dynamodbDestinationConfig, String outputTableName) + throws Exception { + DynamoDB dynamoDB = new DynamoDB(getAmazonDynamoDB(dynamodbDestinationConfig)); + Table table = dynamoDB.createTable(outputTableName, // create table + Arrays.asList(new KeySchemaElement(JavaBaseConstants.COLUMN_NAME_AB_ID, KeyType.HASH), new KeySchemaElement("sync_time", KeyType.RANGE)), + Arrays.asList(new AttributeDefinition(JavaBaseConstants.COLUMN_NAME_AB_ID, ScalarAttributeType.S), + new AttributeDefinition("sync_time", ScalarAttributeType.N)), + new ProvisionedThroughput(1L, 1L)); + table.waitForActive(); - table.delete(); //delete table - table.waitForDelete(); + try { + PutItemOutcome outcome = table + .putItem( + new Item().withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), "sync_time", System.currentTimeMillis())); + } catch (Exception e) { + LOGGER.error(e.getMessage()); } - public static AmazonDynamoDB getAmazonDynamoDB(DynamodbDestinationConfig dynamodbDestinationConfig) { - var endpoint = dynamodbDestinationConfig.getEndpoint(); - var region = dynamodbDestinationConfig.getRegion(); - var accessKeyId = dynamodbDestinationConfig.getAccessKeyId(); - var secretAccessKey = dynamodbDestinationConfig.getSecretAccessKey(); + table.delete(); // delete table + table.waitForDelete(); + } + + public static AmazonDynamoDB getAmazonDynamoDB(DynamodbDestinationConfig dynamodbDestinationConfig) { + var endpoint = dynamodbDestinationConfig.getEndpoint(); + var region = dynamodbDestinationConfig.getRegion(); + var accessKeyId = dynamodbDestinationConfig.getAccessKeyId(); + var secretAccessKey = dynamodbDestinationConfig.getSecretAccessKey(); - var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + var awsCreds = new 
BasicAWSCredentials(accessKeyId, secretAccessKey); - if (endpoint.isEmpty()) { - return AmazonDynamoDBClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(dynamodbDestinationConfig.getRegion()) - .build(); + if (endpoint.isEmpty()) { + return AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(dynamodbDestinationConfig.getRegion()) + .build(); - } else { - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); - return AmazonDynamoDBClientBuilder - .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) - .withClientConfiguration(clientConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .build(); - } + return AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); } + } + } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java index 33f180458e3a..40954e770cda 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java @@ -35,100 +35,99 @@ import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import 
io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.*; - import java.util.*; import java.util.function.Consumer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DynamodbConsumer extends FailureTrackingAirbyteMessageConsumer { - private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbConsumer.class); + + private final DynamodbDestinationConfig dynamodbDestinationConfig; + private final ConfiguredAirbyteCatalog configuredCatalog; + private final Consumer outputRecordCollector; + private final Map streamNameAndNamespaceToWriters; + + private AirbyteMessage lastStateMessage = null; + + public DynamodbConsumer(DynamodbDestinationConfig dynamodbDestinationConfig, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + this.dynamodbDestinationConfig = dynamodbDestinationConfig; + this.configuredCatalog = configuredCatalog; + this.outputRecordCollector = outputRecordCollector; + this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); + } + + @Override + protected void startTracked() throws Exception { + + var endpoint = dynamodbDestinationConfig.getEndpoint(); + AWSCredentials awsCreds = new BasicAWSCredentials(dynamodbDestinationConfig.getAccessKeyId(), dynamodbDestinationConfig.getSecretAccessKey()); + AmazonDynamoDB amazonDynamodb = null; + + if (endpoint.isEmpty()) { + amazonDynamodb = AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(dynamodbDestinationConfig.getRegion()) + .build(); + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + + amazonDynamodb = AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new 
AwsClientBuilder.EndpointConfiguration(endpoint, dynamodbDestinationConfig.getRegion())) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } - private final DynamodbDestinationConfig dynamodbDestinationConfig; - private final ConfiguredAirbyteCatalog configuredCatalog; - private final Consumer outputRecordCollector; - private final Map streamNameAndNamespaceToWriters; + var uploadTimestamp = System.currentTimeMillis(); - private AirbyteMessage lastStateMessage = null; + for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + var writer = new DynamodbWriter(dynamodbDestinationConfig, amazonDynamodb, configuredStream, uploadTimestamp); - public DynamodbConsumer(DynamodbDestinationConfig dynamodbDestinationConfig, - ConfiguredAirbyteCatalog configuredCatalog, - Consumer outputRecordCollector) { - this.dynamodbDestinationConfig = dynamodbDestinationConfig; - this.configuredCatalog = configuredCatalog; - this.outputRecordCollector = outputRecordCollector; - this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); + AirbyteStream stream = configuredStream.getStream(); + AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair + .fromAirbyteSteam(stream); + streamNameAndNamespaceToWriters.put(streamNamePair, writer); } - - @Override - protected void startTracked() throws Exception { - - var endpoint = dynamodbDestinationConfig.getEndpoint(); - AWSCredentials awsCreds = new BasicAWSCredentials(dynamodbDestinationConfig.getAccessKeyId(), dynamodbDestinationConfig.getSecretAccessKey()); - AmazonDynamoDB amazonDynamodb = null; - - if (endpoint.isEmpty()) { - amazonDynamodb = AmazonDynamoDBClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(dynamodbDestinationConfig.getRegion()) - .build(); - } else { - ClientConfiguration clientConfiguration = new 
ClientConfiguration(); - clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); - - amazonDynamodb = AmazonDynamoDBClientBuilder - .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, dynamodbDestinationConfig.getRegion())) - .withClientConfiguration(clientConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .build(); - } - - var uploadTimestamp = System.currentTimeMillis(); - - for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { - var writer = new DynamodbWriter(dynamodbDestinationConfig, amazonDynamodb, configuredStream, uploadTimestamp); - - AirbyteStream stream = configuredStream.getStream(); - AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair - .fromAirbyteSteam(stream); - streamNameAndNamespaceToWriters.put(streamNamePair, writer); - } + } + + @Override + protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception { + if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) { + this.lastStateMessage = airbyteMessage; + return; + } else if (airbyteMessage.getType() != AirbyteMessage.Type.RECORD) { + return; } - @Override - protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception { - if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) { - this.lastStateMessage = airbyteMessage; - return; - } else if (airbyteMessage.getType() != AirbyteMessage.Type.RECORD) { - return; - } - - AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); - AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair - .fromRecordMessage(recordMessage); - - if (!streamNameAndNamespaceToWriters.containsKey(pair)) { - throw new IllegalArgumentException( - String.format( - "Message contained record from a stream that was not in the catalog. 
\ncatalog: %s , \nmessage: %s", - Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); - } - - streamNameAndNamespaceToWriters.get(pair).write(UUID.randomUUID(), recordMessage); + AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); + AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair + .fromRecordMessage(recordMessage); + + if (!streamNameAndNamespaceToWriters.containsKey(pair)) { + throw new IllegalArgumentException( + String.format( + "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); } - @Override - protected void close(boolean hasFailed) throws Exception { - for (DynamodbWriter handler : streamNameAndNamespaceToWriters.values()) { - handler.close(hasFailed); - } - // DynamoDB stream uploader is all or nothing if a failure happens in the destination. - if (!hasFailed) { - outputRecordCollector.accept(lastStateMessage); - } + streamNameAndNamespaceToWriters.get(pair).write(UUID.randomUUID(), recordMessage); + } + + @Override + protected void close(boolean hasFailed) throws Exception { + for (DynamodbWriter handler : streamNameAndNamespaceToWriters.values()) { + handler.close(hasFailed); } + // DynamoDB stream uploader is all or nothing if a failure happens in the destination. 
+ if (!hasFailed) { + outputRecordCollector.accept(lastStateMessage); + } + } + } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java index b85ad6321190..8e6f087e39fe 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestination.java @@ -32,40 +32,38 @@ import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; - import java.util.function.Consumer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DynamodbDestination extends BaseConnector implements Destination { - private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestination.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestination.class); - public static void main(String[] args) throws Exception { - new IntegrationRunner(new DynamodbDestination()).run(args); - } + public static void main(String[] args) throws Exception { + new IntegrationRunner(new DynamodbDestination()).run(args); + } - @Override - public AirbyteConnectionStatus check(JsonNode config) { - try { - DynamodbChecker.attemptDynamodbWriteAndDelete(DynamodbDestinationConfig.getDynamodbDestinationConfig(config)); - return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); - } catch (Exception e) { - LOGGER.error("Exception attempting to access the DynamoDB table: ", e); - return new AirbyteConnectionStatus() - .withStatus(AirbyteConnectionStatus.Status.FAILED) - .withMessage("Could not 
connect to the DynamoDB table with the provided configuration. \n" + e - .getMessage()); - } + @Override + public AirbyteConnectionStatus check(JsonNode config) { + try { + DynamodbChecker.attemptDynamodbWriteAndDelete(DynamodbDestinationConfig.getDynamodbDestinationConfig(config)); + return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Exception attempting to access the DynamoDB table: ", e); + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage("Could not connect to the DynamoDB table with the provided configuration. \n" + e + .getMessage()); } + } - @Override - public AirbyteMessageConsumer getConsumer(JsonNode config, - ConfiguredAirbyteCatalog configuredCatalog, - Consumer outputRecordCollector) { - // TODO - return new DynamodbConsumer(DynamodbDestinationConfig.getDynamodbDestinationConfig(config), configuredCatalog, outputRecordCollector); - } + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + // TODO + return new DynamodbConsumer(DynamodbDestinationConfig.getDynamodbDestinationConfig(config), configuredCatalog, outputRecordCollector); + } } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java index ee9ed5a9938a..310dc530b0aa 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationConfig.java @@ -27,51 +27,53 @@ import 
com.fasterxml.jackson.databind.JsonNode; public class DynamodbDestinationConfig { - private final String endpoint; - private final String tableName; - private final String accessKeyId; - private final String secretAccessKey; - private final String region; - public DynamodbDestinationConfig( - String endpoint, - String tableName, - String region, - String accessKeyId, - String secretAccessKey) { - this.endpoint = endpoint; - this.tableName = tableName; - this.region = region; - this.accessKeyId = accessKeyId; - this.secretAccessKey = secretAccessKey; - } + private final String endpoint; + private final String tableName; + private final String accessKeyId; + private final String secretAccessKey; + private final String region; - public static DynamodbDestinationConfig getDynamodbDestinationConfig(JsonNode config) { - return new DynamodbDestinationConfig( - config.get("dynamodb_endpoint") == null ? "" : config.get("dynamodb_endpoint").asText(), - config.get("dynamodb_table_name").asText(), - config.get("dynamodb_region").asText(), - config.get("access_key_id").asText(), - config.get("secret_access_key").asText()); - } + public DynamodbDestinationConfig( + String endpoint, + String tableName, + String region, + String accessKeyId, + String secretAccessKey) { + this.endpoint = endpoint; + this.tableName = tableName; + this.region = region; + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + } - public String getEndpoint() { - return endpoint; - } + public static DynamodbDestinationConfig getDynamodbDestinationConfig(JsonNode config) { + return new DynamodbDestinationConfig( + config.get("dynamodb_endpoint") == null ? 
"" : config.get("dynamodb_endpoint").asText(), + config.get("dynamodb_table_name").asText(), + config.get("dynamodb_region").asText(), + config.get("access_key_id").asText(), + config.get("secret_access_key").asText()); + } - public String getAccessKeyId() { - return accessKeyId; - } + public String getEndpoint() { + return endpoint; + } - public String getSecretAccessKey() { - return secretAccessKey; - } + public String getAccessKeyId() { + return accessKeyId; + } - public String getRegion() { - return region; - } + public String getSecretAccessKey() { + return secretAccessKey; + } - public String getTableName() { - return tableName; - } -} \ No newline at end of file + public String getRegion() { + return region; + } + + public String getTableName() { + return tableName; + } + +} diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java index 7cc3762f3afd..af540c3d4764 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbOutputTableHelper.java @@ -25,30 +25,30 @@ package io.airbyte.integrations.destination.dynamodb; import io.airbyte.integrations.destination.ExtendedNameTransformer; - import io.airbyte.protocol.models.AirbyteStream; - import java.util.LinkedList; import java.util.List; public class DynamodbOutputTableHelper { - public static String getOutputTableName(String tableName, AirbyteStream stream) { - return getOutputTableName(tableName, stream.getNamespace(), stream.getName()); - } - public static String getOutputTableName(String tableName, String namespace, String streamName) { - List 
paths = new LinkedList<>(); + public static String getOutputTableName(String tableName, AirbyteStream stream) { + return getOutputTableName(tableName, stream.getNamespace(), stream.getName()); + } - if (tableName != null) { - paths.add(tableName); - } - if (namespace != null) { - paths.add(new ExtendedNameTransformer().convertStreamName(namespace)); - } - if (streamName != null) { - paths.add(new ExtendedNameTransformer().convertStreamName(streamName)); - } + public static String getOutputTableName(String tableName, String namespace, String streamName) { + List paths = new LinkedList<>(); - return String.join("_", paths); + if (tableName != null) { + paths.add(tableName); + } + if (namespace != null) { + paths.add(new ExtendedNameTransformer().convertStreamName(namespace)); } + if (streamName != null) { + paths.add(new ExtendedNameTransformer().convertStreamName(streamName)); + } + + return String.join("_", paths); + } + } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java index becb6c148c51..01267a2af6a3 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java @@ -36,153 +36,152 @@ import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Map; import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DynamodbWriter { - protected static final Logger LOGGER = 
LoggerFactory.getLogger(DynamodbWriter.class); - - private static final ObjectMapper MAPPER = MoreMappers.initMapper(); - private static final ObjectWriter WRITER = MAPPER.writer(); + protected static final Logger LOGGER = LoggerFactory.getLogger(DynamodbWriter.class); - private final DynamodbDestinationConfig config; - private final DynamoDB dynamodb; - private final ConfiguredAirbyteStream configuredStream; - private final long uploadTimestamp; - private TableWriteItems tableWriteItems; - private final String outputTableName; + private static final ObjectMapper MAPPER = MoreMappers.initMapper(); + private static final ObjectWriter WRITER = MAPPER.writer(); - public DynamodbWriter(DynamodbDestinationConfig config, - AmazonDynamoDB amazonDynamodb, - ConfiguredAirbyteStream configuredStream, - long uploadTimestamp) { + private final DynamodbDestinationConfig config; + private final DynamoDB dynamodb; + private final ConfiguredAirbyteStream configuredStream; + private final long uploadTimestamp; + private TableWriteItems tableWriteItems; + private final String outputTableName; - this.config = config; - this.dynamodb = new DynamoDB(amazonDynamodb); - this.configuredStream = configuredStream; - this.uploadTimestamp = uploadTimestamp; - this.outputTableName = DynamodbOutputTableHelper.getOutputTableName(config.getTableName(), configuredStream.getStream()); + public DynamodbWriter(DynamodbDestinationConfig config, + AmazonDynamoDB amazonDynamodb, + ConfiguredAirbyteStream configuredStream, + long uploadTimestamp) { - final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); - if (syncMode == null) { - throw new IllegalStateException("Undefined destination sync mode"); - } + this.config = config; + this.dynamodb = new DynamoDB(amazonDynamodb); + this.configuredStream = configuredStream; + this.uploadTimestamp = uploadTimestamp; + this.outputTableName = DynamodbOutputTableHelper.getOutputTableName(config.getTableName(), configuredStream.getStream()); 
- final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE; - boolean tableExist = true; + final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); + if (syncMode == null) { + throw new IllegalStateException("Undefined destination sync mode"); + } - try { - if (!isAppendMode) { - Table table = dynamodb.getTable(outputTableName); + final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE; + boolean tableExist = true; - if (isTableExist(table)) { - table.delete(); - table.waitForDelete(); - } - } + try { + if (!isAppendMode) { + Table table = dynamodb.getTable(outputTableName); - var table = createTableIfNotExists(amazonDynamodb, outputTableName); - table.waitForActive(); - } catch (Exception e) { - LOGGER.error(e.getMessage()); + if (isTableExist(table)) { + table.delete(); + table.waitForDelete(); } + } - this.tableWriteItems = new TableWriteItems(outputTableName); + var table = createTableIfNotExists(amazonDynamodb, outputTableName); + table.waitForActive(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); } - private static boolean isTableExist(Table table) { - try { - table.describe(); - } catch (ResourceNotFoundException e) { - return false; - } - return true; - } + this.tableWriteItems = new TableWriteItems(outputTableName); + } - private Table createTableIfNotExists(AmazonDynamoDB amazonDynamodb, String tableName) throws Exception { - AttributeDefinition partitionKeyDefinition = new AttributeDefinition() - .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID) - .withAttributeType(ScalarAttributeType.S); - AttributeDefinition sortKeyDefinition = new AttributeDefinition() - .withAttributeName("sync_time") - .withAttributeType(ScalarAttributeType.N); - KeySchemaElement partitionKeySchema = new KeySchemaElement() - .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID) - .withKeyType(KeyType.HASH); - KeySchemaElement sortKeySchema = new KeySchemaElement() - .withAttributeName("sync_time") - 
.withKeyType(KeyType.RANGE); - ProvisionedThroughput throughput = new ProvisionedThroughput() - .withReadCapacityUnits(10L) - .withWriteCapacityUnits(10L); - - TableUtils.createTableIfNotExists(amazonDynamodb, new CreateTableRequest() - .withTableName(tableName) - .withAttributeDefinitions(partitionKeyDefinition) - .withKeySchema(partitionKeySchema) - .withAttributeDefinitions(sortKeyDefinition) - .withKeySchema(sortKeySchema) - .withProvisionedThroughput(throughput)); - return new DynamoDB(amazonDynamodb).getTable(tableName); + private static boolean isTableExist(Table table) { + try { + table.describe(); + } catch (ResourceNotFoundException e) { + return false; } + return true; + } + + private Table createTableIfNotExists(AmazonDynamoDB amazonDynamodb, String tableName) throws Exception { + AttributeDefinition partitionKeyDefinition = new AttributeDefinition() + .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID) + .withAttributeType(ScalarAttributeType.S); + AttributeDefinition sortKeyDefinition = new AttributeDefinition() + .withAttributeName("sync_time") + .withAttributeType(ScalarAttributeType.N); + KeySchemaElement partitionKeySchema = new KeySchemaElement() + .withAttributeName(JavaBaseConstants.COLUMN_NAME_AB_ID) + .withKeyType(KeyType.HASH); + KeySchemaElement sortKeySchema = new KeySchemaElement() + .withAttributeName("sync_time") + .withKeyType(KeyType.RANGE); + ProvisionedThroughput throughput = new ProvisionedThroughput() + .withReadCapacityUnits(10L) + .withWriteCapacityUnits(10L); + + TableUtils.createTableIfNotExists(amazonDynamodb, new CreateTableRequest() + .withTableName(tableName) + .withAttributeDefinitions(partitionKeyDefinition) + .withKeySchema(partitionKeySchema) + .withAttributeDefinitions(sortKeyDefinition) + .withKeySchema(sortKeySchema) + .withProvisionedThroughput(throughput)); + return new DynamoDB(amazonDynamodb).getTable(tableName); + } + + public void write(UUID id, AirbyteRecordMessage recordMessage) { + + ObjectMapper mapper 
= new ObjectMapper(); + Map dataMap = mapper.convertValue(recordMessage.getData(), new TypeReference>() {}); + + var item = new Item() + .withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), "sync_time", uploadTimestamp) + .withMap(JavaBaseConstants.COLUMN_NAME_DATA, dataMap) + .withLong(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); + tableWriteItems.addItemToPut(item); + BatchWriteItemOutcome outcome; + if (tableWriteItems.getItemsToPut().size() >= 25) { + try { + int maxRetries = 5; + outcome = dynamodb.batchWriteItem(tableWriteItems); + tableWriteItems = new TableWriteItems(this.outputTableName); + + while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) { + outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems()); + maxRetries--; + } - public void write(UUID id, AirbyteRecordMessage recordMessage) { - - ObjectMapper mapper = new ObjectMapper(); - Map dataMap = mapper.convertValue(recordMessage.getData(), new TypeReference>() { - }); - - var item = new Item() - .withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), "sync_time", uploadTimestamp) - .withMap(JavaBaseConstants.COLUMN_NAME_DATA, dataMap) - .withLong(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); - tableWriteItems.addItemToPut(item); - BatchWriteItemOutcome outcome; - if (tableWriteItems.getItemsToPut().size() >= 25) { - try { - int maxRetries = 5; - outcome = dynamodb.batchWriteItem(tableWriteItems); - tableWriteItems = new TableWriteItems(this.outputTableName); - - while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) { - outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems()); - maxRetries--; - } - - if (maxRetries == 0) { - LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size()))); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } + 
if (maxRetries == 0) { + LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size()))); } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } } - - public void close(boolean hasFailed) throws IOException { - if (hasFailed) { - LOGGER.warn("Failure in writing data to DynamoDB. Aborting..."); - } else { - try { - int maxRetries = 5; - if (tableWriteItems.getItemsToPut().size() > 0) { - var outcome = dynamodb.batchWriteItem(tableWriteItems); - while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) { - outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems()); - maxRetries--; - } - if (maxRetries == 0) { - LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size()))); - } - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - LOGGER.info("Data writing completed for DynamoDB."); + } + + public void close(boolean hasFailed) throws IOException { + if (hasFailed) { + LOGGER.warn("Failure in writing data to DynamoDB. 
Aborting..."); + } else { + try { + int maxRetries = 5; + if (tableWriteItems.getItemsToPut().size() > 0) { + var outcome = dynamodb.batchWriteItem(tableWriteItems); + while (outcome.getUnprocessedItems().size() > 0 && maxRetries > 0) { + outcome = dynamodb.batchWriteItemUnprocessed(outcome.getUnprocessedItems()); + maxRetries--; + } + if (maxRetries == 0) { + LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size()))); + } } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + LOGGER.info("Data writing completed for DynamoDB."); } + } + } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java index f53e962232ee..794d05cffb35 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java @@ -40,150 +40,150 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; - import java.io.IOException; import java.math.BigDecimal; import java.nio.file.Path; import java.util.*; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DynamodbDestinationAcceptanceTest extends DestinationAcceptanceTest { - private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestinationAcceptanceTest.class); - protected static final ObjectMapper MAPPER = MoreMappers.initMapper(); - - protected final 
String secretFilePath = "secrets/config.json"; - protected JsonNode configJson; - protected DynamodbDestinationConfig config; - protected AmazonDynamoDB client; - - protected JsonNode getBaseConfigJson() { - return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))); - } - - @Override - protected String getImageName() { - return "airbyte/destination-dynamodb:dev"; + private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestinationAcceptanceTest.class); + protected static final ObjectMapper MAPPER = MoreMappers.initMapper(); + + protected final String secretFilePath = "secrets/config.json"; + protected JsonNode configJson; + protected DynamodbDestinationConfig config; + protected AmazonDynamoDB client; + + protected JsonNode getBaseConfigJson() { + return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))); + } + + @Override + protected String getImageName() { + return "airbyte/destination-dynamodb:dev"; + } + + @Override + protected JsonNode getConfig() { + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + JsonNode baseJson = getBaseConfigJson(); + JsonNode failCheckJson = Jsons.clone(baseJson); + // invalid credential + ((ObjectNode) failCheckJson).put("access_key_id", "fake-key"); + ((ObjectNode) failCheckJson).put("secret_access_key", "fake-secret"); + return failCheckJson; + } + + /** + * Helper method to retrieve all synced objects inside the configured bucket path. 
+ */ + protected List getAllSyncedObjects(String streamName, String namespace) { + var dynamodb = new DynamoDB(this.client); + var tableName = DynamodbOutputTableHelper.getOutputTableName(this.config.getTableName(), streamName, namespace); + var table = dynamodb.getTable(tableName); + List items = new ArrayList(); + List resultItems = new ArrayList(); + Long maxSyncTime = 0L; + + try { + ItemCollection scanItems = table.scan(new ScanSpec()); + + Iterator iter = scanItems.iterator(); + while (iter.hasNext()) { + + Item item = iter.next(); + items.add(item); + maxSyncTime = Math.max(maxSyncTime, ((BigDecimal) item.get("sync_time")).longValue()); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); } - @Override - protected JsonNode getConfig() { - return configJson; + Long finalMaxSyncTime = maxSyncTime; + LOGGER.error(finalMaxSyncTime.toString()); + items.sort(Comparator.comparingLong(o -> ((BigDecimal) o.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)).longValue())); + + return items; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + List items = getAllSyncedObjects(streamName, namespace); + List jsonRecords = new LinkedList<>(); + + for (var item : items) { + var itemJson = item.toJSON(); + LOGGER.error(itemJson); + jsonRecords.add(Jsons.deserialize(itemJson).get(JavaBaseConstants.COLUMN_NAME_DATA)); } - @Override - protected JsonNode getFailCheckConfig() { - JsonNode baseJson = getBaseConfigJson(); - JsonNode failCheckJson = Jsons.clone(baseJson); - // invalid credential - ((ObjectNode) failCheckJson).put("access_key_id", "fake-key"); - ((ObjectNode) failCheckJson).put("secret_access_key", "fake-secret"); - return failCheckJson; + return jsonRecords; + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + JsonNode baseConfigJson = getBaseConfigJson(); + // Set a random s3 bucket path for each integration test + JsonNode 
configJson = Jsons.clone(baseConfigJson); + this.configJson = configJson; + this.config = DynamodbDestinationConfig.getDynamodbDestinationConfig(configJson); + + var endpoint = config.getEndpoint(); + var region = config.getRegion(); + var accessKeyId = config.getAccessKeyId(); + var secretAccessKey = config.getSecretAccessKey(); + + var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + + if (endpoint.isEmpty()) { + this.client = AmazonDynamoDBClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(config.getRegion()) + .build(); + + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); + + this.client = AmazonDynamoDBClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); } - - /** - * Helper method to retrieve all synced objects inside the configured bucket path. 
- */ - protected List getAllSyncedObjects(String streamName, String namespace) { - var dynamodb = new DynamoDB(this.client); - var tableName = DynamodbOutputTableHelper.getOutputTableName(this.config.getTableName(), streamName, namespace); - var table = dynamodb.getTable(tableName); - List items = new ArrayList(); - List resultItems = new ArrayList(); - Long maxSyncTime = 0L; - - try { - ItemCollection scanItems = table.scan(new ScanSpec()); - - Iterator iter = scanItems.iterator(); - while (iter.hasNext()) { - - Item item = iter.next(); - items.add(item); - maxSyncTime = Math.max(maxSyncTime, ((BigDecimal) item.get("sync_time")).longValue()); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - - Long finalMaxSyncTime = maxSyncTime; - LOGGER.error(finalMaxSyncTime.toString()); - items.sort(Comparator.comparingLong(o -> ((BigDecimal) o.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)).longValue())); - - return items; - } - - @Override - protected List retrieveRecords(TestDestinationEnv testEnv, - String streamName, - String namespace, - JsonNode streamSchema) - throws IOException { - List items = getAllSyncedObjects(streamName, namespace); - List jsonRecords = new LinkedList<>(); - - for (var item : items) { - var itemJson = item.toJSON(); - LOGGER.error(itemJson); - jsonRecords.add(Jsons.deserialize(itemJson).get(JavaBaseConstants.COLUMN_NAME_DATA)); - } - - return jsonRecords; + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + var dynamodb = new DynamoDB(this.client); + List tables = new ArrayList(); + dynamodb.listTables().forEach(o -> { + if (o.getTableName().startsWith(this.config.getTableName())) + tables.add(o.getTableName()); + }); + + try { + for (var tableName : tables) { + Table table = dynamodb.getTable(tableName); + table.delete(); + table.waitForDelete(); + LOGGER.info(String.format("Delete table %s", tableName)); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); } + } - @Override - protected void 
setup(TestDestinationEnv testEnv) { - JsonNode baseConfigJson = getBaseConfigJson(); - // Set a random s3 bucket path for each integration test - JsonNode configJson = Jsons.clone(baseConfigJson); - this.configJson = configJson; - this.config = DynamodbDestinationConfig.getDynamodbDestinationConfig(configJson); - - var endpoint = config.getEndpoint(); - var region = config.getRegion(); - var accessKeyId = config.getAccessKeyId(); - var secretAccessKey = config.getSecretAccessKey(); - - var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); - - if (endpoint.isEmpty()) { - this.client = AmazonDynamoDBClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(config.getRegion()) - .build(); - - } else { - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setSignerOverride("AWSDynamodbSignerType"); - - this.client = AmazonDynamoDBClientBuilder - .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) - .withClientConfiguration(clientConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .build(); - } - } - - @Override - protected void tearDown(TestDestinationEnv testEnv) { - var dynamodb = new DynamoDB(this.client); - List tables = new ArrayList(); - dynamodb.listTables().forEach(o -> { - if (o.getTableName().startsWith(this.config.getTableName())) tables.add(o.getTableName()); - }); - - try { - for (var tableName : tables) { - Table table = dynamodb.getTable(tableName); - table.delete(); - table.waitForDelete(); - LOGGER.info(String.format("Delete table %s", tableName)); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - } } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java 
b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java index 1030dcb4686a..3e80a03ca9f6 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationTest.java @@ -29,40 +29,39 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.*; - import org.junit.jupiter.api.Test; class DynamodbDestinationTest { - @Test - void testGetOutputTableNameWithString() throws Exception { - var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", "test_namespace", "test_stream"); - assertEquals("test_table_test_namespace_test_stream", actual); - } + @Test + void testGetOutputTableNameWithString() throws Exception { + var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", "test_namespace", "test_stream"); + assertEquals("test_table_test_namespace_test_stream", actual); + } + + @Test + void testGetOutputTableNameWithStream() throws Exception { + var stream = new AirbyteStream(); + stream.setName("test_stream"); + stream.setNamespace("test_namespace"); + var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", stream); + assertEquals("test_table_test_namespace_test_stream", actual); + } - @Test - void testGetOutputTableNameWithStream() throws Exception { - var stream = new AirbyteStream(); - stream.setName("test_stream"); - stream.setNamespace("test_namespace"); - var actual = DynamodbOutputTableHelper.getOutputTableName("test_table", stream); - assertEquals("test_table_test_namespace_test_stream", actual); - } + @Test + void testGetDynamodbDestinationdbConfig() throws Exception { + JsonNode json = Jsons.deserialize("{\n" + + " \"dynamodb_table_name\": 
\"test_table\",\n" + + " \"dynamodb_region\": \"test_region\",\n" + + " \"access_key_id\": \"test_key_id\",\n" + + " \"secret_access_key\": \"test_access_key\"\n" + + "}"); + var config = DynamodbDestinationConfig.getDynamodbDestinationConfig(json); - @Test - void testGetDynamodbDestinationdbConfig() throws Exception { - JsonNode json = Jsons.deserialize("{\n" + - " \"dynamodb_table_name\": \"test_table\",\n" + - " \"dynamodb_region\": \"test_region\",\n" + - " \"access_key_id\": \"test_key_id\",\n" + - " \"secret_access_key\": \"test_access_key\"\n" + - "}" - ); - var config = DynamodbDestinationConfig.getDynamodbDestinationConfig(json); + assertEquals(config.getTableName(), "test_table"); + assertEquals(config.getRegion(), "test_region"); + assertEquals(config.getAccessKeyId(), "test_key_id"); + assertEquals(config.getSecretAccessKey(), "test_access_key"); + } - assertEquals(config.getTableName(), "test_table"); - assertEquals(config.getRegion(), "test_region"); - assertEquals(config.getAccessKeyId(), "test_key_id"); - assertEquals(config.getSecretAccessKey(), "test_access_key"); - } } From 843dceeddf487a2e302b64e0c8d0b5b44cc83b79 Mon Sep 17 00:00:00 2001 From: qtz123 Date: Sat, 21 Aug 2021 12:46:20 -0700 Subject: [PATCH 8/9] Added changelog to the doc. Signed-off-by: qtz123 --- docs/integrations/destinations/dynamodb.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/integrations/destinations/dynamodb.md b/docs/integrations/destinations/dynamodb.md index 584e7f289909..dfa230be31f1 100644 --- a/docs/integrations/destinations/dynamodb.md +++ b/docs/integrations/destinations/dynamodb.md @@ -51,4 +51,10 @@ This connector by default uses 10 capacity units for both Read and Write in Dyna * Make sure your DynamoDB tables are accessible from the machine running Airbyte. * This depends on your networking setup. 
* You can check AWS DynamoDB documentation with a tutorial on how to properly configure your DynamoDB's access [here](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-overview.html). - * The easiest way to verify if Airbyte is able to connect to your DynamoDB tables is via the check connection tool in the UI. \ No newline at end of file + * The easiest way to verify if Airbyte is able to connect to your DynamoDB tables is via the check connection tool in the UI. + +## CHANGELOG + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2021-08-20 | [#5561](https://github.com/airbytehq/airbyte/pull/5561) | Initial release. | From 23fa64380c0ddea8f29ee8422bdb861ea62dc587 Mon Sep 17 00:00:00 2001 From: Yiqing Wang Date: Wed, 25 Aug 2021 01:10:31 -0700 Subject: [PATCH 9/9] Used PAY_PER_REQUEST instead of provisioned for DynamoDB. Gave the value a name batchSize. Removed unnecessary logs. Signed-off-by: Yiqing Wang --- .../integrations/destination/dynamodb/DynamodbWriter.java | 8 +++----- .../dynamodb/DynamodbDestinationAcceptanceTest.java | 2 -- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java index 01267a2af6a3..5805f6ca264b 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java @@ -55,6 +55,7 @@ public class DynamodbWriter { private final long uploadTimestamp; private TableWriteItems tableWriteItems; private final String outputTableName; + private final int batchSize = 25; public 
DynamodbWriter(DynamodbDestinationConfig config, AmazonDynamoDB amazonDynamodb, @@ -116,9 +117,6 @@ private Table createTableIfNotExists(AmazonDynamoDB amazonDynamodb, String table KeySchemaElement sortKeySchema = new KeySchemaElement() .withAttributeName("sync_time") .withKeyType(KeyType.RANGE); - ProvisionedThroughput throughput = new ProvisionedThroughput() - .withReadCapacityUnits(10L) - .withWriteCapacityUnits(10L); TableUtils.createTableIfNotExists(amazonDynamodb, new CreateTableRequest() .withTableName(tableName) @@ -126,7 +124,7 @@ private Table createTableIfNotExists(AmazonDynamoDB amazonDynamodb, String table .withKeySchema(partitionKeySchema) .withAttributeDefinitions(sortKeyDefinition) .withKeySchema(sortKeySchema) - .withProvisionedThroughput(throughput)); + .withBillingMode(BillingMode.PAY_PER_REQUEST)); return new DynamoDB(amazonDynamodb).getTable(tableName); } @@ -141,7 +139,7 @@ public void write(UUID id, AirbyteRecordMessage recordMessage) { .withLong(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); tableWriteItems.addItemToPut(item); BatchWriteItemOutcome outcome; - if (tableWriteItems.getItemsToPut().size() >= 25) { + if (tableWriteItems.getItemsToPut().size() >= batchSize) { try { int maxRetries = 5; outcome = dynamodb.batchWriteItem(tableWriteItems); diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java index 794d05cffb35..47b128c174c2 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java +++ 
b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java @@ -107,7 +107,6 @@ protected List getAllSyncedObjects(String streamName, String namespace) { } Long finalMaxSyncTime = maxSyncTime; - LOGGER.error(finalMaxSyncTime.toString()); items.sort(Comparator.comparingLong(o -> ((BigDecimal) o.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)).longValue())); return items; @@ -124,7 +123,6 @@ protected List retrieveRecords(TestDestinationEnv testEnv, for (var item : items) { var itemJson = item.toJSON(); - LOGGER.error(itemJson); jsonRecords.add(Jsons.deserialize(itemJson).get(JavaBaseConstants.COLUMN_NAME_DATA)); }