Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Postgres: support all Postgres 14 types #8726

Merged
merged 16 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.17
dockerImageTag: 0.4.0
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5281,7 +5281,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.3.17"
- dockerImage: "airbyte/source-postgres:0.4.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.airbyte.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
Expand Down Expand Up @@ -49,6 +51,15 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
return jsonNode;
}

protected void putArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
ArrayNode arrayNode = new ObjectMapper().createArrayNode();
ResultSet arrayResultSet = resultSet.getArray(index).getResultSet();
while (arrayResultSet.next()) {
arrayNode.add(arrayResultSet.getString(2));
}
node.set(columnName, arrayNode);
}

protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, resultSet.getBoolean(index));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceOperations.class);

private JDBCType safeGetJdbcType(final int columnTypeInt) {
protected JDBCType safeGetJdbcType(final int columnTypeInt) {
try {
return JDBCType.valueOf(columnTypeInt);
} catch (final Exception e) {
Expand Down Expand Up @@ -55,6 +55,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
default -> putDefault(json, columnName, resultSet, colIndex);
}
}
Expand Down Expand Up @@ -115,6 +116,7 @@ public JsonSchemaPrimitive getJsonType(final JDBCType jdbcType) {
case TIME -> JsonSchemaPrimitive.STRING;
case TIMESTAMP -> JsonSchemaPrimitive.STRING;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING_BINARY;
case ARRAY -> JsonSchemaPrimitive.ARRAY;
// since column types aren't necessarily meaningful to Airbyte, liberally convert all unrecgonised
// types to String
default -> JsonSchemaPrimitive.STRING;
Expand Down
82 changes: 56 additions & 26 deletions airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -115,7 +117,7 @@ void testSetJsonField() throws SQLException {
try (final Connection connection = dataSource.getConnection()) {
createTableWithAllTypes(connection);
insertRecordOfEachType(connection);
assertExpectedOutputValues(connection);
assertExpectedOutputValues(connection, jsonFieldExpectedValues());
assertExpectedOutputTypes(connection);
}
}
Expand Down Expand Up @@ -148,7 +150,7 @@ void testSetStatementField() throws SQLException {

ps.execute();

assertExpectedOutputValues(connection);
assertExpectedOutputValues(connection, expectedValues());
assertExpectedOutputTypes(connection);
}
}
Expand All @@ -172,7 +174,9 @@ private static void createTableWithAllTypes(final Connection connection) throws
+ "date DATE,"
+ "time TIME,"
+ "timestamp TIMESTAMP,"
+ "binary1 bytea"
+ "binary1 bytea,"
+ "text_array _text,"
+ "int_array int[]"
+ ");");

}
Expand All @@ -194,7 +198,9 @@ private static void insertRecordOfEachType(final Connection connection) throws S
+ "date,"
+ "time,"
+ "timestamp,"
+ "binary1"
+ "binary1,"
+ "text_array,"
+ "int_array"
+ ") VALUES("
+ "1::bit(1),"
+ "true,"
Expand All @@ -211,36 +217,18 @@ private static void insertRecordOfEachType(final Connection connection) throws S
+ "'2020-11-01',"
+ "'05:00',"
+ "'2001-09-29 03:00',"
+ "decode('61616161', 'hex')"
+ "decode('61616161', 'hex'),"
+ "'{one,two,three}',"
+ "'{1,2,3}'"
+ ");");
}

private static void assertExpectedOutputValues(final Connection connection) throws SQLException {
private static void assertExpectedOutputValues(final Connection connection, final ObjectNode expected) throws SQLException {
final ResultSet resultSet = connection.createStatement().executeQuery("SELECT * FROM data;");

resultSet.next();
final JsonNode actual = sourceOperations.rowToJson(resultSet);

final ObjectNode expected = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
expected.put("bit", true);
expected.put("boolean", true);
expected.put("smallint", (short) 1);
expected.put("int", 1);
expected.put("bigint", (long) 1);
expected.put("float", (double) 1.0);
expected.put("double", (double) 1.0);
expected.put("real", (float) 1.0);
expected.put("numeric", new BigDecimal(1));
expected.put("decimal", new BigDecimal(1));
expected.put("char", "a");
expected.put("varchar", "a");
// todo (cgardens) we should parse this to a date string
expected.put("date", "2020-11-01T00:00:00Z");
// todo (cgardens) we should parse this to a time string
expected.put("time", "1970-01-01T05:00:00Z");
expected.put("timestamp", "2001-09-29T03:00:00Z");
expected.put("binary1", "aaaa".getBytes(Charsets.UTF_8));

// field-wise comparison to make debugging easier.
MoreStreams.toStream(expected.fields()).forEach(e -> assertEquals(e.getValue(), actual.get(e.getKey()), "key: " + e.getKey()));
assertEquals(expected, actual);
Expand Down Expand Up @@ -273,9 +261,51 @@ private static void assertExpectedOutputTypes(final Connection connection) throw
.put("time", JsonSchemaPrimitive.STRING)
.put("timestamp", JsonSchemaPrimitive.STRING)
.put("binary1", JsonSchemaPrimitive.STRING_BINARY)
.put("text_array", JsonSchemaPrimitive.ARRAY)
.put("int_array", JsonSchemaPrimitive.ARRAY)
.build();

assertEquals(actual, expected);
}

private ObjectNode jsonFieldExpectedValues() {
final ObjectNode expected = expectedValues();
ArrayNode arrayNode = new ObjectMapper().createArrayNode();
arrayNode.add("one");
arrayNode.add("two");
arrayNode.add("three");
expected.set("text_array", arrayNode);

ArrayNode arrayNode2 = new ObjectMapper().createArrayNode();
arrayNode2.add("1");
arrayNode2.add("2");
arrayNode2.add("3");
expected.set("int_array", arrayNode2);

return expected;
}

private ObjectNode expectedValues() {
final ObjectNode expected = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
expected.put("bit", true);
expected.put("boolean", true);
expected.put("smallint", (short) 1);
expected.put("int", 1);
expected.put("bigint", (long) 1);
expected.put("float", (double) 1.0);
expected.put("double", (double) 1.0);
expected.put("real", (float) 1.0);
expected.put("numeric", new BigDecimal(1));
expected.put("decimal", new BigDecimal(1));
expected.put("char", "a");
expected.put("varchar", "a");
// todo (cgardens) we should parse this to a date string
expected.put("date", "2020-11-01T00:00:00Z");
// todo (cgardens) we should parse this to a time string
expected.put("time", "1970-01-01T05:00:00Z");
expected.put("timestamp", "2001-09-29T03:00:00Z");
expected.put("binary1", "aaaa".getBytes(Charsets.UTF_8));
return expected;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ public List<String> getValues() {
}

public String getNameWithTestPrefix() {
return nameSpace + "_" + testNumber + "_" + sourceType;
// source type may include space (e.g. "character varying")
return nameSpace + "_" + testNumber + "_" + sourceType.replaceAll("\\s", "_");
}

public String getCreateSqlQuery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType
}
case ARRAY -> {
final JsonNode items = fieldDefinition.get("items");
Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName);

if (items.isObject()) {
if (items == null) {
LOGGER.warn("Source connector provided schema for ARRAY with missed \"items\", will assume that it's a String type");
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else if (items.isObject()) {
fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
} else if (items.isArray()) {
final List<Schema> arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,5 +774,48 @@
"time_field": 44581541000,
"_airbyte_additional_properties": null
}
},
{
"schemaName": "array_without_items_in_schema",
"namespace": "namespace16",
"appendAirbyteFields": false,
"jsonSchema": {
"type": "object",
"properties": {
"identifier": {
"type": "array"
}
}
},
"jsonObject": {
"identifier": ["151", 152, true, { "id": 153 }]
},
"avroSchema": {
"type": "record",
"name": "array_without_items_in_schema",
"namespace": "namespace16",
"fields": [
{
"name": "identifier",
"type": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
],
"default": null
},
{
"name": "_airbyte_additional_properties",
"type": ["null", { "type": "map", "values": "string" }],
"default": null
}
]
},
"avroObject": {
"identifier": ["151", "152", "true", "{\"id\":153}"],
"_airbyte_additional_properties": null
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,18 @@
{ "type": "long", "logicalType": "time-micros" },
"string"
]
},
{
"fieldName": "array_field_without_items_type",
"jsonFieldSchema": {
"type": "array"
},
"avroFieldType": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("array")
.fullSourceDataType("STRING[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("ARRAY['sky', 'road', 'car']", "null")
.addExpectedValues("{sky,road,car}", null)
.addExpectedValues("[\"sky\",\"road\",\"car\"]", null)
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -152,7 +152,7 @@ protected void initTests() {
.fullSourceDataType("bytea[]")
.airbyteType(JsonSchemaPrimitive.OBJECT)
.addInsertValues("ARRAY['☃'::bytes, 'ї'::bytes]")
.addExpectedValues("{\"\\\\xe29883\",\"\\\\xd197\"}")
.addExpectedValues("[\"\\\\xe29883\",\"\\\\xd197\"]")
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -352,9 +352,18 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("text")
.fullSourceDataType("text[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("int")
.fullSourceDataType("int[]")
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("{10000,10000,10000,10000}", null)
.addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null)
.build());

}
Expand Down
5 changes: 3 additions & 2 deletions airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ WORKDIR /airbyte

ENV APPLICATION source-postgres

ADD build/distributions/${APPLICATION}*.tar /airbyte
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.17
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-postgres
Loading