From 4c0de230484ec1d70efd7fa141cbaa4d498da5c4 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Thu, 8 Oct 2020 21:07:56 -0700
Subject: [PATCH 1/6] wip, sync and spec work
---
.../csv/CsvDestinationIntegrationTest.java | 1 +
.../integrations/base/TestDestination.java | 6 +-
.../java-template-destination/readme.md | 4 +-
.../postgres-destination/.dockerignore | 3 +
.../postgres-destination/Dockerfile | 10 ++
.../postgres-destination/build.gradle | 69 +++++++++
.../postgres/PostgresDestination.java | 122 ++++++++++++++++
.../src/main/resources/spec.json | 44 ++++++
.../postgres/PostgresIntegrationTest.java | 99 +++++++++++++
.../postgres/PostgresDestinationTest.java | 133 ++++++++++++++++++
10 files changed, 486 insertions(+), 5 deletions(-)
create mode 100644 airbyte-integrations/postgres-destination/.dockerignore
create mode 100644 airbyte-integrations/postgres-destination/Dockerfile
create mode 100644 airbyte-integrations/postgres-destination/build.gradle
create mode 100644 airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
create mode 100644 airbyte-integrations/postgres-destination/src/main/resources/spec.json
create mode 100644 airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java
create mode 100644 airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
diff --git a/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java b/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java
index 6db99f23421d..3475fe4fd051 100644
--- a/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java
+++ b/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java
@@ -77,6 +77,7 @@ protected List recordRetriever(TestDestinationEnv testEnv) throws Exce
@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
+
// no op
}
diff --git a/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java b/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java
index 689d428ac6e1..badc76caf92c 100644
--- a/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java
+++ b/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java
@@ -100,7 +100,7 @@ public abstract class TestDestination {
* @return All of the records in the destination at the time this method is invoked.
* @throws Exception - can throw any exception, test framework will handle.
*/
- protected abstract List<JsonNode> recordRetriever(TestDestinationEnv testEnv) throws Exception;
+ protected abstract List<JsonNode> recordRetriever(TestDestinationEnv testEnv, String streamName) throws Exception;
/**
* Function that performs any setup of external resources required for the test. e.g. instantiate a
@@ -182,7 +182,7 @@ void testSync(String messagesFilename, String catalogFilename) throws Exception
.map(record -> Jsons.deserialize(record, SingerMessage.class)).collect(Collectors.toList());
runSync(messages, catalog);
- assertSameMessages(messages, recordRetriever(testEnv));
+ assertSameMessages(messages, recordRetriever(testEnv, catalog.getStreams().get(0).getName()));
}
/**
@@ -202,7 +202,7 @@ void testSecondSync() throws Exception {
.put("HKD", 10)
.put("NZD", 700)));
runSync(secondSyncMessages, catalog);
- assertSameMessages(secondSyncMessages, recordRetriever(testEnv));
+ assertSameMessages(secondSyncMessages, recordRetriever(testEnv, catalog.getStreams().get(0).getName()));
}
private void runSync(List<SingerMessage> messages, Schema catalog) throws IOException, WorkerException {
diff --git a/airbyte-integrations/java-template-destination/readme.md b/airbyte-integrations/java-template-destination/readme.md
index f78342521e3f..cd81c0c3b518 100644
--- a/airbyte-integrations/java-template-destination/readme.md
+++ b/airbyte-integrations/java-template-destination/readme.md
@@ -5,8 +5,8 @@
1. e.g.
```
mkdir -p airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/bigquery
- mv airbyte-integrations/java-template-destination/src/main/java/io/airbyte/integrations/destination/template/DestinationTemplate.java airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/bigquery/DestinationTemplate.java
- rm -r airbyte-integrations/java-template-destination/src/main/java/io/airbyte/integrations/destination/template
+ mv airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/template/DestinationTemplate.java airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/bigquery/DestinationTemplate.java
+ rm -r airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/template
```
1. Rename the template class to an appropriate name for your integration.
1. e.g. `DestinationTemplate.java` to `BigQueryDestination.java`.
diff --git a/airbyte-integrations/postgres-destination/.dockerignore b/airbyte-integrations/postgres-destination/.dockerignore
new file mode 100644
index 000000000000..65c7d0ad3e73
--- /dev/null
+++ b/airbyte-integrations/postgres-destination/.dockerignore
@@ -0,0 +1,3 @@
+*
+!Dockerfile
+!build
diff --git a/airbyte-integrations/postgres-destination/Dockerfile b/airbyte-integrations/postgres-destination/Dockerfile
new file mode 100644
index 000000000000..77b191423d73
--- /dev/null
+++ b/airbyte-integrations/postgres-destination/Dockerfile
@@ -0,0 +1,10 @@
+FROM airbyte/base-java:dev
+
+WORKDIR /airbyte
+
+# APPLICATION must match the name of the directory this Dockerfile is in.
+ENV APPLICATION postgres-destination
+
+COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
+
+RUN tar xf ${APPLICATION}.tar --strip-components=1
diff --git a/airbyte-integrations/postgres-destination/build.gradle b/airbyte-integrations/postgres-destination/build.gradle
new file mode 100644
index 000000000000..af162cc3c1e9
--- /dev/null
+++ b/airbyte-integrations/postgres-destination/build.gradle
@@ -0,0 +1,69 @@
+import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
+plugins {
+ id 'com.bmuschko.docker-remote-api'
+ id 'application'
+}
+
+sourceSets {
+ integrationTest {
+ java {
+ srcDir 'src/test-integration/java'
+ }
+ resources {
+ srcDir 'src/test-integration/resources'
+ }
+ }
+}
+test.dependsOn('compileIntegrationTestJava')
+
+configurations {
+ integrationTestImplementation.extendsFrom testImplementation
+ integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
+}
+
+dependencies {
+ implementation project(':airbyte-config:models')
+ implementation project(':airbyte-db')
+ implementation project(':airbyte-integrations:base-java')
+ implementation project(':airbyte-singer')
+
+ integrationTestImplementation project(':airbyte-integrations:integration-test-lib')
+ integrationTestImplementation project(':airbyte-integrations:postgres-destination')
+
+ testImplementation "org.testcontainers:postgresql:1.15.0-rc2"
+ integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"
+}
+
+application {
+ mainClass = 'io.airbyte.integrations.destination.postgres.PostgresDestination'
+}
+
+
+def image = 'airbyte/airbyte-postgres-destination:dev'
+
+task imageName {
+ doLast {
+ println "IMAGE $image"
+ }
+}
+
+task buildImage(type: DockerBuildImage) {
+ inputDir = projectDir
+ images.add(image)
+ dependsOn ':airbyte-integrations:base-java:buildImage'
+}
+
+task integrationTest(type: Test) {
+ testClassesDirs += sourceSets.integrationTest.output.classesDirs
+ classpath += sourceSets.integrationTest.runtimeClasspath
+
+ useJUnitPlatform()
+ testLogging() {
+ events "passed", "failed"
+ exceptionFormat "full"
+ }
+
+ mustRunAfter test
+ dependsOn(buildImage)
+}
+
diff --git a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
new file mode 100644
index 000000000000..d194993fab4e
--- /dev/null
+++ b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
@@ -0,0 +1,122 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.destination.postgres;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.resources.MoreResources;
+import io.airbyte.config.DestinationConnectionSpecification;
+import io.airbyte.config.Schema;
+import io.airbyte.config.StandardCheckConnectionOutput;
+import io.airbyte.config.StandardCheckConnectionOutput.Status;
+import io.airbyte.config.StandardDiscoverSchemaOutput;
+import io.airbyte.db.DatabaseHelper;
+import io.airbyte.integrations.base.Destination;
+import io.airbyte.integrations.base.DestinationConsumer;
+import io.airbyte.integrations.base.IntegrationRunner;
+import io.airbyte.singer.SingerMessage;
+import java.io.IOException;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PostgresDestination implements Destination {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDestination.class);
+
+ @Override
+ public DestinationConnectionSpecification spec() throws IOException {
+ // return a jsonschema representation of the spec for the integration.
+ final String resourceString = MoreResources.readResource("spec.json");
+ return Jsons.deserialize(resourceString, DestinationConnectionSpecification.class);
+ }
+
+ // Verifies connectivity to the destination database: opens a connection pool with the
+ // supplied credentials and runs a trivial query against pg_catalog. Returns a
+ // StandardCheckConnectionOutput with status SUCCESS when the query succeeds, or FAILURE
+ // with the exception message so the user can diagnose misconfiguration (bad host, port,
+ // credentials, etc.). todo (cgardens) - surface a friendlier error message on failure.
+ @Override
+ public StandardCheckConnectionOutput check(JsonNode config) {
+ try {
+ final BasicDataSource connectionPool = DatabaseHelper.getConnectionPool(
+ config.get("username").asText(),
+ config.get("password").asText(),
+ String.format("jdbc:postgresql://%s:%s/%s", config.get("host").asText(), config.get("port").asText(), config.get("database").asText()));
+
+ DatabaseHelper.query(connectionPool, ctx -> ctx.execute(
+ "SELECT *\n"
+ + "FROM pg_catalog.pg_tables\n"
+ + "WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';"));
+
+ connectionPool.close();
+ } catch (Exception e) {
+ // todo (cgardens) - better error messaging.
+ return new StandardCheckConnectionOutput().withStatus(Status.FAILURE).withMessage(e.getMessage());
+ }
+
+ return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS);
+ }
+
+ // fixme - implement this method such that it returns the current schema found in the destination.
+ @Override
+ public StandardDiscoverSchemaOutput discover(JsonNode config) {
+ throw new RuntimeException("Not Implemented");
+ }
+
+ // fixme - implement this method such that it returns a consumer that can push messages to the
+ // destination.
+ @Override
+ public DestinationConsumer<SingerMessage> write(JsonNode config, Schema schema) throws IOException {
+ return new RecordConsumer();
+ }
+
+ public static class RecordConsumer implements DestinationConsumer<SingerMessage> {
+
+ @Override
+ public void accept(SingerMessage singerMessage) throws Exception {
+ // fixme - implement how to write a message to the destination
+ throw new RuntimeException("Not Implemented");
+ }
+
+ @Override
+ public void close() throws Exception {
+ // fixme - implement how to close the connection to the destination.
+ throw new RuntimeException("Not Implemented");
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ // IntegrationRunner parses the CLI args (spec/check/discover/write) and dispatches to
+ // this Destination implementation.
+ final Destination destination = new PostgresDestination();
+ // this is airbyte's entrypoint into the integration. do not remove this line!
+ LOGGER.info("starting destination: {}", PostgresDestination.class);
+ new IntegrationRunner(destination).run(args);
+ LOGGER.info("completed destination: {}", PostgresDestination.class);
+ }
+
+}
diff --git a/airbyte-integrations/postgres-destination/src/main/resources/spec.json b/airbyte-integrations/postgres-destination/src/main/resources/spec.json
new file mode 100644
index 000000000000..441d4e7e1a4b
--- /dev/null
+++ b/airbyte-integrations/postgres-destination/src/main/resources/spec.json
@@ -0,0 +1,44 @@
+{
+ "destinationId": "",
+ "destinationSpecificationId": "",
+ "documentationUrl": "",
+ "specification": {
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Postgres Destination Spec",
+ "type": "object",
+ "required": ["host", "port", "username", "database", "schema"],
+ "additionalProperties": false,
+ "properties": {
+ "host": {
+ "description": "Hostname of the database.",
+ "type": "string"
+ },
+ "port": {
+ "description": "Port of the database.",
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 65536,
+ "default": 5432,
+ "examples": ["5432"]
+ },
+ "username": {
+ "description": "Username to use to access the database.",
+ "type": "string"
+ },
+ "password": {
+ "description": "Password associated with the username.",
+ "type": "string"
+ },
+ "database": {
+ "description": "Name of the database.",
+ "type": "string"
+ },
+ "schema": {
+ "description": "Unless specifically configured, the usual value for this field is \"public\".",
+ "type": "string",
+ "examples": ["public"],
+ "default": "public"
+ }
+ }
+ }
+}
diff --git a/airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java b/airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java
new file mode 100644
index 000000000000..4ade753165ff
--- /dev/null
+++ b/airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java
@@ -0,0 +1,99 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.destination.postgres;
+
+import static java.util.stream.Collectors.toList;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.db.DatabaseHelper;
+import io.airbyte.integrations.base.TestDestination;
+import java.util.List;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.jooq.Record;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+public class PostgresIntegrationTest extends TestDestination {
+
+ private PostgreSQLContainer<?> db;
+
+ @Override
+ protected String getImageName() {
+ return "airbyte/airbyte-postgres-destination:dev";
+ }
+
+ @Override
+ protected JsonNode getConfig() {
+ return Jsons.jsonNode(ImmutableMap.builder()
+ .put("host", db.getHost())
+ .put("username", db.getUsername())
+ .put("password", db.getPassword())
+ .put("schema", "public")
+ .put("port", db.getFirstMappedPort())
+ .put("database", db.getDatabaseName())
+ .build());
+ }
+
+ @Override
+ protected JsonNode getInvalidConfig() {
+ return Jsons.jsonNode(ImmutableMap.builder()
+ .put("host", db.getHost())
+ .put("username", db.getUsername())
+ .put("password", "wrong password")
+ .put("schema", "public")
+ .put("port", db.getFirstMappedPort())
+ .put("database", db.getDatabaseName())
+ .build()); }
+
+ @Override
+ protected List<JsonNode> recordRetriever(TestDestinationEnv testEnv, String streamName) throws Exception {
+ BasicDataSource pool =
+ DatabaseHelper.getConnectionPool(db.getUsername(), db.getPassword(), db.getJdbcUrl());
+
+ return DatabaseHelper.query(
+ pool,
+ ctx -> ctx
+ .fetch(String.format("SELECT * FROM public.%s ORDER BY inserted_at ASC;", streamName))
+ .stream()
+ .map(nestedRecords -> ((Record) nestedRecords.get(0))) // todo ?
+ .map(Record::intoMap)
+ .map(Jsons::jsonNode)
+ .collect(toList()));
+ }
+
+ @Override
+ protected void setup(TestDestinationEnv testEnv) {
+ db = new PostgreSQLContainer<>("postgres:13-alpine");
+ db.start();
+ }
+
+ @Override
+ protected void tearDown(TestDestinationEnv testEnv) {
+ db.stop();
+ db.close();
+ }
+
+}
diff --git a/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
new file mode 100644
index 000000000000..3f707a33851a
--- /dev/null
+++ b/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
@@ -0,0 +1,133 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.destination.postgres;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.resources.MoreResources;
+import io.airbyte.config.DataType;
+import io.airbyte.config.DestinationConnectionSpecification;
+import io.airbyte.config.Field;
+import io.airbyte.config.Schema;
+import io.airbyte.config.StandardCheckConnectionOutput;
+import io.airbyte.config.StandardCheckConnectionOutput.Status;
+import io.airbyte.config.Stream;
+import io.airbyte.singer.SingerMessage;
+import io.airbyte.singer.SingerMessage.Type;
+import java.io.IOException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+class PostgresDestinationTest {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private static final String USERS_STREAM_NAME = "users";
+ private static final String TASKS_STREAM_NAME = "tasks";
+ private static final SingerMessage SINGER_MESSAGE_USERS1 = new SingerMessage().withType(Type.RECORD).withStream(USERS_STREAM_NAME)
+ .withRecord(objectMapper.createObjectNode().put("name", "john").put("id", "10"));
+ private static final SingerMessage SINGER_MESSAGE_USERS2 = new SingerMessage().withType(Type.RECORD).withStream(USERS_STREAM_NAME)
+ .withRecord(objectMapper.createObjectNode().put("name", "susan").put("id", "30"));
+ private static final SingerMessage SINGER_MESSAGE_TASKS1 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME)
+ .withRecord(objectMapper.createObjectNode().put("goal", "announce the game."));
+ private static final SingerMessage SINGER_MESSAGE_TASKS2 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME)
+ .withRecord(objectMapper.createObjectNode().put("goal", "ship some code."));
+ private static final SingerMessage SINGER_MESSAGE_RECORD = new SingerMessage().withType(Type.STATE)
+ .withValue(objectMapper.createObjectNode().put("checkpoint", "now!"));
+
+ private static final Schema CATALOG = new Schema().withStreams(Lists.newArrayList(
+ new Stream().withName(USERS_STREAM_NAME)
+ .withFields(Lists.newArrayList(new Field().withName("name").withDataType(DataType.STRING).withSelected(true),
+ new Field().withName("id").withDataType(DataType.STRING).withSelected(true))),
+ new Stream().withName(TASKS_STREAM_NAME)
+ .withFields(Lists.newArrayList(new Field().withName("goal").withDataType(DataType.STRING).withSelected(true)))));
+
+ private JsonNode config;
+
+ private PostgreSQLContainer<?> db;
+
+ @BeforeEach
+ void setup() throws IOException {
+ db = new PostgreSQLContainer<>("postgres:13-alpine");
+ db.start();
+
+ config = Jsons.jsonNode(ImmutableMap.builder()
+ .put("host", db.getHost())
+ .put("username", db.getUsername())
+ .put("password", db.getPassword())
+ .put("schema", "public")
+ .put("port", db.getFirstMappedPort())
+ .put("database", db.getDatabaseName())
+ .build());
+ }
+
+ @AfterEach
+ void tearDown() {
+ db.stop();
+ db.close();
+ }
+
+ // todo - same test as csv destination
+ @Test
+ void testSpec() throws IOException {
+ final DestinationConnectionSpecification actual = new PostgresDestination().spec();
+ final String resourceString = MoreResources.readResource("spec.json");
+ final DestinationConnectionSpecification expected = Jsons.deserialize(resourceString, DestinationConnectionSpecification.class);
+
+ assertEquals(expected, actual);
+ }
+
+ // todo - same test as csv destination
+ @Test
+ void testCheckSuccess() {
+ final StandardCheckConnectionOutput actual = new PostgresDestination().check(config);
+ final StandardCheckConnectionOutput expected = new StandardCheckConnectionOutput().withStatus(Status.SUCCESS);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testCheckFailure() {
+ ((ObjectNode) config).put("password", "fake");
+ final StandardCheckConnectionOutput actual = new PostgresDestination().check(config);
+ final StandardCheckConnectionOutput expected = new StandardCheckConnectionOutput().withStatus(Status.FAILURE)
+ .withMessage("Cannot create PoolableConnectionFactory (FATAL: password authentication failed for user \"test\")");
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testWriteSuccess() throws Exception {}
+
+ @Test
+ void testWriteFailure() throws Exception {}
+
+}
From dadc0da42301ef9a2731fd7bde2626fc96749e57 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 9 Oct 2020 13:33:30 -0700
Subject: [PATCH 2/6] add buffer
---
.../java/io/airbyte/commons/json/Jsons.java | 4 +
airbyte-persistent-queue/build.gradle | 9 +
.../AbstractCloseableInputQueue.java | 99 +++++++++++
.../persistentqueue/BigQueueWrapper.java | 58 +++++++
.../persistentqueue/CloseableInputQueue.java | 11 ++
.../AbstractCloseableInputQueueTest.java | 160 ++++++++++++++++++
.../persistentqueue/BigQueueWrapperTest.java | 116 +++++++++++++
settings.gradle | 1 +
8 files changed, 458 insertions(+)
create mode 100644 airbyte-persistent-queue/build.gradle
create mode 100644 airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
create mode 100644 airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
create mode 100644 airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java
create mode 100644 airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java
create mode 100644 airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
index 07c8821a05cf..7991b31b5717 100644
--- a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
+++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
@@ -28,6 +28,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.Optional;
@@ -109,4 +110,7 @@ public static T clone(final T object) {
return (T) deserialize(serialize(object), object.getClass());
}
+ public static byte[] toBytes(JsonNode jsonNode) {
+ return Jsons.serialize(jsonNode).getBytes(Charsets.UTF_8);
+ }
}
diff --git a/airbyte-persistent-queue/build.gradle b/airbyte-persistent-queue/build.gradle
new file mode 100644
index 000000000000..e776801c9ac7
--- /dev/null
+++ b/airbyte-persistent-queue/build.gradle
@@ -0,0 +1,9 @@
+plugins {
+ id 'java-library'
+}
+
+dependencies {
+ implementation 'com.squareup.tape2:tape:2.0.0-beta1'
+ implementation 'com.baidu:leansoft-bigqueue:0.7.3'
+
+}
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
new file mode 100644
index 000000000000..3fcf551f3dc0
--- /dev/null
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
@@ -0,0 +1,99 @@
+package io.airbyte.persistentqueue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class AbstractCloseableInputQueue<E> extends AbstractQueue<E> implements CloseableInputQueue<E> {
+
+ protected final AtomicBoolean inputClosed = new AtomicBoolean(false);
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+ protected abstract boolean enqueueInternal(E element);
+
+ protected abstract E pollInternal();
+
+ /**
+ * Adds an element to the queue.
+ *
+ * @param element - element to enqueue.
+ * @return true if enqueue successful, otherwise false.
+ * @throws IllegalStateException if invoked after {@link CloseableInputQueue#closeInput()}
+ */
+ @Override
+ public boolean offer(E element) {
+ Preconditions.checkState(!closed.get());
+ Preconditions.checkState(!inputClosed.get());
+ Preconditions.checkNotNull(element);
+ return enqueueInternal(element);
+ }
+
+ /*
+ * (non javadoc comment to avoid autoformatting making this impossible to read).
+ * Blocking call to dequeue an element.
+ * | hasValue | inputClosed | behavior |
+ * ----------------------------------------
+ * | true | false | return val |
+ * | false | false | block until |
+ * | true | true | return val |
+ * | false | true | return null |
+ * @return a value from the queue or null if the queue is empty and will not receive anymore data.
+ */
+ @Override
+ public E poll() {
+ Preconditions.checkState(!closed.get());
+ // if the queue is closed, always stop.
+ while (!closed.get()) {
+ final E dequeue = pollInternal();
+
+ // if we find a value, always return it.
+ if (dequeue != null) {
+ return dequeue;
+ }
+
+ // if there is nothing in the queue and there will be no more values, end.
+ if (dequeue == null && inputClosed.get()) {
+ return null;
+ }
+
+ // if no value yet but more input may still arrive, sleep then try again.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void closeInput() {
+ inputClosed.set(true);
+ }
+
+ protected abstract void closeInternal() throws Exception;
+
+ @Override
+ public Iterator<E> iterator() {
+ Preconditions.checkState(!closed.get());
+
+ return new AbstractIterator<>() {
+ @Override
+ protected E computeNext() {
+ final E poll = poll();
+ if (poll == null) {
+ return endOfData();
+ }
+ return poll;
+ }
+ };
+ }
+
+ @Override
+ public void close() throws Exception {
+ closed.set(true);
+ closeInternal();
+ }
+}
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
new file mode 100644
index 000000000000..55ec03b138c2
--- /dev/null
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
@@ -0,0 +1,58 @@
+package io.airbyte.persistentqueue;
+
+import com.google.common.base.Preconditions;
+import com.leansoft.bigqueue.BigQueueImpl;
+import com.leansoft.bigqueue.IBigQueue;
+import java.io.IOException;
+import java.nio.file.Path;
+
+// BigQueue is thread-safe.
+public class BigQueueWrapper extends AbstractCloseableInputQueue implements CloseableInputQueue {
+
+ private final IBigQueue queue;
+
+ public BigQueueWrapper(Path persistencePath, String queueName) throws IOException {
+ queue = new BigQueueImpl(persistencePath.toString(), queueName);
+ }
+
+ @Override
+ public boolean enqueueInternal(byte[] bytes) {
+ try {
+ queue.enqueue(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return true;
+ }
+
+ @Override
+ protected byte[] pollInternal() {
+ try {
+ return queue.dequeue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public byte[] peek() {
+ Preconditions.checkState(!closed.get());
+ try {
+ return queue.peek();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public int size() {
+ Preconditions.checkState(!closed.get());
+ return Math.toIntExact(queue.size());
+ }
+
+ @Override
+ protected void closeInternal() throws Exception {
+ queue.close();
+ queue.gc();
+ }
+}
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java
new file mode 100644
index 000000000000..bfca25e9d2fe
--- /dev/null
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java
@@ -0,0 +1,11 @@
+package io.airbyte.persistentqueue;
+
+import java.util.Queue;
+
+public interface CloseableInputQueue extends Queue, AutoCloseable {
+
+ /**
+ * Calling this signals that no more records will be written to the queue by ANY thread.
+ */
+ void closeInput();
+}
diff --git a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java
new file mode 100644
index 000000000000..a450d30baef1
--- /dev/null
+++ b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java
@@ -0,0 +1,160 @@
+package io.airbyte.persistentqueue;
+
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test the contract of the {@link AbstractCloseableInputQueue} state machine.
+ */
+class AbstractCloseableInputQueueTest {
+ private CloseableInputQueue queue;
+
+ @BeforeEach
+ void setup() {
+ queue = new TestQueue();
+ }
+
+ /*
+ OFFER CONTRACT
+ */
+ @Test
+ void testOfferInputClosedFalse() {
+ assertDoesNotThrow(() -> queue.offer("hello"));
+ }
+
+ @Test
+ void testOfferInputClosed() {
+ queue.closeInput();
+ assertThrows(IllegalStateException.class, () -> queue.offer("hello"));
+ }
+
+ /*
+ POLL CONTRACT
+ */
+ @Test
+ void testPollHasValueInputClosedFalse() {
+ queue.offer("hello");
+ assertEquals("hello", queue.poll());
+ }
+
+ @Test
+ void testPollHasValueInputClosed() {
+ queue.offer("hello");
+ queue.closeInput();
+ assertEquals("hello", queue.poll());
+ }
+
+ @Test
+ void testPollHasValueFalseInputClosed() {
+ queue.closeInput();
+ assertNull(queue.poll());
+ }
+
+ @SuppressWarnings("BusyWait")
+ @Test
+ void testPollHasValueFalseInputClosedFalse() throws InterruptedException {
+ final AtomicBoolean hasAttempted = new AtomicBoolean(false);
+ final AtomicReference output = new AtomicReference<>();
+
+ Thread getterThread = new Thread(() -> {
+ hasAttempted.set(true);
+ output.set(queue.poll());
+ });
+ getterThread.start();
+
+
+ final Thread setterThread = new Thread(() -> {
+ while(!hasAttempted.get()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ assertTrue(hasAttempted.get());
+ assertNull(output.get());
+
+ queue.offer("hello");
+ queue.closeInput();
+ });
+ setterThread.start();
+
+ setterThread.join(1000);
+ getterThread.join(1000);
+
+ assertTrue(hasAttempted.get());
+ assertEquals("hello", output.get());
+ }
+
+ /*
+ ITERATOR CONTRACT
+ */
+ @Test
+ void testIterator() {
+ queue.offer("hello");
+ queue.offer("goodbye");
+ queue.closeInput();
+
+ final List expected = Lists.newArrayList("hello", "goodbye");
+ final List actual = new ArrayList<>(queue);
+
+ assertEquals(expected, actual);
+ }
+
+ /*
+ CLOSED CONTRACT
+ */
+ @Test
+ void testClosed() throws Exception {
+ queue.close();
+ assertDoesNotThrow(() -> queue.close());
+ assertDoesNotThrow(() -> queue.closeInput());
+ assertThrows(IllegalStateException.class, () -> queue.offer("hello"));
+ assertThrows(IllegalStateException.class, () -> queue.poll());
+ assertThrows(IllegalStateException.class, () -> queue.iterator());
+ }
+
+ private static class TestQueue extends AbstractCloseableInputQueue{
+ private final List list = new ArrayList<>();
+
+ @Override
+ protected boolean enqueueInternal(String element) {
+ list.add(0, element);
+ return true;
+ }
+
+ @Override
+ protected String pollInternal() {
+ return list.size() == 0 ? null : list.remove(list.size() - 1);
+ }
+
+ @Override
+ protected void closeInternal() {
+ // no op
+ }
+
+ @Override
+ public int size() {
+ return list.size();
+ }
+
+ @Override
+ public String peek() {
+ return list.size() == 0 ? null : list.get(list.size() - 1);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
new file mode 100644
index 000000000000..41d5854e9f13
--- /dev/null
+++ b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
@@ -0,0 +1,116 @@
+package io.airbyte.persistentqueue;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class BigQueueWrapperTest {
+
+ private CloseableInputQueue queue;
+
+ @BeforeEach
+ void setup() throws IOException {
+ queue = new BigQueueWrapper(Files.createTempDirectory("qtest"), "test");
+ }
+
+ @AfterEach
+ void teardown() throws Exception {
+ queue.close();
+ }
+
+ @Test
+ void testSize() {
+ assertEquals(0, queue.size());
+ queue.offer(toBytes("hello"));
+ assertEquals(1, queue.size());
+ queue.offer(toBytes("hello"));
+ assertEquals(2, queue.size());
+ }
+
+ @Test
+ void testIterator() throws Exception {
+ queue.offer(toBytes("hello"));
+ queue.offer(toBytes("goodbye"));
+ queue.closeInput();
+
+ final List expected = Lists.newArrayList("hello", "goodbye");
+ final List actual = queue.stream().map(val -> val != null ? new String(val, Charsets.UTF_8) : null).collect(Collectors.toList());
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testPollHasValueInputClosedFalse() {
+ queue.offer(toBytes("hello"));
+ assertEquals("hello", new String(Objects.requireNonNull(queue.poll()), Charsets.UTF_8));
+ }
+
+ @Test
+ void testPollHasValueInputClosed() {
+ queue.offer(toBytes("hello"));
+ queue.closeInput();
+ assertEquals("hello", new String(Objects.requireNonNull(queue.poll()), Charsets.UTF_8));
+ }
+
+ @Test
+ void testPollHasValueFalseInputClosed() {
+ queue.closeInput();
+ assertNull(queue.poll());
+ }
+
+ @Test
+ void testPollHasValueFalseInputClosedFalse() throws InterruptedException {
+ final AtomicBoolean hasAttempted = new AtomicBoolean(false);
+ final AtomicReference output = new AtomicReference<>();
+
+ Thread getterThread = new Thread(() -> {
+ hasAttempted.set(true);
+ output.set(new String(Objects.requireNonNull(queue.poll()), Charsets.UTF_8));
+ });
+ getterThread.start();
+
+
+ final Thread setterThread = new Thread(() -> {
+ while(!hasAttempted.get()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ assertTrue(hasAttempted.get());
+ assertNull(output.get());
+
+ queue.offer(toBytes("hello"));
+ queue.closeInput();
+ });
+ setterThread.start();
+
+ setterThread.join(1000);
+ getterThread.join(1000);
+
+ assertTrue(hasAttempted.get());
+ assertEquals("hello", output.get());
+
+ }
+
+ private static byte[] toBytes(String string) {
+ return string.getBytes(Charsets.UTF_8);
+ }
+
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 517bda8f4e83..d9d1ec1d78b7 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -10,6 +10,7 @@ include ':airbyte-config:init'
include ':airbyte-config:persistence'
include ':airbyte-db'
include ':airbyte-integrations'
+include ':airbyte-persistent-queue'
include ':airbyte-protocol:models'
include ':airbyte-scheduler'
include ':airbyte-server'
From 2d610ae64c261a709c9d133140728ca229cc8930 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 9 Oct 2020 16:07:35 -0700
Subject: [PATCH 3/6] sync works: held together with tape and bubblegum
---
.../concurrency/GracefulShutdownHandler.java | 21 +-
.../java/io/airbyte/commons/json/Jsons.java | 5 +
.../GracefulShutdownHandlerTest.java | 12 +-
.../destination/postgres/CsvDestination.java | 215 ++++++++++++++
.../postgres-destination/build.gradle | 1 +
.../postgres/PostgresDestination.java | 266 ++++++++++++++++--
.../postgres/PostgresDestinationTest.java | 59 +++-
.../io/airbyte/scheduler/SchedulerApp.java | 17 +-
8 files changed, 553 insertions(+), 43 deletions(-)
rename airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java => airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java (66%)
rename airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java => airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java (82%)
create mode 100644 airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/postgres/CsvDestination.java
diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java
similarity index 66%
rename from airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java
rename to airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java
index 8c78341b83f9..57040b86a443 100644
--- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java
+++ b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java
@@ -22,19 +22,26 @@
* SOFTWARE.
*/
-package io.airbyte.scheduler;
+package io.airbyte.commons.concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SchedulerShutdownHandler extends Thread {
+public class GracefulShutdownHandler extends Thread {
- private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerShutdownHandler.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownHandler.class);
+ private final long terminationWaitTime;
+ private final TimeUnit terminateWaitTimeUnits;
private final ExecutorService[] threadPools;
- public SchedulerShutdownHandler(final ExecutorService... threadPools) {
+ public GracefulShutdownHandler(
+ long terminationWaitTime,
+ TimeUnit terminateWaitTimeUnits,
+ final ExecutorService... threadPools) {
+ this.terminationWaitTime = terminationWaitTime;
+ this.terminateWaitTimeUnits = terminateWaitTimeUnits;
this.threadPools = threadPools;
}
@@ -44,11 +51,11 @@ public void run() {
threadPool.shutdown();
try {
- if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
- LOGGER.error("Unable to kill worker threads by shutdown timeout.");
+ if (!threadPool.awaitTermination(terminationWaitTime, terminateWaitTimeUnits)) {
+ LOGGER.error("Unable to kill threads by shutdown timeout.");
}
} catch (InterruptedException e) {
- LOGGER.error("Wait for graceful worker thread shutdown interrupted.", e);
+ LOGGER.error("Wait for graceful thread shutdown interrupted.", e);
}
}
}
diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
index 7991b31b5717..311b5a11c0ce 100644
--- a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
+++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
@@ -81,6 +81,11 @@ public static JsonNode jsonNode(final T object) {
return OBJECT_MAPPER.valueToTree(object);
}
+ // todo (cgardens) - this is wrong. adding extra quotes.
+ public static JsonNode jsonNode(final byte[] bytes) {
+ return OBJECT_MAPPER.valueToTree(new String(bytes, Charsets.UTF_8));
+ }
+
public static T object(final JsonNode jsonNode, final Class klass) {
return OBJECT_MAPPER.convertValue(jsonNode, klass);
}
diff --git a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java
similarity index 82%
rename from airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java
rename to airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java
index 43e937fec610..7cd7f4998c1d 100644
--- a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java
+++ b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java
@@ -22,23 +22,23 @@
* SOFTWARE.
*/
-package io.airbyte.scheduler;
+package io.airbyte.commons.concurrency;
-import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
-class SchedulerShutdownHandlerTest {
+class GracefulShutdownHandlerTest {
@Test
public void testRun() throws InterruptedException {
final ExecutorService executorService = mock(ExecutorService.class);
- final SchedulerShutdownHandler schedulerShutdownHandler = new SchedulerShutdownHandler(executorService);
- schedulerShutdownHandler.start();
- schedulerShutdownHandler.join();
+ final GracefulShutdownHandler gracefulShutdownHandler = new GracefulShutdownHandler(30, TimeUnit.SECONDS, executorService);
+ gracefulShutdownHandler.start();
+ gracefulShutdownHandler.join();
verify(executorService).shutdown();
}
diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/postgres/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/postgres/CsvDestination.java
new file mode 100644
index 000000000000..b252b604466f
--- /dev/null
+++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/postgres/CsvDestination.java
@@ -0,0 +1,215 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.destination.postgres;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.resources.MoreResources;
+import io.airbyte.config.DestinationConnectionSpecification;
+import io.airbyte.config.Schema;
+import io.airbyte.config.StandardCheckConnectionOutput;
+import io.airbyte.config.StandardCheckConnectionOutput.Status;
+import io.airbyte.config.StandardDiscoverSchemaOutput;
+import io.airbyte.config.Stream;
+import io.airbyte.integrations.base.Destination;
+import io.airbyte.integrations.base.DestinationConsumer;
+import io.airbyte.integrations.base.FailureTrackingConsumer;
+import io.airbyte.integrations.base.IntegrationRunner;
+import io.airbyte.singer.SingerMessage;
+import io.airbyte.singer.SingerMessage.Type;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CsvDestination implements Destination {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class);
+
+ static final String COLUMN_NAME = "data"; // we output all data as a blob to a single column.
+ static final String DESTINATION_PATH_FIELD = "destination_path";
+
+ @Override
+ public DestinationConnectionSpecification spec() throws IOException {
+ final String resourceString = MoreResources.readResource("spec.json");
+ return Jsons.deserialize(resourceString, DestinationConnectionSpecification.class);
+ }
+
+ @Override
+ public StandardCheckConnectionOutput check(JsonNode config) {
+ try {
+ FileUtils.forceMkdir(getDestinationPath(config).toFile());
+ } catch (Exception e) {
+ return new StandardCheckConnectionOutput().withStatus(Status.FAILURE).withMessage(e.getMessage());
+ }
+ return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS);
+ }
+
+ // todo (cgardens) - we currently don't leverage discover in our destinations, so skipping
+ // implementing it... for now.
+ @Override
+ public StandardDiscoverSchemaOutput discover(JsonNode config) {
+ throw new RuntimeException("Not Implemented");
+ }
+
+ /**
+ * @param config - csv destination config.
+ * @param schema - schema of the incoming messages.
+ * @return - a consumer to handle writing records to the filesystem.
+ * @throws IOException - exception thrown while manipulating the filesystem.
+ */
+ @Override
+ public DestinationConsumer write(JsonNode config, Schema schema) throws IOException {
+ final Path destinationDir = getDestinationPath(config);
+
+ FileUtils.forceMkdir(destinationDir.toFile());
+
+ final long now = Instant.now().toEpochMilli();
+ final Map writeConfigs = new HashMap<>();
+ for (final Stream stream : schema.getStreams()) {
+ final Path tmpPath = destinationDir.resolve(stream.getName() + "_" + now + ".csv");
+ final Path finalPath = destinationDir.resolve(stream.getName() + ".csv");
+ final FileWriter fileWriter = new FileWriter(tmpPath.toFile());
+ final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_NAME));
+ writeConfigs.put(stream.getName(), new WriteConfig(printer, tmpPath, finalPath));
+ }
+
+ return new CsvConsumer(writeConfigs, schema);
+ }
+
+ /**
+ * Extract provided relative path from csv config object and append to local mount path.
+ *
+ * @param config - csv config object
+ * @return absolute path with the relative path appended to the local volume mount.
+ */
+ private Path getDestinationPath(JsonNode config) {
+ final String destinationRelativePath = config.get(DESTINATION_PATH_FIELD).asText();
+ Preconditions.checkNotNull(destinationRelativePath);
+
+ return Path.of(destinationRelativePath);
+ }
+
+ /**
+ * This consumer writes individual records to temporary files. If all of the messages are written
+ * successfully, it moves the tmp files to files named by their respective stream. If there are any
+ * failures, nothing is written.
+ */
+ private static class CsvConsumer extends FailureTrackingConsumer {
+
+ private final Map writeConfigs;
+ private final Schema schema;
+
+ public CsvConsumer(Map writeConfigs, Schema schema) {
+ this.schema = schema;
+ LOGGER.info("initializing consumer.");
+
+ this.writeConfigs = writeConfigs;
+ }
+
+ @Override
+ protected void acceptTracked(SingerMessage singerMessage) throws Exception {
+
+ // ignore other message types.
+ if (singerMessage.getType() == Type.RECORD) {
+ if (!writeConfigs.containsKey(singerMessage.getStream())) {
+ throw new IllegalArgumentException(
+ String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
+ Jsons.serialize(schema), Jsons.serialize(singerMessage)));
+ }
+
+ writeConfigs.get(singerMessage.getStream()).getWriter().printRecord(Jsons.serialize(singerMessage.getRecord()));
+ }
+ }
+
+ @Override
+ protected void close(boolean hasFailed) throws IOException {
+ LOGGER.info("finalizing consumer.");
+
+ for (final Map.Entry entries : writeConfigs.entrySet()) {
+ try {
+ entries.getValue().getWriter().flush();
+ entries.getValue().getWriter().close();
+ } catch (Exception e) {
+ hasFailed = true;
+ LOGGER.error("failed to close writer for: {}.", entries.getKey());
+ }
+ }
+ // do not persist the data, if there are any failures.
+ if (!hasFailed) {
+ for (final WriteConfig writeConfig : writeConfigs.values()) {
+ Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+ // clean up tmp files.
+ for (final WriteConfig writeConfig : writeConfigs.values()) {
+ Files.deleteIfExists(writeConfig.getTmpPath());
+ }
+
+ }
+
+ }
+
+ private static class WriteConfig {
+
+ private final CSVPrinter writer;
+ private final Path tmpPath;
+ private final Path finalPath;
+
+ public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath) {
+ this.writer = writer;
+ this.tmpPath = tmpPath;
+ this.finalPath = finalPath;
+ }
+
+ public CSVPrinter getWriter() {
+ return writer;
+ }
+
+ public Path getTmpPath() {
+ return tmpPath;
+ }
+
+ public Path getFinalPath() {
+ return finalPath;
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ new IntegrationRunner(new CsvDestination()).run(args);
+ }
+
+}
diff --git a/airbyte-integrations/postgres-destination/build.gradle b/airbyte-integrations/postgres-destination/build.gradle
index af162cc3c1e9..7f52a5cf2867 100644
--- a/airbyte-integrations/postgres-destination/build.gradle
+++ b/airbyte-integrations/postgres-destination/build.gradle
@@ -25,6 +25,7 @@ dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:base-java')
+ implementation project(':airbyte-persistent-queue')
implementation project(':airbyte-singer')
integrationTestImplementation project(':airbyte-integrations:integration-test-lib')
diff --git a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
index d194993fab4e..eb4e15db6767 100644
--- a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
+++ b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
@@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.postgres;
import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.DestinationConnectionSpecification;
@@ -32,12 +33,25 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.StandardDiscoverSchemaOutput;
+import io.airbyte.config.Stream;
import io.airbyte.db.DatabaseHelper;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
+import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
+import io.airbyte.persistentqueue.BigQueueWrapper;
+import io.airbyte.persistentqueue.CloseableInputQueue;
import io.airbyte.singer.SingerMessage;
+import io.airbyte.singer.SingerMessage.Type;
import java.io.IOException;
+import java.nio.file.Files;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.dbcp2.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,11 +75,7 @@ public DestinationConnectionSpecification spec() throws IOException {
@Override
public StandardCheckConnectionOutput check(JsonNode config) {
try {
- final BasicDataSource connectionPool = DatabaseHelper.getConnectionPool(
- config.get("username").asText(),
- config.get("password").asText(),
- String.format("jdbc:postgresql://%s:%s/%s", config.get("host").asText(), config.get("port").asText(), config.get("database").asText()));
-
+ final BasicDataSource connectionPool = getConnectionPool(config);
DatabaseHelper.query(connectionPool, ctx -> ctx.execute(
"SELECT *\n"
+ "FROM pg_catalog.pg_tables\n"
@@ -86,34 +96,252 @@ public StandardDiscoverSchemaOutput discover(JsonNode config) {
throw new RuntimeException("Not Implemented");
}
- // fixme - implement this method such that it returns a consumer that can push messages to the
- // destination.
+ /**
+ * Strategy:
+ *
+ * 1. Create a temporary table for each stream
+ *
+ *
+ * 2. Accumulate records in a buffer. One buffer per stream.
+ *
+ *
+ * 3. As records accumulate write them in batch to the database. We set a minimum number of records before writing to avoid wasteful record-wise
+ * writes.
+ *
+ *
+ * 4. Once all records have been written to buffer, flush the buffer and write any remaining records to the database (regardless of how few are
+ * left).
+ *
+ *
+ * 5. In a single transaction, delete the target tables if they exist and rename the temp tables to the final table name.
+ *
+ *
+ * @param config - integration-specific configuration object as json. e.g. { "username": "airbyte", "password": "super secure" }
+ * @param schema - schema of the incoming messages.
+ * @return consumer that writes singer messages to the database.
+ * @throws Exception - anything could happen!
+ */
@Override
- public DestinationConsumer write(JsonNode config, Schema schema) throws IOException {
- return new RecordConsumer();
+ public DestinationConsumer write(JsonNode config, Schema schema) throws Exception {
+ // connect to db.
+ final BasicDataSource connectionPool = getConnectionPool(config);
+ Map writeBuffers = new HashMap<>();
+
+ // create tmp tables if not exist
+ for (final Stream stream : schema.getStreams()) {
+ final String tableName = stream.getName();
+ final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli();
+ DatabaseHelper.query(connectionPool, ctx -> ctx.execute(String.format(
+ "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";\n"
+ + "CREATE TABLE \"%s\" ( \n"
+ + "\"ab_id\" uuid PRIMARY KEY DEFAULT uuid_generate_v4(),\n"
+ + "\"data\" jsonb,\n"
+ + "\"ab_inserted_at\" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ + ");",
+ tmpTableName)));
+
+ // todo (cgardens) -temp dir should be in the job root.
+ final BigQueueWrapper writeBuffer = new BigQueueWrapper(Files.createTempDirectory(stream.getName()), stream.getName());
+ writeBuffers.put(stream.getName(), new WriteConfig(tableName, tmpTableName, writeBuffer));
+ }
+
+ // write to tmp tables
+ // if success copy delete main table if exists. rename tmp tables to real tables.
+ return new RecordConsumer(connectionPool, writeBuffers, schema);
}
- public static class RecordConsumer implements DestinationConsumer {
+ public static class RecordConsumer extends FailureTrackingConsumer implements DestinationConsumer {
+
+ private static final long THREAD_DELAY_MILLIS = 500L;
+
+ private static final long GRACEFUL_SHUTDOWN_MINUTES = 5L;
+ private static final int MIN_RECORDS = 500;
+ private static final int BATCH_SIZE = 500;
+
+
+ private final ScheduledExecutorService writerPool;
+ private final BasicDataSource connectionPool;
+ private final Map writeConfigs;
+ private final Schema schema;
+
+ public RecordConsumer(BasicDataSource connectionPool, Map writeConfigs, Schema schema) {
+ this.connectionPool = connectionPool;
+ this.writeConfigs = writeConfigs;
+ this.schema = schema;
+ this.writerPool = Executors.newSingleThreadScheduledExecutor();
+ // todo (cgardens) - how long? boh.
+ Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(GRACEFUL_SHUTDOWN_MINUTES, TimeUnit.MINUTES, writerPool));
+
+ writerPool.scheduleWithFixedDelay(
+ () -> writeStreamsWithNRecords(MIN_RECORDS, BATCH_SIZE, writeConfigs, connectionPool),
+ THREAD_DELAY_MILLIS,
+ THREAD_DELAY_MILLIS,
+ TimeUnit.MILLISECONDS);
+
+ }
+
+ /**
+ * Write records from buffer to postgres in batch.
+ *
+ * @param minRecords - the minimum number of records in the buffer before writing. helps avoid wastefully writing one record at a time.
+ * @param batchSize - the maximum number of records to write in a single query.
+ * @param writeBuffers - map of stream name to its respective buffer.
+ * @param connectionPool - connection to the db.
+ */
+ private static void writeStreamsWithNRecords(
+ int minRecords,
+ int batchSize,
+ Map writeBuffers, // todo can trim this down.
+ BasicDataSource connectionPool
+ ) {
+ for (final Map.Entry entry : writeBuffers.entrySet()) {
+ final String tmpTableName = entry.getValue().getTmpTableName();
+ final CloseableInputQueue writeBuffer = entry.getValue().getWriteBuffer();
+ while (writeBuffer.size() > minRecords) {
+ try {
+ DatabaseHelper.query(connectionPool, ctx -> {
+ final StringBuilder query = new StringBuilder(String.format("INSERT INTO %s(data)\n", tmpTableName))
+ .append("VALUES \n");
+ // todo (cgardens) - hack.
+ boolean first = true;
+ // todo (cgardens) - stop early if we are getting nulls.
+ for (int i = 0; i <= batchSize; i++) {
+ final byte[] record = writeBuffer.poll();
+ if (record != null) {
+ // don't write comma before the first record.
+ if (first) {
+ first = false;
+ } else {
+ query.append(", \n");
+ }
+// query.append("('{\"name\":\"john\",\"id\":\"10\"}')");
+ final String a = Jsons.serialize(record);
+ final String b = String.format("(%s)", Jsons.serialize(record));
+ final String c = String.format("('%s')", Jsons.serialize(record));
+// query.append(String.format("('%s')", Jsons.serialize(Jsons.jsonNode(record))));
+ query.append(String.format("('%s')", Jsons.serialize(Jsons.deserialize(new String(record)))));
+ }
+ }
+ query.append(";");
+ return ctx.execute(query.toString());
+ });
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
@Override
- public void accept(SingerMessage singerMessage) throws Exception {
- // fixme - implement how to write a message to the destination
- throw new RuntimeException("Not Implemented");
+ public void acceptTracked(SingerMessage singerMessage) {
+ // ignore other message types.
+ if (singerMessage.getType() == Type.RECORD) {
+ if (!writeConfigs.containsKey(singerMessage.getStream())) {
+ throw new IllegalArgumentException(
+ String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
+ Jsons.serialize(schema), Jsons.serialize(singerMessage)));
+ }
+
+ // todo (cgardens) - we should let this throw an io exception. Maybe we should be throwing known airbyte exceptions.
+ writeConfigs.get(singerMessage.getStream()).getWriteBuffer().offer(Jsons.toBytes(singerMessage.getRecord()));
+ }
}
@Override
- public void close() throws Exception {
- // fixme - implement hot to close the connection to the destination.
- throw new RuntimeException("Not Implemented");
+ public void close(boolean hasFailed) throws Exception {
+ // signal no more writes to buffers.
+ for (final WriteConfig writeConfig : writeConfigs.values()) {
+ writeConfig.getWriteBuffer().closeInput();
+ }
+
+ if (hasFailed) {
+ LOGGER.error("executing on failed close procedure.");
+
+ // kill executor pool fast.
+ writerPool.shutdown();
+ writerPool.awaitTermination(1, TimeUnit.SECONDS);
+ } else {
+ LOGGER.error("executing on success close procedure.");
+
+ // shutdown executor pool with time to complete writes.
+ writerPool.shutdown();
+ writerPool.awaitTermination(GRACEFUL_SHUTDOWN_MINUTES, TimeUnit.MINUTES);
+
+ // write anything that is left in the buffers.
+ writeStreamsWithNRecords(0, 500, writeConfigs, connectionPool);
+
+ // delete tables if already exist. copy new tables into their place.
+ DatabaseHelper.query(connectionPool, ctx -> {
+ final StringBuilder query = new StringBuilder("");
+ // todo (cgardens) - need to actually do the transaction part. jooq doesn't want to except valid transaction sql syntax because it makes total sense to ruin sql.
+// final StringBuilder query = new StringBuilder("BEGIN\n");
+ for (final WriteConfig writeConfig : writeConfigs.values()) {
+ query.append(String.format("DROP TABLE IF EXISTS %s;\n", writeConfig.getTableName()));
+
+ query.append(String.format("ALTER TABLE %s RENAME TO %s;\n", writeConfig.getTmpTableName(), writeConfig.getTableName()));
+ }
+// query.append("COMMIT");
+ return ctx.execute(query.toString());
+ });
+
+ }
+
+ // close buffers.
+ for (final WriteConfig writeConfig : writeConfigs.values()) {
+ writeConfig.getWriteBuffer().close();
+ }
+ cleanupTmpTables(connectionPool, writeConfigs);
}
+ private static void cleanupTmpTables(BasicDataSource connectionPool, Map writeConfigs) {
+ for (WriteConfig writeConfig : writeConfigs.values()) {
+ try {
+ DatabaseHelper.query(connectionPool, ctx -> ctx.execute(String.format("DROP TABLE IF EXISTS %s;", writeConfig.getTmpTableName())));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
+ private static class WriteConfig {
+
+ private final String tableName;
+ private final String tmpTableName;
+ private final CloseableInputQueue writeBuffer;
+
+ private WriteConfig(String tableName, String tmpTableName, CloseableInputQueue writeBuffer) {
+ this.tableName = tableName;
+ this.tmpTableName = tmpTableName;
+ this.writeBuffer = writeBuffer;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getTmpTableName() {
+ return tmpTableName;
+ }
+
+ public CloseableInputQueue getWriteBuffer() {
+ return writeBuffer;
+ }
+ }
+
+ private BasicDataSource getConnectionPool(JsonNode config) {
+ return DatabaseHelper.getConnectionPool(
+ config.get("username").asText(),
+ config.get("password").asText(),
+ String.format("jdbc:postgresql://%s:%s/%s",
+ config.get("host").asText(),
+ config.get("port").asText(),
+ config.get("database").asText()));
+ }
+
+
public static void main(String[] args) throws Exception {
- // fixme - instantiate your implementation of the Destination interface and pass it to
- // IntegrationRunner.
final Destination destination = new PostgresDestination();
- // this is airbyte's entrypoint into the integration. do not remove this line!
LOGGER.info("starting destination: {}", PostgresDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", PostgresDestination.class);
diff --git a/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
index 3f707a33851a..62028c9517a1 100644
--- a/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
+++ b/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
@@ -24,6 +24,7 @@
package io.airbyte.integrations.destination.postgres;
+import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
@@ -31,6 +32,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.DataType;
@@ -40,9 +42,19 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.Stream;
+import io.airbyte.db.DatabaseHelper;
+import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.singer.SingerMessage;
import io.airbyte.singer.SingerMessage.Type;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.jooq.Record;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -77,7 +89,7 @@ class PostgresDestinationTest {
private PostgreSQLContainer> db;
@BeforeEach
- void setup() throws IOException {
+ void setup() {
db = new PostgreSQLContainer<>("postgres:13-alpine");
db.start();
@@ -125,9 +137,50 @@ void testCheckFailure() {
}
@Test
- void testWriteSuccess() throws Exception {}
+ void testWriteSuccess() throws Exception {
+ final DestinationConsumer consumer = new PostgresDestination().write(config, CATALOG);
+
+ consumer.accept(SINGER_MESSAGE_USERS1);
+ consumer.accept(SINGER_MESSAGE_TASKS1);
+ consumer.accept(SINGER_MESSAGE_USERS2);
+ consumer.accept(SINGER_MESSAGE_TASKS2);
+ consumer.accept(SINGER_MESSAGE_RECORD);
+ consumer.close();
+
+ // verify that the file is parsable as json (sanity check since the quoting is so goofy).
+ List usersActual = recordRetriever(USERS_STREAM_NAME);
+ final List expectedUsersJson = Lists.newArrayList(SINGER_MESSAGE_USERS1.getRecord(), SINGER_MESSAGE_USERS2.getRecord());
+ assertEquals(expectedUsersJson, usersActual);
+
+ List tasksActual = recordRetriever(TASKS_STREAM_NAME);
+ final List expectedTasksJson = Lists.newArrayList(SINGER_MESSAGE_TASKS1.getRecord(), SINGER_MESSAGE_TASKS2.getRecord());
+ assertEquals(expectedTasksJson, tasksActual);
+ }
@Test
- void testWriteFailure() throws Exception {}
+ void testWriteFailure() throws Exception {
+ }
+ private List recordRetriever(String streamName) throws Exception {
+ BasicDataSource pool =
+ DatabaseHelper.getConnectionPool(db.getUsername(), db.getPassword(), db.getJdbcUrl());
+
+ return DatabaseHelper.query(
+ pool,
+ ctx -> ctx
+ .fetch(String.format("SELECT * FROM public.%s ORDER BY ab_inserted_at ASC;", streamName))
+// .fetch(String.format("SELECT * FROM %s ORDER BY inserted_at ASC;", streamName))
+ .stream()
+ .map(Record::intoMap)
+ .map(r -> r.entrySet().stream().map(e -> {
+ // todo (cgardens) - bad in place mutation.
+ if (e.getValue().getClass().equals(org.jooq.JSONB.class)) {
+ e.setValue(e.getValue().toString());
+ }
+ return e;
+ }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)))
+ .map(r -> (String)r.get("data"))
+ .map(Jsons::deserialize)
+ .collect(toList()));
+ }
}
diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java
index 129949dee8f2..35aec65e739d 100644
--- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java
+++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java
@@ -25,6 +25,7 @@
package io.airbyte.scheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.persistence.ConfigPersistence;
@@ -47,15 +48,15 @@
import org.slf4j.LoggerFactory;
/**
- * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch
- * them. The current implementation uses a thread pool on the scheduler's machine to launch the
- * jobs. One thread is reserved for the job submitter, which is responsible for finding and
- * launching new jobs.
+ * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch them. The current implementation uses a thread
+ * pool on the scheduler's machine to launch the jobs. One thread is reserved for the job submitter, which is responsible for finding and launching
+ * new jobs.
*/
public class SchedulerApp {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class);
+ private static final long GRACEFUL_SHUTDOWN_SECONDS = 30;
private static final int MAX_WORKERS = 4;
private static final long JOB_SUBMITTER_DELAY_MILLIS = 5000L;
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build();
@@ -66,9 +67,9 @@ public class SchedulerApp {
private final ProcessBuilderFactory pbf;
public SchedulerApp(BasicDataSource connectionPool,
- Path configRoot,
- Path workspaceRoot,
- ProcessBuilderFactory pbf) {
+ Path configRoot,
+ Path workspaceRoot,
+ ProcessBuilderFactory pbf) {
this.connectionPool = connectionPool;
this.configRoot = configRoot;
this.workspaceRoot = workspaceRoot;
@@ -98,7 +99,7 @@ public void start() {
JOB_SUBMITTER_DELAY_MILLIS,
TimeUnit.MILLISECONDS);
- Runtime.getRuntime().addShutdownHook(new SchedulerShutdownHandler(workerThreadPool, scheduledPool));
+ Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(GRACEFUL_SHUTDOWN_SECONDS, TimeUnit.SECONDS, workerThreadPool, scheduledPool));
}
public static void main(String[] args) {
From e91965519bca824079b708365b7e587b0aac95cf Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 9 Oct 2020 21:32:48 -0700
Subject: [PATCH 4/6] all the tests pass
---
.../concurrency/GracefulShutdownHandler.java | 6 +-
.../java/io/airbyte/commons/json/Jsons.java | 8 +--
.../csv/CsvDestinationIntegrationTest.java | 3 +-
.../postgres-destination/build.gradle | 4 +-
.../postgres/PostgresDestination.java | 63 ++++++++++---------
.../postgres/PostgresIntegrationTest.java | 25 +++++---
.../postgres/PostgresDestinationTest.java | 56 +++++++++++++----
.../AbstractCloseableInputQueue.java | 40 +++++++++---
.../persistentqueue/BigQueueWrapper.java | 25 ++++++++
.../persistentqueue/CloseableInputQueue.java | 25 ++++++++
.../AbstractCloseableInputQueueTest.java | 43 ++++++++++---
.../persistentqueue/BigQueueWrapperTest.java | 30 +++++++--
.../io/airbyte/scheduler/SchedulerApp.java | 13 ++--
13 files changed, 249 insertions(+), 92 deletions(-)
diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java
index 57040b86a443..5ed45f2dfb78 100644
--- a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java
+++ b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java
@@ -37,9 +37,9 @@ public class GracefulShutdownHandler extends Thread {
private final ExecutorService[] threadPools;
public GracefulShutdownHandler(
- long terminationWaitTime,
- TimeUnit terminateWaitTimeUnits,
- final ExecutorService... threadPools) {
+ long terminationWaitTime,
+ TimeUnit terminateWaitTimeUnits,
+ final ExecutorService... threadPools) {
this.terminationWaitTime = terminationWaitTime;
this.terminateWaitTimeUnits = terminateWaitTimeUnits;
this.threadPools = threadPools;
diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
index 311b5a11c0ce..c97b645b37d6 100644
--- a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
+++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
@@ -81,11 +81,6 @@ public static JsonNode jsonNode(final T object) {
return OBJECT_MAPPER.valueToTree(object);
}
- // todo (cgardens) - this is wrong. adding extra quotes.
- public static JsonNode jsonNode(final byte[] bytes) {
- return OBJECT_MAPPER.valueToTree(new String(bytes, Charsets.UTF_8));
- }
-
public static T object(final JsonNode jsonNode, final Class klass) {
return OBJECT_MAPPER.convertValue(jsonNode, klass);
}
@@ -116,6 +111,7 @@ public static T clone(final T object) {
}
public static byte[] toBytes(JsonNode jsonNode) {
- return Jsons.serialize(jsonNode).getBytes(Charsets.UTF_8);
+ return serialize(jsonNode).getBytes(Charsets.UTF_8);
}
+
}
diff --git a/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java b/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java
index 3475fe4fd051..8c91638ff78a 100644
--- a/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java
+++ b/airbyte-integrations/csv-destination/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java
@@ -60,7 +60,7 @@ protected JsonNode getInvalidConfig() {
}
@Override
- protected List recordRetriever(TestDestinationEnv testEnv) throws Exception {
+ protected List recordRetriever(TestDestinationEnv testEnv, String streamName) throws Exception {
final List list = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
assertEquals(1, list.size());
@@ -77,7 +77,6 @@ protected List recordRetriever(TestDestinationEnv testEnv) throws Exce
@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
-
// no op
}
diff --git a/airbyte-integrations/postgres-destination/build.gradle b/airbyte-integrations/postgres-destination/build.gradle
index 7f52a5cf2867..7247a43d06cb 100644
--- a/airbyte-integrations/postgres-destination/build.gradle
+++ b/airbyte-integrations/postgres-destination/build.gradle
@@ -36,11 +36,11 @@ dependencies {
}
application {
- mainClass = 'io.airbyte.integrations.destination.postgres.PosgresDestination'
+ mainClass = 'io.airbyte.integrations.destination.postgres.PostgresDestination'
}
-def image = 'airbyte/airbyte-java-template-destination:dev'
+def image = 'airbyte/airbyte-postgres-destination:dev'
task imageName {
doLast {
diff --git a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
index eb4e15db6767..b86249c23acf 100644
--- a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
+++ b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
@@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.postgres;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Charsets;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
@@ -59,6 +60,7 @@
public class PostgresDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDestination.class);
+ static final String COLUMN_NAME = "data";
@Override
public DestinationConnectionSpecification spec() throws IOException {
@@ -105,18 +107,20 @@ public StandardDiscoverSchemaOutput discover(JsonNode config) {
* 2. Accumulate records in a buffer. One buffer per stream.
*
*
- * 3. As records accumulate write them in batch to the database. We set a minimum numbers of records before writing to avoid wasteful record-wise
- * writes.
+ * 3. As records accumulate write them in batch to the database. We set a minimum numbers of records
+ * before writing to avoid wasteful record-wise writes.
*
*
- * 4. Once all records have been written to buffer, flush the buffer and write any remaining records to the database (regardless of how few are
- * left).
+ * 4. Once all records have been written to buffer, flush the buffer and write any remaining records
+ * to the database (regardless of how few are left).
*
*
- * 5. In a single transaction, delete the target tables if they exist and rename the temp tables to the final table name.
+ * 5. In a single transaction, delete the target tables if they exist and rename the temp tables to
+ * the final table name.
*
*
- * @param config - integration-specific configuration object as json. e.g. { "username": "airbyte", "password": "super secure" }
+ * @param config - integration-specific configuration object as json. e.g. { "username": "airbyte",
+ * "password": "super secure" }
* @param schema - schema of the incoming messages.
* @return consumer that writes singer messages to the database.
* @throws Exception - anything could happen!
@@ -135,10 +139,10 @@ public DestinationConsumer write(JsonNode config, Schema schema)
"CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";\n"
+ "CREATE TABLE \"%s\" ( \n"
+ "\"ab_id\" uuid PRIMARY KEY DEFAULT uuid_generate_v4(),\n"
- + "\"data\" jsonb,\n"
+ + "\"%s\" jsonb,\n"
+ "\"ab_inserted_at\" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ ");",
- tmpTableName)));
+ tmpTableName, COLUMN_NAME)));
// todo (cgardens) -temp dir should be in the job root.
final BigQueueWrapper writeBuffer = new BigQueueWrapper(Files.createTempDirectory(stream.getName()), stream.getName());
@@ -158,7 +162,6 @@ public static class RecordConsumer extends FailureTrackingConsumer writeConfigs;
@@ -181,26 +184,26 @@ public RecordConsumer(BasicDataSource connectionPool, Map w
}
/**
- * Write records from buffer to postgres in batch.
+ * Write records from buffer to postgres in batch.
*
- * @param minRecords - the minimum number of records in the buffer before writing. helps avoid wastefully writing one record at a time.
- * @param batchSize - the maximum number of records to write in a single query.
- * @param writeBuffers - map of stream name to its respective buffer.
+ * @param minRecords - the minimum number of records in the buffer before writing. helps avoid
+ * wastefully writing one record at a time.
+ * @param batchSize - the maximum number of records to write in a single query.
+ * @param writeBuffers - map of stream name to its respective buffer.
* @param connectionPool - connection to the db.
*/
private static void writeStreamsWithNRecords(
- int minRecords,
- int batchSize,
- Map writeBuffers, // todo can trim this down.
- BasicDataSource connectionPool
- ) {
+ int minRecords,
+ int batchSize,
+ Map writeBuffers, // todo can trim this down.
+ BasicDataSource connectionPool) {
for (final Map.Entry entry : writeBuffers.entrySet()) {
final String tmpTableName = entry.getValue().getTmpTableName();
final CloseableInputQueue writeBuffer = entry.getValue().getWriteBuffer();
while (writeBuffer.size() > minRecords) {
try {
DatabaseHelper.query(connectionPool, ctx -> {
- final StringBuilder query = new StringBuilder(String.format("INSERT INTO %s(data)\n", tmpTableName))
+ final StringBuilder query = new StringBuilder(String.format("INSERT INTO %s(%s)\n", tmpTableName, COLUMN_NAME))
.append("VALUES \n");
// todo (cgardens) - hack.
boolean first = true;
@@ -214,12 +217,7 @@ private static void writeStreamsWithNRecords(
} else {
query.append(", \n");
}
-// query.append("('{\"name\":\"john\",\"id\":\"10\"}')");
- final String a = Jsons.serialize(record);
- final String b = String.format("(%s)", Jsons.serialize(record));
- final String c = String.format("('%s')", Jsons.serialize(record));
-// query.append(String.format("('%s')", Jsons.serialize(Jsons.jsonNode(record))));
- query.append(String.format("('%s')", Jsons.serialize(Jsons.deserialize(new String(record)))));
+ query.append(String.format("('%s')", new String(record, Charsets.UTF_8)));
}
}
query.append(";");
@@ -242,7 +240,8 @@ public void acceptTracked(SingerMessage singerMessage) {
Jsons.serialize(schema), Jsons.serialize(singerMessage)));
}
- // todo (cgardens) - we should let this throw an io exception. Maybe we should be throwing known airbyte exceptions.
+ // todo (cgardens) - we should let this throw an io exception. Maybe we should be throwing known
+ // airbyte exceptions.
writeConfigs.get(singerMessage.getStream()).getWriteBuffer().offer(Jsons.toBytes(singerMessage.getRecord()));
}
}
@@ -272,15 +271,16 @@ public void close(boolean hasFailed) throws Exception {
// delete tables if already exist. copy new tables into their place.
DatabaseHelper.query(connectionPool, ctx -> {
- final StringBuilder query = new StringBuilder("");
- // todo (cgardens) - need to actually do the transaction part. jooq doesn't want to except valid transaction sql syntax because it makes total sense to ruin sql.
-// final StringBuilder query = new StringBuilder("BEGIN\n");
+ final StringBuilder query = new StringBuilder();
+ // todo (cgardens) - need to actually do the transaction part. jooq doesn't want to accept valid
+ // transaction sql syntax because it makes total sense to ruin sql.
+ // final StringBuilder query = new StringBuilder("BEGIN\n");
for (final WriteConfig writeConfig : writeConfigs.values()) {
query.append(String.format("DROP TABLE IF EXISTS %s;\n", writeConfig.getTableName()));
query.append(String.format("ALTER TABLE %s RENAME TO %s;\n", writeConfig.getTmpTableName(), writeConfig.getTableName()));
}
-// query.append("COMMIT");
+ // query.append("COMMIT");
return ctx.execute(query.toString());
});
@@ -302,6 +302,7 @@ private static void cleanupTmpTables(BasicDataSource connectionPool, Map getWriteBuffer() {
return writeBuffer;
}
+
}
private BasicDataSource getConnectionPool(JsonNode config) {
@@ -339,7 +341,6 @@ private BasicDataSource getConnectionPool(JsonNode config) {
config.get("database").asText()));
}
-
public static void main(String[] args) throws Exception {
final Destination destination = new PostgresDestination();
LOGGER.info("starting destination: {}", PostgresDestination.class);
diff --git a/airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java b/airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java
index 4ade753165ff..42e3f5058a96 100644
--- a/airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java
+++ b/airbyte-integrations/postgres-destination/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java
@@ -31,8 +31,10 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DatabaseHelper;
import io.airbyte.integrations.base.TestDestination;
+import java.util.AbstractMap;
import java.util.List;
-import org.apache.commons.dbcp2.BasicDataSource;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
import org.jooq.Record;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -66,21 +68,26 @@ protected JsonNode getInvalidConfig() {
.put("schema", "public")
.put("port", db.getFirstMappedPort())
.put("database", db.getDatabaseName())
- .build()); }
+ .build());
+ }
@Override
- protected List recordRetriever(TestDestinationEnv testEnv, String streamName) throws Exception {
- BasicDataSource pool =
- DatabaseHelper.getConnectionPool(db.getUsername(), db.getPassword(), db.getJdbcUrl());
+ protected List recordRetriever(TestDestinationEnv env, String streamName) throws Exception {
return DatabaseHelper.query(
- pool,
+ DatabaseHelper.getConnectionPool(db.getUsername(), db.getPassword(), db.getJdbcUrl()),
ctx -> ctx
- .fetch(String.format("SELECT * FROM public.%s ORDER BY inserted_at ASC;", streamName))
+ .fetch(String.format("SELECT * FROM %s ORDER BY ab_inserted_at ASC;", streamName))
.stream()
- .map(nestedRecords -> ((Record) nestedRecords.get(0))) // todo ?
.map(Record::intoMap)
- .map(Jsons::jsonNode)
+ .map(r -> r.entrySet().stream().map(e -> {
+ if (e.getValue().getClass().equals(org.jooq.JSONB.class)) {
+ return new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue().toString());
+ }
+ return e;
+ }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)))
+ .map(r -> (String) r.get(PostgresDestination.COLUMN_NAME))
+ .map(Jsons::deserialize)
.collect(toList()));
}
diff --git a/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
index 62028c9517a1..406a5c2f32b0 100644
--- a/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
+++ b/airbyte-integrations/postgres-destination/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
@@ -26,13 +26,16 @@
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.DataType;
@@ -47,8 +50,8 @@
import io.airbyte.singer.SingerMessage;
import io.airbyte.singer.SingerMessage.Type;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.AbstractMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
@@ -147,7 +150,6 @@ void testWriteSuccess() throws Exception {
consumer.accept(SINGER_MESSAGE_RECORD);
consumer.close();
- // verify that the file is parsable as json (sanity check since the quoting is so goofy).
List usersActual = recordRetriever(USERS_STREAM_NAME);
final List expectedUsersJson = Lists.newArrayList(SINGER_MESSAGE_USERS1.getRecord(), SINGER_MESSAGE_USERS2.getRecord());
assertEquals(expectedUsersJson, usersActual);
@@ -155,32 +157,62 @@ void testWriteSuccess() throws Exception {
List tasksActual = recordRetriever(TASKS_STREAM_NAME);
final List expectedTasksJson = Lists.newArrayList(SINGER_MESSAGE_TASKS1.getRecord(), SINGER_MESSAGE_TASKS2.getRecord());
assertEquals(expectedTasksJson, tasksActual);
+
+ assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(Stream::getName).collect(Collectors.toList()));
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void testWriteFailure() throws Exception {
+ // hack to force an exception to be thrown from within the consumer.
+ final SingerMessage spiedMessage = spy(SINGER_MESSAGE_USERS1);
+ doThrow(new RuntimeException()).when(spiedMessage).getStream();
+
+ final DestinationConsumer consumer = spy(new PostgresDestination().write(config, CATALOG));
+
+ assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage));
+ consumer.accept(SINGER_MESSAGE_USERS2);
+ consumer.close();
+
+ final List tableNames = CATALOG.getStreams().stream().map(Stream::getName).collect(toList());
+ assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(Stream::getName).collect(Collectors.toList()));
+ // assert that no tables were created.
+ assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith)));
+ }
+
+ private List fetchNamesOfTablesInDb() throws SQLException {
+ return DatabaseHelper.query(getDatabasePool(),
+ ctx -> ctx.fetch("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';"))
+ .stream()
+ .map(record -> (String) record.get("table_name")).collect(Collectors.toList());
+ }
+
+ private void assertTmpTablesNotPresent(List tableNames) throws SQLException {
+ Set tmpTableNamePrefixes = tableNames.stream().map(name -> name + "_").collect(Collectors.toSet());
+ assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tmpTableNamePrefixes.stream().anyMatch(tableName::startsWith)));
+ }
+
+ private BasicDataSource getDatabasePool() {
+ return DatabaseHelper.getConnectionPool(db.getUsername(), db.getPassword(), db.getJdbcUrl());
}
private List recordRetriever(String streamName) throws Exception {
- BasicDataSource pool =
- DatabaseHelper.getConnectionPool(db.getUsername(), db.getPassword(), db.getJdbcUrl());
return DatabaseHelper.query(
- pool,
+ getDatabasePool(),
ctx -> ctx
- .fetch(String.format("SELECT * FROM public.%s ORDER BY ab_inserted_at ASC;", streamName))
-// .fetch(String.format("SELECT * FROM %s ORDER BY inserted_at ASC;", streamName))
+ .fetch(String.format("SELECT * FROM %s ORDER BY ab_inserted_at ASC;", streamName))
.stream()
.map(Record::intoMap)
.map(r -> r.entrySet().stream().map(e -> {
- // todo (cgardens) - bad in place mutation.
if (e.getValue().getClass().equals(org.jooq.JSONB.class)) {
- e.setValue(e.getValue().toString());
+ return new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue().toString());
}
return e;
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue)))
- .map(r -> (String)r.get("data"))
+ .map(r -> (String) r.get(PostgresDestination.COLUMN_NAME))
.map(Jsons::deserialize)
.collect(toList()));
}
+
}
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
index 3fcf551f3dc0..7ebab3026e01 100644
--- a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
@@ -1,3 +1,27 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
package io.airbyte.persistentqueue;
import com.google.common.base.Preconditions;
@@ -31,14 +55,11 @@ public boolean offer(E element) {
}
/*
- * (non javadoc comment to avoid autoformatting making this impossible to read).
- * Blocking call to dequeue an element.
- * | hasValue | inputClosed | behavior |
- * ----------------------------------------
- * | true | false | return val |
- * | false | false | block until |
- * | true | true | return val |
- * | false | true | return null |
+ * (non javadoc comment to avoid autoformatting making this impossible to read). Blocking call to
+ * dequeue an element. | hasValue | inputClosed | behavior |
+ * ---------------------------------------- | true | false | return val | | false | false | block
+ * until | | true | true | return val | | false | true | return null |
+ *
* @return a value from the queue or null if the queue is empty and will not receive anymore data.
*/
@Override
@@ -80,6 +101,7 @@ public Iterator iterator() {
Preconditions.checkState(!closed.get());
return new AbstractIterator<>() {
+
@Override
protected E computeNext() {
final E poll = poll();
@@ -88,6 +110,7 @@ protected E computeNext() {
}
return poll;
}
+
};
}
@@ -96,4 +119,5 @@ public void close() throws Exception {
closed.set(true);
closeInternal();
}
+
}
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
index 55ec03b138c2..93b06659fc4a 100644
--- a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
@@ -1,3 +1,27 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
package io.airbyte.persistentqueue;
import com.google.common.base.Preconditions;
@@ -55,4 +79,5 @@ protected void closeInternal() throws Exception {
queue.close();
queue.gc();
}
+
}
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java
index bfca25e9d2fe..1128f1860543 100644
--- a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/CloseableInputQueue.java
@@ -1,3 +1,27 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
package io.airbyte.persistentqueue;
import java.util.Queue;
@@ -8,4 +32,5 @@ public interface CloseableInputQueue extends Queue, AutoCloseable {
* Calling this signals that no more records will be written to the queue by ANY thread.
*/
void closeInput();
+
}
diff --git a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java
index a450d30baef1..aa10f9637729 100644
--- a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java
+++ b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/AbstractCloseableInputQueueTest.java
@@ -1,5 +1,28 @@
-package io.airbyte.persistentqueue;
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package io.airbyte.persistentqueue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -19,6 +42,7 @@
* Test the contract of {@link AbstractCloseableInputQueueTest} state machine.
*/
class AbstractCloseableInputQueueTest {
+
private CloseableInputQueue queue;
@BeforeEach
@@ -27,7 +51,7 @@ void setup() {
}
/*
- OFFER CONTRACT
+ * OFFER CONTRACT
*/
@Test
void testOfferInputClosedFalse() {
@@ -41,7 +65,7 @@ void testOfferInputClosed() {
}
/*
- POLL CONTRACT
+ * POLL CONTRACT
*/
@Test
void testPollHasValueInputClosedFalse() {
@@ -74,9 +98,8 @@ void testPollHasValueFalseInputClosedFalse() throws InterruptedException {
});
getterThread.start();
-
final Thread setterThread = new Thread(() -> {
- while(!hasAttempted.get()) {
+ while (!hasAttempted.get()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -100,7 +123,7 @@ void testPollHasValueFalseInputClosedFalse() throws InterruptedException {
}
/*
- ITERATOR CONTRACT
+ * ITERATOR CONTRACT
*/
@Test
void testIterator() {
@@ -115,7 +138,7 @@ void testIterator() {
}
/*
- CLOSED CONTRACT
+ * CLOSED CONTRACT
*/
@Test
void testClosed() throws Exception {
@@ -127,7 +150,8 @@ void testClosed() throws Exception {
assertThrows(IllegalStateException.class, () -> queue.iterator());
}
- private static class TestQueue extends AbstractCloseableInputQueue{
+ private static class TestQueue extends AbstractCloseableInputQueue {
+
private final List list = new ArrayList<>();
@Override
@@ -155,6 +179,7 @@ public int size() {
public String peek() {
return list.size() == 0 ? null : list.get(list.size() - 1);
}
+
}
-}
\ No newline at end of file
+}
diff --git a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
index 41d5854e9f13..665051af7090 100644
--- a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
+++ b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
@@ -1,5 +1,28 @@
-package io.airbyte.persistentqueue;
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package io.airbyte.persistentqueue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -83,9 +106,8 @@ void testPollHasValueFalseInputClosedFalse() throws InterruptedException {
});
getterThread.start();
-
final Thread setterThread = new Thread(() -> {
- while(!hasAttempted.get()) {
+ while (!hasAttempted.get()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -113,4 +135,4 @@ private static byte[] toBytes(String string) {
return string.getBytes(Charsets.UTF_8);
}
-}
\ No newline at end of file
+}
diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java
index 35aec65e739d..2cbc7c901c19 100644
--- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java
+++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java
@@ -48,9 +48,10 @@
import org.slf4j.LoggerFactory;
/**
- * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch them. The current implementation uses a thread
- * pool on the scheduler's machine to launch the jobs. One thread is reserved for the job submitter, which is responsible for finding and launching
- * new jobs.
+ * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch
+ * them. The current implementation uses a thread pool on the scheduler's machine to launch the
+ * jobs. One thread is reserved for the job submitter, which is responsible for finding and
+ * launching new jobs.
*/
public class SchedulerApp {
@@ -67,9 +68,9 @@ public class SchedulerApp {
private final ProcessBuilderFactory pbf;
public SchedulerApp(BasicDataSource connectionPool,
- Path configRoot,
- Path workspaceRoot,
- ProcessBuilderFactory pbf) {
+ Path configRoot,
+ Path workspaceRoot,
+ ProcessBuilderFactory pbf) {
this.connectionPool = connectionPool;
this.configRoot = configRoot;
this.workspaceRoot = workspaceRoot;
From fcc1a4ef48a5029d8ff055454e1540adbb4949e7 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 9 Oct 2020 22:11:41 -0700
Subject: [PATCH 5/6] clean up
---
.../java/io/airbyte/db/DatabaseHelper.java | 20 +++-
.../postgres/PostgresDestination.java | 98 +++++++++----------
.../AbstractCloseableInputQueue.java | 16 ++-
3 files changed, 74 insertions(+), 60 deletions(-)
diff --git a/airbyte-db/src/main/java/io/airbyte/db/DatabaseHelper.java b/airbyte-db/src/main/java/io/airbyte/db/DatabaseHelper.java
index 8f8dc54cfee9..45b6135681c3 100644
--- a/airbyte-db/src/main/java/io/airbyte/db/DatabaseHelper.java
+++ b/airbyte-db/src/main/java/io/airbyte/db/DatabaseHelper.java
@@ -30,12 +30,14 @@
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
+import org.jooq.*;
+import org.jooq.impl.*;
public class DatabaseHelper {
public static BasicDataSource getConnectionPool(String username,
- String password,
- String jdbcConnectionString) {
+ String password,
+ String jdbcConnectionString) {
BasicDataSource connectionPool = new BasicDataSource();
connectionPool.setDriverClassName("org.postgresql.Driver");
@@ -46,14 +48,24 @@ public static BasicDataSource getConnectionPool(String username,
return connectionPool;
}
- public static T query(BasicDataSource connectionPool, ContextQueryFunction transform)
- throws SQLException {
+ public static T query(BasicDataSource connectionPool, ContextQueryFunction transform) throws SQLException {
+
try (Connection connection = connectionPool.getConnection()) {
DSLContext context = getContext(connection);
return transform.apply(context);
}
}
+ public static T transaction(BasicDataSource connectionPool, ContextQueryFunction transform) throws SQLException {
+ try (Connection connection = connectionPool.getConnection()) {
+ DSLContext context = getContext(connection);
+ return context.transactionResult(configuration -> {
+ DSLContext transactionContext = DSL.using(configuration);
+ return transform.apply(transactionContext);
+ });
+ }
+ }
+
private static DSLContext getContext(Connection connection) {
return DSL.using(connection, SQLDialect.POSTGRES);
}
diff --git a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
index b86249c23acf..f0c487f44f44 100644
--- a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
+++ b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
@@ -46,6 +46,7 @@
import io.airbyte.singer.SingerMessage.Type;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import java.util.HashMap;
@@ -85,7 +86,7 @@ public StandardCheckConnectionOutput check(JsonNode config) {
connectionPool.close();
} catch (Exception e) {
- // todo (cgardens) - better error messaging.
+ // todo (cgardens) - better error messaging for common cases. e.g. wrong password.
return new StandardCheckConnectionOutput().withStatus(Status.FAILURE).withMessage(e.getMessage());
}
@@ -107,20 +108,18 @@ public StandardDiscoverSchemaOutput discover(JsonNode config) {
* 2. Accumulate records in a buffer. One buffer per stream.
*
*
- * 3. As records accumulate write them in batch to the database. We set a minimum numbers of records
- * before writing to avoid wasteful record-wise writes.
+ * 3. As records accumulate write them in batch to the database. We set a minimum numbers of records before writing to avoid wasteful record-wise
+ * writes.
*
*
- * 4. Once all records have been written to buffer, flush the buffer and write any remaining records
- * to the database (regardless of how few are left).
+ * 4. Once all records have been written to buffer, flush the buffer and write any remaining records to the database (regardless of how few are
+ * left).
*
*
- * 5. In a single transaction, delete the target tables if they exist and rename the temp tables to
- * the final table name.
+ * 5. In a single transaction, delete the target tables if they exist and rename the temp tables to the final table name.
*
*
- * @param config - integration-specific configuration object as json. e.g. { "username": "airbyte",
- * "password": "super secure" }
+ * @param config - integration-specific configuration object as json. e.g. { "username": "airbyte", "password": "super secure" }
* @param schema - schema of the incoming messages.
* @return consumer that writes singer messages to the database.
* @throws Exception - anything could happen!
@@ -144,8 +143,7 @@ public DestinationConsumer write(JsonNode config, Schema schema)
+ ");",
tmpTableName, COLUMN_NAME)));
- // todo (cgardens) -temp dir should be in the job root.
- final BigQueueWrapper writeBuffer = new BigQueueWrapper(Files.createTempDirectory(stream.getName()), stream.getName());
+ final BigQueueWrapper writeBuffer = new BigQueueWrapper(Path.of(stream.getName()), stream.getName());
writeBuffers.put(stream.getName(), new WriteConfig(tableName, tmpTableName, writeBuffer));
}
@@ -180,49 +178,27 @@ public RecordConsumer(BasicDataSource connectionPool, Map w
THREAD_DELAY_MILLIS,
THREAD_DELAY_MILLIS,
TimeUnit.MILLISECONDS);
-
}
/**
* Write records from buffer to postgres in batch.
*
- * @param minRecords - the minimum number of records in the buffer before writing. helps avoid
- * wastefully writing one record at a time.
- * @param batchSize - the maximum number of records to write in a single query.
- * @param writeBuffers - map of stream name to its respective buffer.
+ * @param minRecords - the minimum number of records in the buffer before writing. helps avoid wastefully writing one record at a time.
+ * @param batchSize - the maximum number of records to write in a single insert.
+ * @param writeBuffers - map of stream name to its respective buffer.
* @param connectionPool - connection to the db.
*/
private static void writeStreamsWithNRecords(
- int minRecords,
- int batchSize,
- Map writeBuffers, // todo can trim this down.
- BasicDataSource connectionPool) {
+ int minRecords,
+ int batchSize,
+ Map writeBuffers,
+ BasicDataSource connectionPool) {
for (final Map.Entry entry : writeBuffers.entrySet()) {
final String tmpTableName = entry.getValue().getTmpTableName();
final CloseableInputQueue writeBuffer = entry.getValue().getWriteBuffer();
while (writeBuffer.size() > minRecords) {
try {
- DatabaseHelper.query(connectionPool, ctx -> {
- final StringBuilder query = new StringBuilder(String.format("INSERT INTO %s(%s)\n", tmpTableName, COLUMN_NAME))
- .append("VALUES \n");
- // todo (cgardens) - hack.
- boolean first = true;
- // todo (cgardens) - stop early if we are getting nulls.
- for (int i = 0; i <= batchSize; i++) {
- final byte[] record = writeBuffer.poll();
- if (record != null) {
- // don't write comma before the first record.
- if (first) {
- first = false;
- } else {
- query.append(", \n");
- }
- query.append(String.format("('%s')", new String(record, Charsets.UTF_8)));
- }
- }
- query.append(";");
- return ctx.execute(query.toString());
- });
+ DatabaseHelper.query(connectionPool, ctx -> ctx.execute(buildWriteQuery(batchSize, writeBuffer, tmpTableName)));
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -230,6 +206,34 @@ private static void writeStreamsWithNRecords(
}
}
+ // build the following query:
+ // INSERT INTO (data)
+ // VALUES
+ // ({ "my": "data" }),
+ // ({ "my": "data" });
+ private static String buildWriteQuery(int batchSize, CloseableInputQueue writeBuffer, String tmpTableName) {
+ final StringBuilder query = new StringBuilder(String.format("INSERT INTO %s(%s)\n", tmpTableName, COLUMN_NAME))
+ .append("VALUES \n");
+ boolean firstRecordInQuery = true;
+ for (int i = 0; i <= batchSize; i++) {
+ final byte[] record = writeBuffer.poll();
+ if (record == null) {
+ break;
+ }
+
+ // don't write comma before the first record.
+ if (firstRecordInQuery) {
+ firstRecordInQuery = false;
+ } else {
+ query.append(", \n");
+ }
+ query.append(String.format("('%s')", new String(record, Charsets.UTF_8)));
+ }
+ query.append(";");
+
+ return query.toString();
+ }
+
@Override
public void acceptTracked(SingerMessage singerMessage) {
// ignore other message types.
@@ -240,8 +244,6 @@ public void acceptTracked(SingerMessage singerMessage) {
Jsons.serialize(schema), Jsons.serialize(singerMessage)));
}
- // todo (cgardens) - we should let this throw an io exception. Maybe we should be throwing known
- // airbyte exceptions.
writeConfigs.get(singerMessage.getStream()).getWriteBuffer().offer(Jsons.toBytes(singerMessage.getRecord()));
}
}
@@ -249,9 +251,7 @@ public void acceptTracked(SingerMessage singerMessage) {
@Override
public void close(boolean hasFailed) throws Exception {
// signal no more writes to buffers.
- for (final WriteConfig writeConfig : writeConfigs.values()) {
- writeConfig.getWriteBuffer().closeInput();
- }
+ writeConfigs.values().forEach(writeConfig -> writeConfig.getWriteBuffer().closeInput());
if (hasFailed) {
LOGGER.error("executing on failed close procedure.");
@@ -270,17 +270,13 @@ public void close(boolean hasFailed) throws Exception {
writeStreamsWithNRecords(0, 500, writeConfigs, connectionPool);
// delete tables if already exist. copy new tables into their place.
- DatabaseHelper.query(connectionPool, ctx -> {
+ DatabaseHelper.transaction(connectionPool, ctx -> {
final StringBuilder query = new StringBuilder();
- // todo (cgardens) - need to actually do the transaction part. jooq doesn't want to except valid
- // transaction sql syntax because it makes total sense to ruin sql.
- // final StringBuilder query = new StringBuilder("BEGIN\n");
for (final WriteConfig writeConfig : writeConfigs.values()) {
query.append(String.format("DROP TABLE IF EXISTS %s;\n", writeConfig.getTableName()));
query.append(String.format("ALTER TABLE %s RENAME TO %s;\n", writeConfig.getTmpTableName(), writeConfig.getTableName()));
}
- // query.append("COMMIT");
return ctx.execute(query.toString());
});
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
index 7ebab3026e01..95be9a589dca 100644
--- a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/AbstractCloseableInputQueue.java
@@ -54,11 +54,17 @@ public boolean offer(E element) {
return enqueueInternal(element);
}
- /*
- * (non javadoc comment to avoid autoformatting making this impossible to read). Blocking call to
- * dequeue an element. | hasValue | inputClosed | behavior |
- * ---------------------------------------- | true | false | return val | | false | false | block
- * until | | true | true | return val | | false | true | return null |
+ // Blocking call to dequeue an element.
+ // (comment is up here to avoid autoformat scrambling it up.)
+ // | hasValue | inputClosed | behavior |
+ // ----------------------------------------
+ // | true | false | return val |
+ // | false | false | block until |
+ // | true | true | return val |
+ // | false | true | return null |
+
+ /**
+ * Blocking call to dequeue an element.
*
* @return a value from the queue or null if the queue is empty and will not receive anymore data.
*/
From 4f4d2c8969ab0b491d9afafc0ec161adbd95f401 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 9 Oct 2020 22:21:54 -0700
Subject: [PATCH 6/6] clean up
---
.../destination/postgres/PostgresDestination.java | 5 ++---
.../{BigQueueWrapper.java => BigQueue.java} | 9 ++++++---
.../{BigQueueWrapperTest.java => BigQueueTest.java} | 4 ++--
3 files changed, 10 insertions(+), 8 deletions(-)
rename airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/{BigQueueWrapper.java => BigQueue.java} (84%)
rename airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/{BigQueueWrapperTest.java => BigQueueTest.java} (97%)
diff --git a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
index f0c487f44f44..e22cd914a42a 100644
--- a/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
+++ b/airbyte-integrations/postgres-destination/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
@@ -40,12 +40,11 @@
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
-import io.airbyte.persistentqueue.BigQueueWrapper;
+import io.airbyte.persistentqueue.BigQueue;
import io.airbyte.persistentqueue.CloseableInputQueue;
import io.airbyte.singer.SingerMessage;
import io.airbyte.singer.SingerMessage.Type;
import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
@@ -143,7 +142,7 @@ public DestinationConsumer write(JsonNode config, Schema schema)
+ ");",
tmpTableName, COLUMN_NAME)));
- final BigQueueWrapper writeBuffer = new BigQueueWrapper(Path.of(stream.getName()), stream.getName());
+ final BigQueue writeBuffer = new BigQueue(Path.of(stream.getName()), stream.getName());
writeBuffers.put(stream.getName(), new WriteConfig(tableName, tmpTableName, writeBuffer));
}
diff --git a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueue.java
similarity index 84%
rename from airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
rename to airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueue.java
index 93b06659fc4a..e62d9ff594ee 100644
--- a/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueueWrapper.java
+++ b/airbyte-persistent-queue/src/main/java/io/airbyte/persistentqueue/BigQueue.java
@@ -30,12 +30,14 @@
import java.io.IOException;
import java.nio.file.Path;
-// BigQueue is threadsafe.
-public class BigQueueWrapper extends AbstractCloseableInputQueue implements CloseableInputQueue {
+/**
+ * Wraps BigQueueImpl behind Airbyte persistent queue interface. BigQueueImpl is threadsafe.
+ */
+public class BigQueue extends AbstractCloseableInputQueue implements CloseableInputQueue {
private final IBigQueue queue;
- public BigQueueWrapper(Path persistencePath, String queueName) throws IOException {
+ public BigQueue(Path persistencePath, String queueName) throws IOException {
queue = new BigQueueImpl(persistencePath.toString(), queueName);
}
@@ -76,6 +78,7 @@ public int size() {
@Override
protected void closeInternal() throws Exception {
+ // todo (cgardens) - this barfs out a huge warning. known issue with the lib: https://github.com/bulldog2011/bigqueue/issues/35.
queue.close();
queue.gc();
}
diff --git a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueTest.java
similarity index 97%
rename from airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
rename to airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueTest.java
index 665051af7090..777c935ed785 100644
--- a/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueWrapperTest.java
+++ b/airbyte-persistent-queue/src/test/java/io/airbyte/persistentqueue/BigQueueTest.java
@@ -41,13 +41,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-class BigQueueWrapperTest {
+class BigQueueTest {
private CloseableInputQueue queue;
@BeforeEach
void setup() throws IOException {
- queue = new BigQueueWrapper(Files.createTempDirectory("qtest"), "test");
+ queue = new BigQueue(Files.createTempDirectory("qtest"), "test");
}
@AfterEach