diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9255b7de-f778-4087-8973-e17d7898c43d.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9255b7de-f778-4087-8973-e17d7898c43d.json new file mode 100644 index 000000000000..ad0b8bfda461 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9255b7de-f778-4087-8973-e17d7898c43d.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "9255b7de-f778-4087-8973-e17d7898c43d", + "name": "Scylla", + "dockerRepository": "airbyte/destination-scylla", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/scylla" +} diff --git a/airbyte-integrations/connectors/destination-scylla/.dockerignore b/airbyte-integrations/connectors/destination-scylla/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-scylla/Dockerfile b/airbyte-integrations/connectors/destination-scylla/Dockerfile new file mode 100644 index 000000000000..2012d72996e4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-scylla + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-scylla diff --git a/airbyte-integrations/connectors/destination-scylla/README.md b/airbyte-integrations/connectors/destination-scylla/README.md new file mode 100644 index 000000000000..8e6e60e9f6b5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/README.md @@ -0,0 +1,68 @@ +# Destination Scylla + +This is the repository for the Scylla destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/scylla). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-scylla:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-scylla:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-scylla:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-scylla:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-scylla:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-scylla:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/io/airbyte/integrations/destinations/scylla`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destinations/scyllaDestinationAcceptanceTest.java`. + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-scylla:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-scylla:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-scylla/bootstrap.md b/airbyte-integrations/connectors/destination-scylla/bootstrap.md new file mode 100644 index 000000000000..3a2e33fc5521 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/bootstrap.md @@ -0,0 +1,32 @@ +# Scylla Destination + +Scylla is an open-source distributed NoSQL wide-column data store designed to handle large amounts of data across many +commodity servers, providing high availability with no single point of failure. It is designed to be compatible with +Apache Cassandra while achieving significantly higher throughputs and lower latencies. It supports the same protocols as +Cassandra (CQL and Thrift) and the same file formats (SSTable) + +The data is structured in keyspaces and tables and is partitioned and replicated across different nodes in the +cluster. +[Read more about Scylla](https://www.scylladb.com/) + +This connector maps an incoming `stream` to a Scylla `table` and a `namespace` to a Scylla`keyspace`. +When using destination sync mode `append` and `append_dedup`, an `insert` operation is performed against an existing +Scylla table. +When using `overwrite`, the records are first placed in a temp table. When all the messages have been received the data +is copied to the final table which is first truncated and the temp table is deleted. + +The Implementation uses the [Scylla](https://github.com/scylladb/java-driver/) driver in order to access +Scylla. [ScyllaCqlProvider](./src/main/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProvider.java) +handles the communication with the Scylla cluster and internally it uses +the [ScyllaSessionPool](./src/main/java/io/airbyte/integrations/destination/scylla/ScyllaSessionPool.java) to retrieve a +session to the cluster. + +The [ScyllaMessageConsumer](./src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java) +class contains the logic for handling airbyte messages, events and copying data between tables. + +## Development + +See the [ScyllaCqlProvider](./src/main/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProvider.java) +class on how to use the Scylla driver. + +[Scylla driver docs.](https://docs.scylladb.com/using-scylla/drivers/cql-drivers/scylla-java-driver/) \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-scylla/build.gradle b/airbyte-integrations/connectors/destination-scylla/build.gradle new file mode 100644 index 000000000000..c54a722aca80 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/build.gradle @@ -0,0 +1,32 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.scylla.ScyllaDestination' +} + +def scyllaDriver = '3.10.2-scylla-1' +def assertVersion = '3.21.0' +def testContainersVersion = '1.16.2' + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + implementation "com.scylladb:scylla-driver-core:${scyllaDriver}" + + // https://mvnrepository.com/artifact/org.assertj/assertj-core + testImplementation "org.assertj:assertj-core:${assertVersion}" + // https://mvnrepository.com/artifact/org.testcontainers/testcontainers + testImplementation "org.testcontainers:testcontainers:${testContainersVersion}" + + + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-scylla') +} diff --git a/airbyte-integrations/connectors/destination-scylla/docker-compose.yml b/airbyte-integrations/connectors/destination-scylla/docker-compose.yml new file mode 100644 index 000000000000..ad8561826411 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/docker-compose.yml @@ -0,0 +1,20 @@ +version: '3' + +services: + scylla1: + image: scylladb/scylla + ports: + - "9042:9042" + container_name: scylla1 + command: --smp 1 + +# uncomment if you want to run a cluster of scylladb nodes +# scylla2: +# image: scylladb/scylla +# container_name: scylla2 +# command: --seeds=scylla1 +# +# scylla3: +# image: scylladb/scylla +# container_name: scylla3 +# command: --seeds=scylla1 \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaConfig.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaConfig.java new file mode 100644 index 000000000000..7833e408ca33 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaConfig.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Objects; + +public class ScyllaConfig { + + private final String keyspace; + + private final String username; + + private final String password; + + private final String address; + + private final int port; + + private final int replication; + + public ScyllaConfig(String keyspace, String username, String password, String address, int port, int replication) { + this.keyspace = keyspace; + this.username = username; + this.password = password; + this.address = address; + this.port = port; + this.replication = replication; + } + + public ScyllaConfig(JsonNode jsonNode) { + this.keyspace = jsonNode.get("keyspace").asText(); + this.username = jsonNode.get("username").asText(); + this.password = jsonNode.get("password").asText(); + this.address = jsonNode.get("address").asText(); + this.port = jsonNode.get("port").asInt(9042); + this.replication = jsonNode.get("replication").asInt(1); + } + + public String getKeyspace() { + return keyspace; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getAddress() { + return address; + } + + public int getPort() { + return port; + } + + public int getReplication() { + return replication; + } + + @Override + public String toString() { + return "ScyllaConfig{" + + "keyspace='" + keyspace + '\'' + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + ", address='" + address + '\'' + + ", port=" + port + + ", replication=" + replication + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScyllaConfig that = (ScyllaConfig) o; + return port == that.port && username.equals(that.username) && password.equals(that.password) && + address.equals(that.address); + } + + @Override + public int hashCode() { + return Objects.hash(username, password, address, port); + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProvider.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProvider.java new file mode 100644 index 000000000000..5ec345daceba --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProvider.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.datastax.driver.core.AbstractTableMetadata; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.datastax.driver.core.utils.UUIDs; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.io.Closeable; +import java.time.Instant; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScyllaCqlProvider implements Closeable { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScyllaCqlProvider.class); + + private static final int N_THREADS = Runtime.getRuntime().availableProcessors(); + + private final ScyllaConfig scyllaConfig; + + private final Cluster cluster; + + private final Session session; + + private final ExecutorService executorService; + + private final String columnId; + + private final String columnData; + + private final String columnTimestamp; + + public ScyllaCqlProvider(ScyllaConfig scyllaConfig) { + this.scyllaConfig = scyllaConfig; + var sessionTuple = ScyllaSessionPool.initSession(scyllaConfig); + this.cluster = sessionTuple.value1(); + this.session = sessionTuple.value2(); + this.executorService = Executors.newFixedThreadPool(N_THREADS); + var nameTransformer = new ScyllaNameTransformer(scyllaConfig); + this.columnId = nameTransformer.outputColumn(JavaBaseConstants.COLUMN_NAME_AB_ID); + this.columnData = nameTransformer.outputColumn(JavaBaseConstants.COLUMN_NAME_DATA); + this.columnTimestamp = nameTransformer.outputColumn(JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + + public void createKeyspaceIfNotExists(String keyspace) { + var createKeyspace = SchemaBuilder.createKeyspace(keyspace) + .ifNotExists() + .with() + .replication(Map.of( + "class", "SimpleStrategy", + "replication_factor", scyllaConfig.getReplication())) + .durableWrites(true); + session.execute(createKeyspace); + } + + public void createTableIfNotExists(String keyspace, String table) { + var createTable = SchemaBuilder.createTable(keyspace, table) + .ifNotExists() + .addPartitionKey(columnId, DataType.uuid()) + .addColumn(columnData, DataType.text()) + .addColumn(columnTimestamp, DataType.timestamp()); + session.execute(createTable); + } + + public void dropTableIfExists(String keyspace, String table) { + var drop = SchemaBuilder.dropTable(keyspace, table).ifExists(); + session.execute(drop); + } + + public void truncate(String keyspace, String table) { + var truncate = QueryBuilder.truncate(keyspace, table); + session.execute(truncate); + } + + public void insert(String keyspace, String table, String data) { + var insert = QueryBuilder.insertInto(keyspace, table) + .value(columnId, UUIDs.random()) + .value(columnData, data) + .value(columnTimestamp, Instant.now().toEpochMilli()); + session.execute(insert); + } + + public List> select(String keyspace, String table) { + var select = QueryBuilder.select().all().from(keyspace, table); + return session.execute(select).all().stream() + .map(r -> Triplet.of( + r.get(columnId, UUID.class), + r.get(columnData, String.class), + r.get(columnTimestamp, Date.class).toInstant())) + .collect(Collectors.toList()); + } + + public List>> metadata() { + return cluster.getMetadata().getKeyspaces().stream() + .map(keyspace -> Tuple.of(keyspace.getName(), keyspace.getTables().stream() + .map(AbstractTableMetadata::getName) + .collect(Collectors.toList()))) + .collect(Collectors.toList()); + } + + public void copy(String keyspace, String sourceTable, String destinationTable) { + + var select = String.format("SELECT * FROM %s.%s WHERE token(%s) > ? AND token(%s) <= ?", + keyspace, sourceTable, columnId, columnId); + + var selectStatement = session.prepare(select); + + var insert = String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (?, ?, ?)", + keyspace, destinationTable, columnId, columnData, columnTimestamp); + + var insertStatement = session.prepare(insert); + // insertStatement.setConsistencyLevel(ConsistencyLevel.ONE); + + // perform full table scan in parallel using token ranges + // optimal for copying large amounts of data + cluster.getMetadata().getTokenRanges().stream() + .flatMap(range -> range.unwrap().stream()) + .map(range -> selectStatement.bind(range.getStart(), range.getEnd())) + .map(selectBoundStatement -> executorService.submit(() -> batchInsert(selectBoundStatement, insertStatement))) + .forEach(this::awaitThread); + + } + + private void batchInsert(BoundStatement select, PreparedStatement insert) { + // unlogged removes the log record for increased insert speed + var batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED); + + session.execute(select).all().stream() + .map(r -> Triplet.of( + r.get(columnId, UUID.class), + r.get(columnData, String.class), + r.get(columnTimestamp, Date.class))) + .map(t -> insert.bind(t.value1(), t.value2(), t.value3())) + .forEach(batchStatement::add); + + session.execute(batchStatement); + } + + private void awaitThread(Future future) { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Interrupted thread while copying data: ", e); + } catch (ExecutionException e) { + LOGGER.error("Error while copying data: ", e); + } + } + + @Override + public void close() { + // gracefully shutdown executor service + executorService.shutdown(); + // close scylla session + ScyllaSessionPool.closeSession(scyllaConfig); + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaDestination.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaDestination.java new file mode 100644 index 000000000000..1fd5fdd03059 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaDestination.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.UUID; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScyllaDestination extends BaseConnector implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScyllaDestination.class); + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new ScyllaDestination()).run(args); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + var scyllaConfig = new ScyllaConfig(config); + // add random uuid to avoid conflicts with existing tables. + String tableName = "table_" + UUID.randomUUID().toString().replace("-", ""); + ScyllaCqlProvider scyllaCqlProvider = null; + try { + scyllaCqlProvider = new ScyllaCqlProvider(scyllaConfig); + // check connection and write permissions + scyllaCqlProvider.createKeyspaceIfNotExists(scyllaConfig.getKeyspace()); + scyllaCqlProvider.createTableIfNotExists(scyllaConfig.getKeyspace(), tableName); + scyllaCqlProvider.insert(scyllaConfig.getKeyspace(), tableName, "{}"); + return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Can't establish Scylla connection with reason: ", e); + return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED); + } finally { + if (scyllaCqlProvider != null) { + try { + scyllaCqlProvider.dropTableIfExists(scyllaConfig.getKeyspace(), tableName); + } catch (Exception e) { + LOGGER.error("Error while deleting temp table {} with reason: ", tableName, e); + } + scyllaCqlProvider.close(); + } + } + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + return new ScyllaMessageConsumer(new ScyllaConfig(config), configuredCatalog, outputRecordCollector); + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java new file mode 100644 index 000000000000..b6f83fb761af --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScyllaMessageConsumer extends FailureTrackingAirbyteMessageConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScyllaMessageConsumer.class); + + private final ScyllaConfig scyllaConfig; + + private final Consumer outputRecordCollector; + + private final Map scyllaStreams; + + private final ScyllaCqlProvider scyllaCqlProvider; + + private AirbyteMessage lastMessage = null; + + public ScyllaMessageConsumer(ScyllaConfig scyllaConfig, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + this.scyllaConfig = scyllaConfig; + this.outputRecordCollector = outputRecordCollector; + this.scyllaCqlProvider = new ScyllaCqlProvider(scyllaConfig); + var nameTransformer = new ScyllaNameTransformer(scyllaConfig); + this.scyllaStreams = configuredCatalog.getStreams().stream() + .collect(Collectors.toUnmodifiableMap( + AirbyteStreamNameNamespacePair::fromConfiguredAirbyteSteam, + k -> new ScyllaStreamConfig( + nameTransformer.outputKeyspace(k.getStream().getNamespace()), + nameTransformer.outputTable(k.getStream().getName()), + nameTransformer.outputTmpTable(k.getStream().getName()), + k.getDestinationSyncMode()))); + } + + @Override + protected void startTracked() { + scyllaStreams.forEach((k, v) -> { + scyllaCqlProvider.createKeyspaceIfNotExists(v.getKeyspace()); + scyllaCqlProvider.createTableIfNotExists(v.getKeyspace(), v.getTempTableName()); + }); + } + + @Override + protected void acceptTracked(AirbyteMessage message) { + if (message.getType() == AirbyteMessage.Type.RECORD) { + var messageRecord = message.getRecord(); + var streamConfig = + scyllaStreams.get(AirbyteStreamNameNamespacePair.fromRecordMessage(messageRecord)); + if (streamConfig == null) { + throw new IllegalArgumentException("Unrecognized destination stream"); + } + var data = Jsons.serialize(messageRecord.getData()); + scyllaCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data); + } else if (message.getType() == AirbyteMessage.Type.STATE) { + this.lastMessage = message; + } else { + LOGGER.warn("Unsupported airbyte message type: {}", message.getType()); + } + } + + @Override + protected void close(boolean hasFailed) { + if (!hasFailed) { + scyllaStreams.forEach((k, v) -> { + try { + scyllaCqlProvider.createTableIfNotExists(v.getKeyspace(), v.getTableName()); + switch (v.getDestinationSyncMode()) { + case APPEND -> { + scyllaCqlProvider.copy(v.getKeyspace(), v.getTempTableName(), v.getTableName()); + } + case OVERWRITE -> { + scyllaCqlProvider.truncate(v.getKeyspace(), v.getTableName()); + scyllaCqlProvider.copy(v.getKeyspace(), v.getTempTableName(), v.getTableName()); + } + default -> throw new UnsupportedOperationException("Unsupported destination sync mode"); + } + } catch (Exception e) { + LOGGER.error("Error while copying data to table {}: ", v.getTableName(), e); + } + }); + outputRecordCollector.accept(lastMessage); + } + + scyllaStreams.forEach((k, v) -> { + try { + scyllaCqlProvider.dropTableIfExists(v.getKeyspace(), v.getTempTableName()); + } catch (Exception e) { + LOGGER.error("Error while deleting temp table {} with reason: ", v.getTempTableName(), e); + } + }); + scyllaCqlProvider.close(); + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaNameTransformer.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaNameTransformer.java new file mode 100644 index 000000000000..06e4611827df --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaNameTransformer.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.google.common.base.CharMatcher; +import io.airbyte.commons.text.Names; +import io.airbyte.integrations.destination.StandardNameTransformer; + +class ScyllaNameTransformer extends StandardNameTransformer { + + private final ScyllaConfig scyllaConfig; + + public ScyllaNameTransformer(ScyllaConfig scyllaConfig) { + this.scyllaConfig = scyllaConfig; + } + + String outputKeyspace(String namespace) { + if (namespace == null || namespace.isBlank()) { + return scyllaConfig.getKeyspace(); + } + return CharMatcher.is('_').trimLeadingFrom(Names.toAlphanumericAndUnderscore(namespace)); + } + + String outputTable(String streamName) { + var tableName = super.getRawTableName(streamName.toLowerCase()).substring(1); + // max allowed length for a scylla table is 48 characters + return tableName.length() > 48 ? tableName.substring(0, 48) : tableName; + } + + String outputTmpTable(String streamName) { + var tableName = super.getTmpTableName(streamName.toLowerCase()).substring(1); + // max allowed length for a scylla table is 48 characters + return tableName.length() > 48 ? tableName.substring(0, 48) : tableName; + } + + String outputColumn(String columnName) { + return Names.doubleQuote(columnName.toLowerCase()); + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaSessionPool.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaSessionPool.java new file mode 100644 index 000000000000..fa82736872aa --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaSessionPool.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +class ScyllaSessionPool { + + private static final ConcurrentHashMap> sessions; + + static { + sessions = new ConcurrentHashMap<>(); + } + + private ScyllaSessionPool() { + + } + + static Tuple initSession(ScyllaConfig scyllaConfig) { + var cachedSession = sessions.get(scyllaConfig); + if (cachedSession != null) { + cachedSession.value3().incrementAndGet(); + return Tuple.of(cachedSession.value1(), cachedSession.value2()); + } else { + var cluster = Cluster.builder() + .addContactPoint(scyllaConfig.getAddress()) + .withPort(scyllaConfig.getPort()) + .withCredentials(scyllaConfig.getUsername(), scyllaConfig.getPassword()) + .build(); + var session = cluster.connect(); + sessions.put(scyllaConfig, Triplet.of(cluster, session, new AtomicInteger(1))); + return Tuple.of(cluster, session); + } + } + + static void closeSession(ScyllaConfig scyllaConfig) { + var session = sessions.get(scyllaConfig); + if (session == null) { + throw new IllegalStateException("No session for the provided config"); + } + int usage = session.value3().decrementAndGet(); + if (usage < 1) { + session.value2().close(); + session.value1().close(); + sessions.remove(scyllaConfig); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaStreamConfig.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaStreamConfig.java new file mode 100644 index 000000000000..37004d4e9b60 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaStreamConfig.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import io.airbyte.protocol.models.DestinationSyncMode; + +/* + * Immutable configuration class for storing destination stream config. + */ +class ScyllaStreamConfig { + + private final String keyspace; + + private final String tableName; + + private final String tempTableName; + + private final DestinationSyncMode destinationSyncMode; + + public ScyllaStreamConfig(String keyspace, + String tableName, + String tempTableName, + DestinationSyncMode destinationSyncMode) { + this.keyspace = keyspace; + this.tableName = tableName; + this.tempTableName = tempTableName; + this.destinationSyncMode = destinationSyncMode; + } + + public String getKeyspace() { + return keyspace; + } + + public String getTableName() { + return tableName; + } + + public String getTempTableName() { + return tempTableName; + } + + public DestinationSyncMode getDestinationSyncMode() { + return destinationSyncMode; + } + + @Override + public String toString() { + return "ScyllaStreamConfig{" + + "keyspace='" + keyspace + '\'' + + ", tableName='" + tableName + '\'' + + ", tempTableName='" + tempTableName + '\'' + + ", destinationSyncMode=" + destinationSyncMode + + '}'; + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/Triplet.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/Triplet.java new file mode 100644 index 000000000000..0735a7502aa4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/Triplet.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +public class Triplet { + + private final V1 value1; + + private final V2 value2; + + private final V3 value3; + + public Triplet(V1 value1, V2 value2, V3 value3) { + this.value1 = value1; + this.value2 = value2; + this.value3 = value3; + } + + public static Triplet of(V1 value1, V2 value2, V3 value3) { + return new Triplet<>(value1, value2, value3); + } + + public V1 value1() { + return value1; + } + + public V2 value2() { + return value2; + } + + public V3 value3() { + return value3; + } + + @Override + public String toString() { + return "Triplet{" + + "value1=" + value1 + + ", value2=" + value2 + + ", value3=" + value3 + + '}'; + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/Tuple.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/Tuple.java new file mode 100644 index 000000000000..0b1ee0177ad2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/Tuple.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +public class Tuple { + + private final V1 value1; + + private final V2 value2; + + public Tuple(V1 value1, V2 value2) { + this.value1 = value1; + this.value2 = value2; + } + + public static Tuple of(V1 value1, V2 value2) { + return new Tuple<>(value1, value2); + } + + public V1 value1() { + return value1; + } + + public V2 value2() { + return value2; + } + + @Override + public String toString() { + return "Tuple{" + + "value1=" + value1 + + ", value2=" + value2 + + '}'; + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-scylla/src/main/resources/spec.json new file mode 100644 index 000000000000..6fbed67d0478 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/main/resources/spec.json @@ -0,0 +1,57 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/scylla", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Scylla Destination Spec", + "type": "object", + "required": ["keyspace", "username", "password", "address", "port"], + "additionalProperties": true, + "properties": { + "keyspace": { + "title": "Keyspace", + "description": "Default Scylla keyspace to create data in.", + "type": "string", + "order": 0 + }, + "username": { + "title": "Username", + "description": "Username to use to access Scylla.", + "type": "string", + "order": 1 + }, + "password": { + "title": "Password", + "description": "Password associated with Scylla.", + "type": "string", + "airbyte_secret": true, + "order": 2 + }, + "address": { + "title": "Address", + "description": "Address to connect to.", + "type": "string", + "order": 3 + }, + "port": { + "title": "Port", + "description": "Port of Scylla.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 9042, + "order": 4 + }, + "replication": { + "title": "Replication factor", + "type": "integer", + "description": "Indicates to how many nodes the data should be replicated to.", + "default": 1, + "order": 5 + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaContainerInitializr.java b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaContainerInitializr.java new file mode 100644 index 000000000000..8313b56309c4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaContainerInitializr.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import org.testcontainers.containers.GenericContainer; + +class ScyllaContainerInitializr { + + private static ScyllaContainer scyllaContainer; + + private ScyllaContainerInitializr() { + + } + + public static ScyllaContainer initContainer() { + if (scyllaContainer == null) { + scyllaContainer = new ScyllaContainer() + .withExposedPorts(9042) + // single cpu core cluster + .withCommand("--smp 1"); + } + scyllaContainer.start(); + return scyllaContainer; + } + + static class ScyllaContainer extends GenericContainer { + + public ScyllaContainer() { + super("scylladb/scylla:4.5.0"); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProviderTest.java b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProviderTest.java new file mode 100644 index 000000000000..29f2e7c513ee --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaCqlProviderTest.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.datastax.driver.core.exceptions.InvalidQueryException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class ScyllaCqlProviderTest { + + private static final String SCYLLA_KEYSPACE = "scylla_keyspace"; + + private static final String SCYLLA_TABLE = "scylla_table"; + + private ScyllaCqlProvider scyllaCqlProvider; + + private ScyllaNameTransformer nameTransformer; + + @BeforeAll + void setup() { + var scyllaContainer = ScyllaContainerInitializr.initContainer(); + var scyllaConfig = TestDataFactory.scyllaConfig( + scyllaContainer.getHost(), + scyllaContainer.getFirstMappedPort()); + this.scyllaCqlProvider = new ScyllaCqlProvider(scyllaConfig); + this.nameTransformer = new ScyllaNameTransformer(scyllaConfig); + this.scyllaCqlProvider.createKeyspaceIfNotExists(SCYLLA_KEYSPACE); + this.scyllaCqlProvider.createTableIfNotExists(SCYLLA_KEYSPACE, SCYLLA_TABLE); + } + + @AfterEach + void clean() { + scyllaCqlProvider.truncate(SCYLLA_KEYSPACE, SCYLLA_TABLE); + } + + @Test + void testCreateKeySpaceIfNotExists() { + String keyspace = nameTransformer.outputKeyspace("test_keyspace"); + assertDoesNotThrow(() -> scyllaCqlProvider.createKeyspaceIfNotExists(keyspace)); + } + + @Test + void testCreateTableIfNotExists() { + String table = nameTransformer.outputTable("test_stream"); + assertDoesNotThrow(() -> scyllaCqlProvider.createTableIfNotExists(SCYLLA_KEYSPACE, table)); + } + + @Test + void testInsert() { + // given + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, SCYLLA_TABLE, "{\"property\":\"data1\"}"); + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, SCYLLA_TABLE, "{\"property\":\"data2\"}"); + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, SCYLLA_TABLE, "{\"property\":\"data3\"}"); + + // when + var resultSet = scyllaCqlProvider.select(SCYLLA_KEYSPACE, SCYLLA_TABLE); + + // then + assertThat(resultSet) + .isNotNull() + .hasSize(3) + .anyMatch(r -> r.value2().equals("{\"property\":\"data1\"}")) + .anyMatch(r -> r.value2().equals("{\"property\":\"data2\"}")) + .anyMatch(r -> r.value2().equals("{\"property\":\"data3\"}")); + + } + + @Test + void testTruncate() { + // given + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, SCYLLA_TABLE, "{\"property\":\"data1\"}"); + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, SCYLLA_TABLE, "{\"property\":\"data2\"}"); + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, SCYLLA_TABLE, "{\"property\":\"data3\"}"); + + // when + scyllaCqlProvider.truncate(SCYLLA_KEYSPACE, SCYLLA_TABLE); + var resultSet = scyllaCqlProvider.select(SCYLLA_KEYSPACE, SCYLLA_TABLE); + + // then + assertThat(resultSet) + .isNotNull() + .isEmpty(); + } + + @Test + void testDropTableIfExists() { + // given + String table = nameTransformer.outputTmpTable("test_stream"); + scyllaCqlProvider.createTableIfNotExists(SCYLLA_KEYSPACE, table); + + // when + scyllaCqlProvider.dropTableIfExists(SCYLLA_KEYSPACE, table); + + // then + assertThrows(InvalidQueryException.class, () -> scyllaCqlProvider.select(SCYLLA_KEYSPACE, table)); + } + + @Test + void testCopy() { + // given + String tmpTable = nameTransformer.outputTmpTable("test_stream_copy"); + scyllaCqlProvider.createTableIfNotExists(SCYLLA_KEYSPACE, tmpTable); + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, tmpTable, "{\"property\":\"data1\"}"); + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, tmpTable, "{\"property\":\"data2\"}"); + scyllaCqlProvider.insert(SCYLLA_KEYSPACE, tmpTable, "{\"property\":\"data3\"}"); + + String rawTable = nameTransformer.outputTable("test_stream_copy"); + scyllaCqlProvider.createTableIfNotExists(SCYLLA_KEYSPACE, rawTable); + + // when + scyllaCqlProvider.copy(SCYLLA_KEYSPACE, tmpTable, rawTable); + var resultSet = scyllaCqlProvider.select(SCYLLA_KEYSPACE, rawTable); + + // then + assertThat(resultSet) + .isNotNull() + .hasSize(3) + .anyMatch(r -> r.value2().equals("{\"property\":\"data1\"}")) + .anyMatch(r -> r.value2().equals("{\"property\":\"data2\"}")) + .anyMatch(r -> r.value2().equals("{\"property\":\"data3\"}")); + + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaDestinationAcceptanceTest.java new file mode 100644 index 000000000000..da269515e2a9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaDestinationAcceptanceTest.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.scylla.ScyllaContainerInitializr.ScyllaContainer; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ScyllaDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScyllaDestinationAcceptanceTest.class); + + private JsonNode configJson; + + private ScyllaCqlProvider scyllaCqlProvider; + + private ScyllaNameTransformer nameTransformer; + + private static ScyllaContainer scyllaContainer; + + @Override + protected String getImageName() { + return "airbyte/destination-scylla:dev"; + } + + @BeforeAll + static void initContainer() { + scyllaContainer = ScyllaContainerInitializr.initContainer(); + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + configJson = TestDataFactory.jsonConfig( + scyllaContainer.getHost(), + scyllaContainer.getFirstMappedPort()); + var scyllaConfig = new ScyllaConfig(configJson); + this.scyllaCqlProvider = new ScyllaCqlProvider(scyllaConfig); + this.nameTransformer = new ScyllaNameTransformer(scyllaConfig); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + scyllaCqlProvider.metadata().stream() + .filter(m -> !m.value1().startsWith("system")) + .forEach(meta -> { + var keyspace = meta.value1(); + meta.value2().forEach(table -> scyllaCqlProvider.truncate(keyspace, table)); + }); + } + + @Override + protected JsonNode getConfig() { + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + return TestDataFactory.jsonConfig("127.129.0.1", 8080); + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) { + var keyspace = nameTransformer.outputKeyspace(namespace); + var table = nameTransformer.outputTable(streamName); + return scyllaCqlProvider.select(keyspace, table).stream() + .sorted(Comparator.comparing(Triplet::value3)) + .map(Triplet::value2) + .map(Jsons::deserialize) + .collect(Collectors.toList()); + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaDestinationTest.java b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaDestinationTest.java new file mode 100644 index 000000000000..9a55f43a29c6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/ScyllaDestinationTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.airbyte.integrations.destination.scylla.ScyllaContainerInitializr.ScyllaContainer; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class ScyllaDestinationTest { + + private ScyllaDestination scyllaDestination; + + private ScyllaContainer scyllaContainer; + + @BeforeAll + void setup() { + this.scyllaContainer = ScyllaContainerInitializr.initContainer(); + this.scyllaDestination = new ScyllaDestination(); + } + + @Test + void testCheckWithStatusSucceeded() { + + var jsonConfiguration = TestDataFactory.jsonConfig( + scyllaContainer.getHost(), + scyllaContainer.getFirstMappedPort()); + + var connectionStatus = scyllaDestination.check(jsonConfiguration); + + assertThat(connectionStatus.getStatus()).isEqualTo(AirbyteConnectionStatus.Status.SUCCEEDED); + } + + @Test + void testCheckWithStatusFailed() { + + var jsonConfiguration = TestDataFactory.jsonConfig("192.0.2.1", 8080); + + var connectionStatus = scyllaDestination.check(jsonConfiguration); + + assertThat(connectionStatus.getStatus()).isEqualTo(AirbyteConnectionStatus.Status.FAILED); + + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/TestDataFactory.java b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/TestDataFactory.java new file mode 100644 index 000000000000..4c26abda813f --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test-integration/java/io/airbyte/integrations/destination/scylla/TestDataFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; + +class TestDataFactory { + + private TestDataFactory() { + + } + + static ScyllaConfig scyllaConfig(String address, int port) { + return new ScyllaConfig( + "default_keyspace", + "usr", + "pw", + address, + port, + 2); + } + + static JsonNode jsonConfig(String address, int port) { + return Jsons.jsonNode(ImmutableMap.builder() + .put("keyspace", "default_keyspace") + .put("username", "usr") + .put("password", "pw") + .put("address", address) + .put("port", port) + .put("replication", 2) + .build()); + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaConfigTest.java b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaConfigTest.java new file mode 100644 index 000000000000..98328e5a1411 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaConfigTest.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ScyllaConfigTest { + + private ScyllaConfig scyllaConfig; + + @BeforeEach + void setup() { + var jsonNode = TestDataFactory.jsonConfig("127.0.0.1", 9042); + this.scyllaConfig = new ScyllaConfig(jsonNode); + } + + @Test + void testConfig() { + + assertThat(scyllaConfig) + .hasFieldOrPropertyWithValue("keyspace", "default_keyspace") + .hasFieldOrPropertyWithValue("username", "usr") + .hasFieldOrPropertyWithValue("password", "pw") + .hasFieldOrPropertyWithValue("address", "127.0.0.1") + .hasFieldOrPropertyWithValue("port", 9042) + .hasFieldOrPropertyWithValue("replication", 2); + + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaNameTransformerTest.java b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaNameTransformerTest.java new file mode 100644 index 000000000000..5c1f82fbfcef --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaNameTransformerTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class ScyllaNameTransformerTest { + + private ScyllaNameTransformer scyllaNameTransformer; + + @BeforeAll + void setup() { + var scyllaConfig = TestDataFactory.scyllaConfig("127.0.0.1", 9042); + this.scyllaNameTransformer = new ScyllaNameTransformer(scyllaConfig); + } + + @Test + void testOutputTable() { + + var table = scyllaNameTransformer.outputTable("stream_name"); + + assertThat(table).matches("airbyte_raw_stream_name"); + + } + + @Test + void testOutputTmpTable() { + + var table = scyllaNameTransformer.outputTmpTable("stream_name"); + + assertThat(table).matches("airbyte_tmp_+[a-z]+_stream_name"); + + } + + @Test + void testOutputKeyspace() { + + var keyspace = scyllaNameTransformer.outputKeyspace("***keyspace^h"); + + assertThat(keyspace).matches("keyspace_h"); + + } + + @Test + void outputColumn() { + + var column = scyllaNameTransformer.outputColumn("_airbyte_data"); + + assertThat(column).matches("\"_airbyte_data\""); + + } + +} diff --git a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/TestDataFactory.java b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/TestDataFactory.java new file mode 100644 index 000000000000..4378545da4bb --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/TestDataFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.scylla; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; + +class TestDataFactory { + + static JsonNode jsonConfig(String address, int port) { + return Jsons.jsonNode(ImmutableMap.builder() + .put("keyspace", "default_keyspace") + .put("username", "usr") + .put("password", "pw") + .put("address", address) + .put("port", port) + .put("replication", 2) + .build()); + } + + static ScyllaConfig scyllaConfig(String address, int port) { + return new ScyllaConfig( + "default_keyspace", + "usr", + "pw", + address, + port, + 2); + } + +} diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 3101c010dad2..27d9d09b9fec 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -166,7 +166,11 @@ * [Redshift](integrations/destinations/redshift.md) * [S3](integrations/destinations/s3.md) * [Snowflake](integrations/destinations/snowflake.md) +<<<<<<< HEAD * [Cassandra](integrations/destinations/cassandra.md) +======= + * [Scylla](integrations/destinations/scylla.md) +>>>>>>> b92e2f803bb0610d05681ac0c4aec8e8fdee6e42 * [Custom or New Connector](integrations/custom-connectors.md) * [Connector Development](connector-development/README.md) * [Tutorials](connector-development/tutorials/README.md) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 4a0782d6b0b4..290a99fba06f 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -150,5 +150,9 @@ Airbyte uses a grading system for connectors to help users understand what to ex | [S3](destinations/s3.md) | Certified | | [SQL Server \(MSSQL\)](destinations/mssql.md) | Alpha | | [Snowflake](destinations/snowflake.md) | Certified | +<<<<<<< HEAD | [Cassandra](destinations/cassandra.md) | Alpha | +======= +| [Scylla](destinations/scylla.md) | Alpha | +>>>>>>> b92e2f803bb0610d05681ac0c4aec8e8fdee6e42 diff --git a/docs/integrations/destinations/scylla.md b/docs/integrations/destinations/scylla.md new file mode 100644 index 000000000000..14ae8435ec0b --- /dev/null +++ b/docs/integrations/destinations/scylla.md @@ -0,0 +1,46 @@ +# Scylla + +## Sync overview + +### Output schema + +The incoming airbyte data is structured in keyspaces and tables and is partitioned and replicated across different nodes +in the cluster. This connector maps an incoming `stream` to a Scylla `table` and a `namespace` to a Scylla`keyspace`. +Fields in the airbyte message become different columns in the Scylla tables. Each table will contain the following +columns. + +* `_airbyte_ab_id`: A random uuid generated to be used as a partition key. +* `_airbyte_emitted_at`: a timestamp representing when the event was received from the data source. +* `_airbyte_data`: a json text representing the extracted data. + +### Features + +| Feature | Support | Notes | +| :--- | :---: | :--- | +| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the configured DynamoDB table. | +| Incremental - Append Sync | ✅ | | +| Incremental - Deduped History | ❌ | As this connector does not support dbt, we don't support this sync mode on this destination. | +| Namespaces | ✅ | Namespace will be used as part of the table name. | + +### Performance considerations + +Scylla is highly performant and is designed to handle large amounts of data by using different nodes in the cluster in +order to perform write operations. As long as you have enough nodes in your cluster the database can scale infinitely +and handle any amount of data from the connector. + +## Getting started + +### Requirements + +* Driver compatibility: NA +* Configuration + * Keyspace [default keyspace to use when writing data] + * Username [authentication username] + * Password [authentication password] + * Address [cluster address] + * Port [default: 9042] + * Replication [optional] [default: 1] + +### Setup guide + +###### TODO: more info, screenshots?, etc... \ No newline at end of file