Skip to content

Commit

Permalink
postgres destination (java) (#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Oct 13, 2020
1 parent eaa40ed commit adc41a7
Show file tree
Hide file tree
Showing 13 changed files with 820 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.Optional;

Expand Down Expand Up @@ -109,4 +110,8 @@ public static <T> T clone(final T object) {
return (T) deserialize(serialize(object), object.getClass());
}

public static byte[] toBytes(JsonNode jsonNode) {
return serialize(jsonNode).getBytes(Charsets.UTF_8);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.List;
Expand Down Expand Up @@ -193,6 +194,12 @@ void testMutateTypeToArrayStandard() {
Assertions.assertEquals(expectedWithoutArrayType, actualWithStringType);
}

@Test
void testToBytes() {
final String jsonString = "{\"test\":\"abc\",\"type\":[\"object\"]}";
Assertions.assertArrayEquals(jsonString.getBytes(Charsets.UTF_8), Jsons.toBytes(Jsons.deserialize(jsonString)));
}

private static class ToClass {

@JsonProperty("str")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected JsonNode getFailCheckConfig() {
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv) throws Exception {
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception {
final List<Path> list = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
// todo (cgardens) - this should be here. add a retrieve tables abstract method to verify this.
assertEquals(1, list.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -93,13 +94,13 @@ public abstract class TestDestination {
/**
* Function that returns all of the records in destination as json at the time this method is
* invoked. These will be used to check that the data actually written is what should actually be
* there.
* there. Note: this returns a set and does not test any order guarantees.
*
* @param testEnv - information about the test environment.
* @return All of the records in the destination at the time this method is invoked.
* @throws Exception - can throw any exception, test framework will handle.
*/
protected abstract List<JsonNode> retrieveRecords(TestDestinationEnv testEnv) throws Exception;
protected abstract List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception;

/**
* Function that performs any setup of external resources required for the test. e.g. instantiate a
Expand Down Expand Up @@ -180,7 +181,7 @@ void testSync(String messagesFilename, String catalogFilename) throws Exception
.map(record -> Jsons.deserialize(record, SingerMessage.class)).collect(Collectors.toList());
runSync(messages, catalog);

assertSameMessages(messages, retrieveRecords(testEnv));
assertSameMessages(messages, retrieveRecords(testEnv, catalog.getStreams().get(0).getName()));
}

/**
Expand All @@ -201,7 +202,7 @@ void testSecondSync() throws Exception {
.put("NZD", 700)
.build())));
runSync(secondSyncMessages, catalog);
assertSameMessages(secondSyncMessages, retrieveRecords(testEnv));
assertSameMessages(secondSyncMessages, retrieveRecords(testEnv, catalog.getStreams().get(0).getName()));
}

private void runSync(List<SingerMessage> messages, Schema catalog) throws IOException, WorkerException {
Expand All @@ -222,7 +223,10 @@ private void assertSameMessages(List<SingerMessage> expected, List<JsonNode> act
.filter(message -> message.getType() == Type.RECORD)
.map(SingerMessage::getRecord)
.collect(Collectors.toList());
assertEquals(expectedJson, actual);
// we want to ignore order in this comparison.
assertEquals(expectedJson.size(), actual.size());
assertTrue(expectedJson.containsAll(actual));
assertTrue(actual.containsAll(expectedJson));
}

public static class TestDestinationEnv {
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/java-template-destination/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
1. e.g.
```
mkdir -p airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/bigquery
mv airbyte-integrations/java-template-destination/src/main/java/io/airbyte/integrations/destination/template/DestinationTemplate.java airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/bigquery/DestinationTemplate.java
rm -r airbyte-integrations/java-template-destination/src/main/java/io/airbyte/integrations/destination/template
mv airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/template/DestinationTemplate.java airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/bigquery/DestinationTemplate.java
rm -r airbyte-integrations/bigquery-destination/src/main/java/io/airbyte/integrations/destination/template
```
1. Rename the template class to an appropriate name for your integration.
1. e.g. `DestinationTemplate.java` to `BigQueryDestination.java`.
Expand Down
3 changes: 3 additions & 0 deletions airbyte-integrations/postgres-destination/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
10 changes: 10 additions & 0 deletions airbyte-integrations/postgres-destination/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM airbyte/base-java:dev

WORKDIR /airbyte

# fixme - replace java-template with the name of directory that this file is in.
ENV APPLICATION postgres-destination

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1
70 changes: 70 additions & 0 deletions airbyte-integrations/postgres-destination/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
plugins {
id 'com.bmuschko.docker-remote-api'
id 'application'
}

sourceSets {
integrationTest {
java {
srcDir 'src/test-integration/java'
}
resources {
srcDir 'src/test-integration/resources'
}
}
}
test.dependsOn('compileIntegrationTestJava')

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:base-java')
implementation project(':airbyte-queue')
implementation project(':airbyte-singer')

testImplementation "org.testcontainers:postgresql:1.15.0-rc2"

integrationTestImplementation project(':airbyte-integrations:integration-test-lib')
integrationTestImplementation project(':airbyte-integrations:postgres-destination')
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"
}

application {
mainClass = 'io.airbyte.integrations.destination.postgres.PostgresDestination'
}


def image = 'airbyte/airbyte-postgres-destination:dev'

task imageName {
doLast {
println "IMAGE $image"
}
}

task buildImage(type: DockerBuildImage) {
inputDir = projectDir
images.add(image)
dependsOn ':airbyte-integrations:base-java:buildImage'
}

task integrationTest(type: Test) {
testClassesDirs += sourceSets.integrationTest.output.classesDirs
classpath += sourceSets.integrationTest.runtimeClasspath

useJUnitPlatform()
testLogging() {
events "passed", "failed"
exceptionFormat "full"
}

mustRunAfter test
dependsOn(buildImage)
}

Loading

0 comments on commit adc41a7

Please sign in to comment.