Skip to content

Commit

Permalink
Add postgres JDBC source (#794)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada authored Nov 3, 2020
1 parent 852b761 commit 7059825
Show file tree
Hide file tree
Showing 15 changed files with 504 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres-singer",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-source"
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
2 changes: 1 addition & 1 deletion airbyte-db/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies {
api 'org.apache.commons:commons-dbcp2:2.7.0'
api 'org.jooq:jooq-meta:3.13.4'
api 'org.jooq:jooq:3.13.4'
api 'org.postgresql:postgresql:42.2.16'
api 'org.postgresql:postgresql:42.2.18'

testImplementation "org.testcontainers:postgresql:1.15.0-rc2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public AirbyteConnectionStatus check(JsonNode config) {

return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.debug("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Can't connect with provided configuration.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies {
integrationTestImplementation 'com.fasterxml.jackson.core:jackson-databind'
integrationTestImplementation 'org.apache.commons:commons-text:1.9'
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"
integrationTestImplementation "org.postgresql:postgresql:42.2.16"
integrationTestImplementation "org.postgresql:postgresql:42.2.18"

integrationTestImplementation project(':airbyte-config:models')
integrationTestImplementation project(':airbyte-workers')
Expand Down
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-postgres/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION source-postgres

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-postgres
32 changes: 32 additions & 0 deletions airbyte-integrations/connectors/source-postgres/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
plugins {
id 'application'
}

apply from: rootProject.file('tools/gradle/commons/integrations/image.gradle')
apply from: rootProject.file('tools/gradle/commons/integrations/integration-test.gradle')

dependencies {
implementation "org.postgresql:postgresql:42.2.18"

implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:source-jdbc')

testImplementation 'org.apache.commons:commons-text:1.9'
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.apache.commons:commons-dbcp2:2.7.0'
testImplementation 'org.testcontainers:postgresql:1.15.0-rc2'

testImplementation project(':airbyte-test-utils')

integrationTestImplementation project(':airbyte-integrations:bases:standard-source-test')
}

application {
mainClass = 'io.airbyte.integrations.source.postgres.PostgresSource'
}

buildImage.dependsOn(assemble)
buildImage.dependsOn(':airbyte-integrations:bases:base-java:buildImage')
integrationTest.dependsOn(buildImage)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSource extends AbstractJdbcSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);

public PostgresSource() {
super("org.postgresql.Driver", SQLDialect.POSTGRES);
}

@Override
public JsonNode toJdbcConfig(JsonNode config) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()))
.build());
}

public static void main(String[] args) throws Exception {
final Source source = new PostgresSource();
LOGGER.info("starting source: {}", PostgresSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed source: {}", PostgresSource.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Postgres Source Spec",
"type": "object",
"required": ["host", "password", "port", "database", "username"],
"additionalProperties": false,
"properties": {
"host": {
"description": "Hostname of the database.",
"type": "string"
},
"port": {
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 5432,
"examples": ["5432"]
},
"database": {
"description": "Name of the database.",
"type": "string"
},
"username": {
"description": "Username to use to access the database.",
"type": "string"
},
"password": {
"description": "Password associated with the username.",
"type": "string"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.TestSource;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import java.util.Collections;
import java.util.List;
import org.jooq.SQLDialect;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresIntegrationTests extends TestSource {

private static final String STREAM_NAME = "id_and_name";

private PostgreSQLContainer<?> container;
private JsonNode config;

@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
container = new PostgreSQLContainer<>("postgres:13-alpine");
container.start();

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", container.getDatabaseName())
.put("username", container.getUsername())
.put("password", container.getPassword())
.build());

final Database database = Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:postgresql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()),
"org.postgresql.Driver",
SQLDialect.POSTGRES);

database.query(ctx -> {
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'arthur'), (2, 'thomas'), (3, 'finn');");
return null;
});

database.close();
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
container.close();
}

@Override
protected String getImageName() {
return "airbyte/source-postgres:dev";
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
}

@Override
protected JsonNode getConfig() {
return config;
}

@Override
protected AirbyteCatalog getCatalog() {
return CatalogHelpers.createAirbyteCatalog(
STREAM_NAME,
Field.of("id", Field.JsonSchemaPrimitive.NUMBER),
Field.of("name", Field.JsonSchemaPrimitive.STRING));
}

@Override
protected List<String> getRegexTests() {
return Collections.emptyList();
}

}
Loading

0 comments on commit 7059825

Please sign in to comment.