diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 8f9b622fa068..1c4d07cc252d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1320,7 +1320,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 1.0.35 + dockerImageTag: 1.0.36 documentationUrl: https://docs.airbyte.com/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 6528f9b8c170..7307d1c096e1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -11353,7 +11353,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:1.0.35" +- dockerImage: "airbyte/source-postgres:1.0.36" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index 8112f4b50927..e657619d821d 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -138,14 +138,11 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat }); } - private Object convertArray(Object x, RelationalColumn field) { - final String fieldType = field.typeName().toUpperCase(); - Object[] values = new Object[0]; - try { - values = (Object[]) ((PgArray) x).getArray(); - } catch (SQLException e) { - LOGGER.error("Failed to convert PgArray:" + e); + private Object convertArray(final Object x, final RelationalColumn field) { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); } + final String fieldType = field.typeName().toUpperCase(); switch (fieldType) { // debezium currently cannot handle MONEY[] datatype and it's not implemented case "_MONEY": @@ -167,15 +164,15 @@ private Object convertArray(Object x, RelationalColumn field) { .map(Double::valueOf) .collect(Collectors.toList()); case "_NUMERIC": - return Arrays.stream(values).map(value -> value == null ? null : Double.valueOf(value.toString())).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : Double.valueOf(value.toString())).collect(Collectors.toList()); case "_TIME": - return Arrays.stream(values).map(value -> value == null ? null : convertToTime(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToTime(value)).collect(Collectors.toList()); case "_DATE": - return Arrays.stream(values).map(value -> value == null ? null : convertToDate(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToDate(value)).collect(Collectors.toList()); case "_TIMESTAMP": - return Arrays.stream(values).map(value -> value == null ? null : convertToTimestamp(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToTimestamp(value)).collect(Collectors.toList()); case "_TIMESTAMPTZ": - return Arrays.stream(values).map(value -> value == null ? null : convertToTimestampWithTimezone(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToTimestampWithTimezone(value)).collect(Collectors.toList()); case "_TIMETZ": final List timetzArr = new ArrayList<>(); @@ -194,13 +191,22 @@ private Object convertArray(Object x, RelationalColumn field) { }); return timetzArr; case "_BYTEA": - return Arrays.stream(values).map(value -> Base64.getEncoder().encodeToString((byte[]) value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> Base64.getEncoder().encodeToString((byte[]) value)).collect(Collectors.toList()); case "_BIT": - return Arrays.stream(values).map(value -> (Boolean) value).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> (Boolean) value).collect(Collectors.toList()); case "_NAME": - return Arrays.stream(values).map(value -> (String) value).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> (String) value).collect(Collectors.toList()); default: - return new ArrayList<>(); + throw new RuntimeException("Unknown array type detected " + fieldType); + } + } + + private Object[] getArray(final Object x) { + try { + return (Object[]) ((PgArray) x).getArray(); + } catch (final SQLException e) { + LOGGER.error("Failed to convert PgArray:" + e); + throw new RuntimeException(e); } } diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index f7c9b147afed..0978075777e9 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.35 +LABEL io.airbyte.version=1.0.36 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 793e460515a3..6edcec59beb5 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.35 +LABEL io.airbyte.version=1.0.36 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index f57c7a2d4f63..7e401765c920 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.36 | 2023-01-11 | [21003](https://github.com/airbytehq/airbyte/pull/21003) | Handle null values for array data types in CDC mode gracefully. | | 1.0.35 | 2023-01-04 | [20469](https://github.com/airbytehq/airbyte/pull/20469) | Introduce feature to make LSN commit behaviour configurable. | | 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions | | 1.0.33 | 2022-12-12 | [18959](https://github.com/airbytehq/airbyte/pull/18959) | CDC : Don't timeout if snapshot is not complete. |