Skip to content

Commit

Permalink
Revert "Postgres Source : Support JSONB datatype" (airbytehq#23642)
Browse files Browse the repository at this point in the history
* Revert "Postgres Source : Support JSONB datatype (airbytehq#21695)"

This reverts commit 90884d0.

* upgrade version and add changelog

* auto-bump connector version

* update spec

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and danielduckworth committed Mar 13, 2023
1 parent 4c1093b commit 20f561f
Show file tree
Hide file tree
Showing 17 changed files with 26 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(17, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(16, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 1.0.49
dockerImageTag: 1.0.51
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down Expand Up @@ -1462,7 +1462,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.50
dockerImageTag: 1.0.51
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:1.0.49"
- dockerImage: "airbyte/source-alloydb:1.0.51"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -11635,7 +11635,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.50"
- dockerImage: "airbyte/source-postgres:1.0.51"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import static org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
Expand Down Expand Up @@ -55,8 +53,6 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String[] ARRAY_TYPES = {"_NAME", "_NUMERIC", "_BYTEA", "_MONEY", "_BIT", "_DATE", "_TIME", "_TIMETZ", "_TIMESTAMP", "_TIMESTAMPTZ"};
private final String BYTEA_TYPE = "BYTEA";
private final String JSONB_TYPE = "JSONB";
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(final Properties props) {}
Expand All @@ -73,28 +69,13 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
} else if (JSONB_TYPE.equalsIgnoreCase(field.typeName())) {
registerJsonb(field, registration);
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
} else if (Arrays.stream(ARRAY_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerArray(field, registration);
}
}

private void registerJsonb(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
try {
return objectMapper.readTree(x.toString()).toString();
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
}
});
}

private void registerArray(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
final String fieldType = field.typeName().toUpperCase();
final SchemaBuilder arraySchema = switch (fieldType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-alloydb
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.50
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.50
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
case "jsonb" -> putJsonb(json, columnName, resultSet, colIndex);
case "_varchar", "_char", "_bpchar", "_text", "_name" -> putArray(json, columnName, resultSet, colIndex);
case "_int2", "_int4", "_int8", "_oid" -> putLongArray(json, columnName, resultSet, colIndex);
case "_numeric", "_decimal" -> putBigDecimalArray(json, columnName, resultSet, colIndex);
Expand All @@ -187,7 +186,6 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "_timestamp" -> putTimestampArray(json, columnName, resultSet, colIndex);
case "_timetz" -> putTimeTzArray(json, columnName, resultSet, colIndex);
case "_time" -> putTimeArray(json, columnName, resultSet, colIndex);
case "_jsonb" -> putJsonbArray(json, columnName, resultSet, colIndex);
default -> {
switch (columnInfo.columnType) {
case BOOLEAN -> json.put(columnName, value.equalsIgnoreCase("t"));
Expand All @@ -211,33 +209,6 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
}
}

private void putJsonbArray(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();

while (arrayResultSet.next()) {
final PGobject object = getObject(arrayResultSet, colIndex, PGobject.class);
final JsonNode value;
try {
value = new ObjectMapper().readTree(object.getValue());
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
}
arrayNode.add(value);
}
node.set(columnName, arrayNode);
}

private void putJsonb(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
final PGobject object = getObject(resultSet, colIndex, PGobject.class);

try {
node.put(columnName, new ObjectMapper().readTree(object.getValue()));
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
}
}

private void putTimeArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
Expand Down Expand Up @@ -422,13 +393,11 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
case "_time" -> PostgresType.TIME_ARRAY;
case "_date" -> PostgresType.DATE_ARRAY;
case "_bytea" -> PostgresType.BYTEA_ARRAY;
case "_jsonb" -> PostgresType.JSONB_ARRAY;
case "bool", "boolean" -> PostgresType.BOOLEAN;
// BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a").
// It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
// https://www.postgresql.org/docs/14/datatype-binary.html
case "bytea" -> PostgresType.VARCHAR;
case "jsonb" -> PostgresType.JSONB;
case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE;
case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE;
default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
Expand Down Expand Up @@ -527,10 +496,7 @@ public JsonSchemaType getAirbyteType(final PostgresType jdbcType) {
case DATE_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.STRING_DATE)
.build();
case JSONB_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build();
case JSONB -> JsonSchemaType.JSONB;

case DATE -> JsonSchemaType.STRING_DATE;
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
case TIME_WITH_TIMEZONE -> JsonSchemaType.STRING_TIME_WITH_TIMEZONE;
Expand Down Expand Up @@ -625,7 +591,6 @@ private ColumnInfo getColumnInfo(final int colIndex, final PgResultSetMetaData m
}

private static class ColumnInfo {

public String columnTypeName;
public PostgresType columnType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ public enum PostgresType implements SQLType {
OID_ARRAY(Types.ARRAY),
FLOAT4_ARRAY(Types.ARRAY),
FLOAT8_ARRAY(Types.ARRAY),
BYTEA_ARRAY(Types.ARRAY),
JSONB_ARRAY(Types.ARRAY),
JSONB(Types.JAVA_OBJECT);
BYTEA_ARRAY(Types.ARRAY);

/**
* The Integer value for the JDBCType. It maps to a value in {@code Types.java}
Expand Down Expand Up @@ -124,15 +122,15 @@ public Integer getVendorTypeNumber() {
* {@code Types} value
* @see Types
*/
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
if (postgresTypeMap.containsKey(type)) {
return postgresTypeMap.get(type);
}
throw new IllegalArgumentException("Type:" + type + " is not a valid "
+ "Types.java value.");
}

public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
try {
return PostgresType.valueOf(columnTypeInt, postgresTypeMap);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,6 @@ public boolean testCatalog() {
return true;
}

protected String getValueFromJsonNode(final JsonNode jsonNode) {
if (jsonNode != null) {
if (jsonNode.isArray() || jsonNode.isObject()) {
return jsonNode.toString();
}

String value = jsonNode.asText();
return (value != null && value.equals("null") ? null : value);
}
return null;
}

// Test cases are sorted alphabetically based on the source type
// See https://www.postgresql.org/docs/14/datatype.html
@Override
Expand Down Expand Up @@ -265,12 +253,9 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.JSONB)
.withLegacyAirbyteTypeProperty("json")
.build())
.addInsertValues("null", "'10000'::jsonb", "'true'::jsonb", "'[1,2,3]'::jsonb",
"'{\"Janet\": 1, \"Melissa\": {\"loves\": \"trees\", \"married\": true}}'::jsonb")
.addExpectedValues(null, "10000", "true", "[1,2,3]", "{\"Janet\":1,\"Melissa\":{\"loves\":\"trees\",\"married\":true}}")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'[1, 2, 3]'::jsonb")
.addExpectedValues(null, "[1, 2, 3]")
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -593,23 +578,6 @@ protected void initTests() {

addTimeWithTimeZoneTest();
addArraysTestData();
addJsonbArrayTest();
}

protected void addJsonbArrayTest() {

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb_array")
.fullSourceDataType("JSONB[]")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build())
.addInsertValues(
"ARRAY['[1,2,1]', 'false']::jsonb[]",
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
.addExpectedValues("[[1,2,1],false]", "[{\"digit\":30,\"letter\":\"A\"},{\"digit\":31,\"letter\":\"B\"}]")
.build());
}

protected void addTimeWithTimeZoneTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.List;
import org.jooq.SQLDialect;
import org.testcontainers.containers.PostgreSQLContainer;
Expand Down Expand Up @@ -98,24 +95,6 @@ protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}

@Override
protected void addJsonbArrayTest() {

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb_array")
.fullSourceDataType("JSONB[]")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build())
.addInsertValues(
"ARRAY['[1,2,1]', 'false']::jsonb[]",
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
.addExpectedValues("[\"[1, 2, 1]\",\"false\"]",
"[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]")
.build());
}

public boolean testCatalog() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -187,22 +186,4 @@ protected void addTimestampWithInfinityValuesTest() {
}
}

@Override
protected void addJsonbArrayTest() {

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb_array")
.fullSourceDataType("JSONB[]")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build())
.addInsertValues(
"ARRAY['[1,2,1]', 'false']::jsonb[]",
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
.addExpectedValues("[\"[1, 2, 1]\",\"false\"]",
"[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]")
.build());
}

}
Loading

0 comments on commit 20f561f

Please sign in to comment.