Skip to content

Commit

Permalink
🎉 Source Postgres: support all Postgres 14 types (#8726)
Browse files Browse the repository at this point in the history
* Add skeleton to support all postgres types

* Consolidate type tests

* Fix corner cases

* Bump postgres version

* Add tests for time and timetz

* Format code

* Revert date to timestamp

* Update comment

* Fix unit tests

* 🐛 Jdbc sources: switch from "string" to "array" schema type for columns with JDBCType.ARRAY (#8749)

* support array for jdbc sources

* fixed PR comments, added test cases

* added more elements for test case

* Fix test case

* add array test case for JdbcSourceOperations

Co-authored-by: Liren Tu <tuliren.git@outlook.com>

* Revert changes to support special number values

Postgres source cannot handle these special values yet
See https://github.com/airbytehq/airbyte/issues/8902

* Revert infinity and nan assertion in unit tests

This reverts commit 3bee7d1.

* Update documentation

* Bump postgres source version in seed

Co-authored-by: Yurii Bidiuk <35812734+yurii-bidiuk@users.noreply.github.com>
  • Loading branch information
tuliren and yurii-bidiuk authored Dec 20, 2021
1 parent 6ee29b5 commit ff4b83b
Show file tree
Hide file tree
Showing 15 changed files with 575 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.17
dockerImageTag: 0.4.0
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5281,7 +5281,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.3.17"
- dockerImage: "airbyte/source-postgres:0.4.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.airbyte.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
Expand Down Expand Up @@ -49,6 +51,15 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
return jsonNode;
}

/**
 * Reads a SQL ARRAY column and stores it on {@code node} as a JSON array of strings.
 *
 * @param node       the JSON object being built for the current row
 * @param columnName the JSON field name to set
 * @param resultSet  the row-level result set positioned on the current row
 * @param index      1-based column index of the ARRAY column
 * @throws SQLException if the column or its element cursor cannot be read
 */
protected void putArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  // Reuse the node's own factory instead of allocating a new ObjectMapper per row.
  final ArrayNode arrayNode = node.arrayNode();
  final var array = resultSet.getArray(index);
  if (array == null) {
    // SQL NULL array: emit JSON null instead of throwing NPE.
    // NOTE(review): callers may already filter nulls via wasNull() before dispatching — confirm.
    node.putNull(columnName);
    return;
  }
  // Array.getResultSet() yields one row per element with two columns:
  // column 1 is the element's index, column 2 is the element's value.
  try (final ResultSet arrayResultSet = array.getResultSet()) {
    while (arrayResultSet.next()) {
      arrayNode.add(arrayResultSet.getString(2));
    }
  }
  node.set(columnName, arrayNode);
}

/**
 * Reads a boolean column from the current row and stores it on {@code node}.
 *
 * @param node       the JSON object being built for the current row
 * @param columnName the JSON field name to set
 * @param resultSet  the row-level result set positioned on the current row
 * @param index      1-based column index of the boolean column
 * @throws SQLException if the column cannot be read
 */
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final boolean value = resultSet.getBoolean(index);
  node.put(columnName, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceOperations.class);

private JDBCType safeGetJdbcType(final int columnTypeInt) {
protected JDBCType safeGetJdbcType(final int columnTypeInt) {
try {
return JDBCType.valueOf(columnTypeInt);
} catch (final Exception e) {
Expand Down Expand Up @@ -55,6 +55,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
default -> putDefault(json, columnName, resultSet, colIndex);
}
}
Expand Down Expand Up @@ -115,6 +116,7 @@ public JsonSchemaPrimitive getJsonType(final JDBCType jdbcType) {
case TIME -> JsonSchemaPrimitive.STRING;
case TIMESTAMP -> JsonSchemaPrimitive.STRING;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING_BINARY;
case ARRAY -> JsonSchemaPrimitive.ARRAY;
// since column types aren't necessarily meaningful to Airbyte, liberally convert all unrecognised
// types to String
default -> JsonSchemaPrimitive.STRING;
Expand Down
82 changes: 56 additions & 26 deletions airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -115,7 +117,7 @@ void testSetJsonField() throws SQLException {
try (final Connection connection = dataSource.getConnection()) {
createTableWithAllTypes(connection);
insertRecordOfEachType(connection);
assertExpectedOutputValues(connection);
assertExpectedOutputValues(connection, jsonFieldExpectedValues());
assertExpectedOutputTypes(connection);
}
}
Expand Down Expand Up @@ -148,7 +150,7 @@ void testSetStatementField() throws SQLException {

ps.execute();

assertExpectedOutputValues(connection);
assertExpectedOutputValues(connection, expectedValues());
assertExpectedOutputTypes(connection);
}
}
Expand All @@ -172,7 +174,9 @@ private static void createTableWithAllTypes(final Connection connection) throws
+ "date DATE,"
+ "time TIME,"
+ "timestamp TIMESTAMP,"
+ "binary1 bytea"
+ "binary1 bytea,"
+ "text_array _text,"
+ "int_array int[]"
+ ");");

}
Expand All @@ -194,7 +198,9 @@ private static void insertRecordOfEachType(final Connection connection) throws S
+ "date,"
+ "time,"
+ "timestamp,"
+ "binary1"
+ "binary1,"
+ "text_array,"
+ "int_array"
+ ") VALUES("
+ "1::bit(1),"
+ "true,"
Expand All @@ -211,36 +217,18 @@ private static void insertRecordOfEachType(final Connection connection) throws S
+ "'2020-11-01',"
+ "'05:00',"
+ "'2001-09-29 03:00',"
+ "decode('61616161', 'hex')"
+ "decode('61616161', 'hex'),"
+ "'{one,two,three}',"
+ "'{1,2,3}'"
+ ");");
}

private static void assertExpectedOutputValues(final Connection connection) throws SQLException {
private static void assertExpectedOutputValues(final Connection connection, final ObjectNode expected) throws SQLException {
final ResultSet resultSet = connection.createStatement().executeQuery("SELECT * FROM data;");

resultSet.next();
final JsonNode actual = sourceOperations.rowToJson(resultSet);

final ObjectNode expected = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
expected.put("bit", true);
expected.put("boolean", true);
expected.put("smallint", (short) 1);
expected.put("int", 1);
expected.put("bigint", (long) 1);
expected.put("float", (double) 1.0);
expected.put("double", (double) 1.0);
expected.put("real", (float) 1.0);
expected.put("numeric", new BigDecimal(1));
expected.put("decimal", new BigDecimal(1));
expected.put("char", "a");
expected.put("varchar", "a");
// todo (cgardens) we should parse this to a date string
expected.put("date", "2020-11-01T00:00:00Z");
// todo (cgardens) we should parse this to a time string
expected.put("time", "1970-01-01T05:00:00Z");
expected.put("timestamp", "2001-09-29T03:00:00Z");
expected.put("binary1", "aaaa".getBytes(Charsets.UTF_8));

// field-wise comparison to make debugging easier.
MoreStreams.toStream(expected.fields()).forEach(e -> assertEquals(e.getValue(), actual.get(e.getKey()), "key: " + e.getKey()));
assertEquals(expected, actual);
Expand Down Expand Up @@ -273,9 +261,51 @@ private static void assertExpectedOutputTypes(final Connection connection) throw
.put("time", JsonSchemaPrimitive.STRING)
.put("timestamp", JsonSchemaPrimitive.STRING)
.put("binary1", JsonSchemaPrimitive.STRING_BINARY)
.put("text_array", JsonSchemaPrimitive.ARRAY)
.put("int_array", JsonSchemaPrimitive.ARRAY)
.build();

assertEquals(actual, expected);
}

// Expected row JSON for setJsonField: the shared scalar expectations plus the two
// array columns, which rowToJson materializes as JSON arrays of strings.
private ObjectNode jsonFieldExpectedValues() {
  final ObjectNode expected = expectedValues();
  final ObjectMapper mapper = new ObjectMapper();

  final ArrayNode textArray = mapper.createArrayNode();
  for (final String element : new String[] {"one", "two", "three"}) {
    textArray.add(element);
  }
  expected.set("text_array", textArray);

  // Integer array elements also come back as strings (read via getString).
  final ArrayNode intArray = mapper.createArrayNode();
  for (final String element : new String[] {"1", "2", "3"}) {
    intArray.add(element);
  }
  expected.set("int_array", intArray);

  return expected;
}

// Expected row JSON shared by the setJsonField and setStatementField tests.
// The explicit casts matter: they pin the exact Jackson node type created by
// each put overload (ShortNode, LongNode, FloatNode, ...), which the
// field-wise assertEquals comparison depends on.
private ObjectNode expectedValues() {
  final ObjectNode expected = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
  expected.put("bit", true)
      .put("boolean", true)
      .put("smallint", (short) 1)
      .put("int", 1)
      .put("bigint", (long) 1)
      .put("float", (double) 1.0)
      .put("double", (double) 1.0)
      .put("real", (float) 1.0)
      .put("numeric", new BigDecimal(1))
      .put("decimal", new BigDecimal(1))
      .put("char", "a")
      .put("varchar", "a")
      // todo (cgardens) we should parse this to a date string
      .put("date", "2020-11-01T00:00:00Z")
      // todo (cgardens) we should parse this to a time string
      .put("time", "1970-01-01T05:00:00Z")
      .put("timestamp", "2001-09-29T03:00:00Z")
      .put("binary1", "aaaa".getBytes(Charsets.UTF_8));
  return expected;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ public List<String> getValues() {
}

/**
 * Builds the prefixed table/test name for this data-type test case.
 *
 * @return {@code <nameSpace>_<testNumber>_<sourceType>} with all whitespace in the
 *         source type replaced by underscores
 */
public String getNameWithTestPrefix() {
  // Source type may include whitespace (e.g. "character varying"), which would
  // produce an invalid identifier — normalize it to underscores.
  return nameSpace + "_" + testNumber + "_" + sourceType.replaceAll("\\s", "_");
}

public String getCreateSqlQuery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType
}
case ARRAY -> {
final JsonNode items = fieldDefinition.get("items");
Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName);

if (items.isObject()) {
if (items == null) {
LOGGER.warn("Source connector provided schema for ARRAY with missed \"items\", will assume that it's a String type");
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else if (items.isObject()) {
fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
} else if (items.isArray()) {
final List<Schema> arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,5 +774,48 @@
"time_field": 44581541000,
"_airbyte_additional_properties": null
}
},
{
"schemaName": "array_without_items_in_schema",
"namespace": "namespace16",
"appendAirbyteFields": false,
"jsonSchema": {
"type": "object",
"properties": {
"identifier": {
"type": "array"
}
}
},
"jsonObject": {
"identifier": ["151", 152, true, { "id": 153 }]
},
"avroSchema": {
"type": "record",
"name": "array_without_items_in_schema",
"namespace": "namespace16",
"fields": [
{
"name": "identifier",
"type": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
],
"default": null
},
{
"name": "_airbyte_additional_properties",
"type": ["null", { "type": "map", "values": "string" }],
"default": null
}
]
},
"avroObject": {
"identifier": ["151", "152", "true", "{\"id\":153}"],
"_airbyte_additional_properties": null
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,18 @@
{ "type": "long", "logicalType": "time-micros" },
"string"
]
},
{
"fieldName": "array_field_without_items_type",
"jsonFieldSchema": {
"type": "array"
},
"avroFieldType": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("array")
.fullSourceDataType("STRING[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("ARRAY['sky', 'road', 'car']", "null")
.addExpectedValues("{sky,road,car}", null)
.addExpectedValues("[\"sky\",\"road\",\"car\"]", null)
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -152,7 +152,7 @@ protected void initTests() {
.fullSourceDataType("bytea[]")
.airbyteType(JsonSchemaPrimitive.OBJECT)
.addInsertValues("ARRAY['☃'::bytes, 'ї'::bytes]")
.addExpectedValues("{\"\\\\xe29883\",\"\\\\xd197\"}")
.addExpectedValues("[\"\\\\xe29883\",\"\\\\xd197\"]")
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -352,9 +352,18 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("text")
.fullSourceDataType("text[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("int")
.fullSourceDataType("int[]")
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("{10000,10000,10000,10000}", null)
.addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null)
.build());

}
Expand Down
5 changes: 3 additions & 2 deletions airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ WORKDIR /airbyte

ENV APPLICATION source-postgres

ADD build/distributions/${APPLICATION}*.tar /airbyte
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.17
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-postgres
Loading

0 comments on commit ff4b83b

Please sign in to comment.