From 8c52a8060cc5e2d48682db9e7b60be8e7ddc3a84 Mon Sep 17 00:00:00 2001 From: subodh Date: Wed, 4 Jan 2023 11:59:14 +0530 Subject: [PATCH 1/4] postgres-source-cdc:handle null values for array data types --- .../debezium/internals/PostgresConverter.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index 8112f4b50927..8ad16f59150a 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -138,13 +138,17 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat }); } - private Object convertArray(Object x, RelationalColumn field) { + private Object convertArray(final Object x, final RelationalColumn field) { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } final String fieldType = field.typeName().toUpperCase(); Object[] values = new Object[0]; try { values = (Object[]) ((PgArray) x).getArray(); - } catch (SQLException e) { + } catch (final SQLException e) { LOGGER.error("Failed to convert PgArray:" + e); + throw new RuntimeException(e); } switch (fieldType) { // debezium currently cannot handle MONEY[] datatype and it's not implemented @@ -200,7 +204,7 @@ private Object convertArray(Object x, RelationalColumn field) { case "_NAME": return Arrays.stream(values).map(value -> (String) value).collect(Collectors.toList()); default: - return new ArrayList<>(); + throw new RuntimeException("Unknown array type detected " + fieldType); } } From b527d2caf350dfe63bc1f72c8f3b8efc5fbbb739 Mon Sep 17 00:00:00 2001 From: subodh Date: Wed, 4 Jan 2023 15:40:26 +0530 Subject: [PATCH 2/4] fix test --- .../debezium/internals/PostgresConverter.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index 8ad16f59150a..e657619d821d 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -143,13 +143,6 @@ private Object convertArray(final Object x, final RelationalColumn field) { return DebeziumConverterUtils.convertDefaultValue(field); } final String fieldType = field.typeName().toUpperCase(); - Object[] values = new Object[0]; - try { - values = (Object[]) ((PgArray) x).getArray(); - } catch (final SQLException e) { - LOGGER.error("Failed to convert PgArray:" + e); - throw new RuntimeException(e); - } switch (fieldType) { // debezium currently cannot handle MONEY[] datatype and it's not implemented case "_MONEY": @@ -171,15 +164,15 @@ private Object convertArray(final Object x, final RelationalColumn field) { .map(Double::valueOf) .collect(Collectors.toList()); case "_NUMERIC": - return Arrays.stream(values).map(value -> value == null ? null : Double.valueOf(value.toString())).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : Double.valueOf(value.toString())).collect(Collectors.toList()); case "_TIME": - return Arrays.stream(values).map(value -> value == null ? null : convertToTime(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToTime(value)).collect(Collectors.toList()); case "_DATE": - return Arrays.stream(values).map(value -> value == null ? null : convertToDate(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToDate(value)).collect(Collectors.toList()); case "_TIMESTAMP": - return Arrays.stream(values).map(value -> value == null ? null : convertToTimestamp(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToTimestamp(value)).collect(Collectors.toList()); case "_TIMESTAMPTZ": - return Arrays.stream(values).map(value -> value == null ? null : convertToTimestampWithTimezone(value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToTimestampWithTimezone(value)).collect(Collectors.toList()); case "_TIMETZ": final List timetzArr = new ArrayList<>(); @@ -198,16 +191,25 @@ private Object convertArray(final Object x, final RelationalColumn field) { }); return timetzArr; case "_BYTEA": - return Arrays.stream(values).map(value -> Base64.getEncoder().encodeToString((byte[]) value)).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> Base64.getEncoder().encodeToString((byte[]) value)).collect(Collectors.toList()); case "_BIT": - return Arrays.stream(values).map(value -> (Boolean) value).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> (Boolean) value).collect(Collectors.toList()); case "_NAME": - return Arrays.stream(values).map(value -> (String) value).collect(Collectors.toList()); + return Arrays.stream(getArray(x)).map(value -> (String) value).collect(Collectors.toList()); default: throw new RuntimeException("Unknown array type detected " + fieldType); } } + private Object[] getArray(final Object x) { + try { + return (Object[]) ((PgArray) x).getArray(); + } catch (final SQLException e) { + LOGGER.error("Failed to convert PgArray:" + e); + throw new RuntimeException(e); + } + } + private int getTimePrecision(final RelationalColumn field) { return field.scale().orElse(-1); } From 6969d868e43e567ebc77b0834616d87274cbd1b2 Mon Sep 17 00:00:00 2001 From: subodh Date: Wed, 11 Jan 2023 11:00:40 +0530 Subject: [PATCH 3/4] upgrade version --- .../connectors/source-postgres-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- docs/integrations/sources/postgres.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index f7c9b147afed..0978075777e9 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.35 +LABEL io.airbyte.version=1.0.36 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 793e460515a3..6edcec59beb5 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.35 +LABEL io.airbyte.version=1.0.36 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index f57c7a2d4f63..7e401765c920 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.36 | 2023-01-11 | [21003](https://github.com/airbytehq/airbyte/pull/21003) | Handle null values for array data types in CDC mode gracefully. | | 1.0.35 | 2023-01-04 | [20469](https://github.com/airbytehq/airbyte/pull/20469) | Introduce feature to make LSN commit behaviour configurable. | | 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions | | 1.0.33 | 2022-12-12 | [18959](https://github.com/airbytehq/airbyte/pull/18959) | CDC : Don't timeout if snapshot is not complete. | From b3ef4da6512d387077cac71e483e49544d64b2c2 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Wed, 11 Jan 2023 06:37:31 +0000 Subject: [PATCH 4/4] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 8f9b622fa068..1c4d07cc252d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1320,7 +1320,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 1.0.35 + dockerImageTag: 1.0.36 documentationUrl: https://docs.airbyte.com/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 6528f9b8c170..7307d1c096e1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -11353,7 +11353,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:1.0.35" +- dockerImage: "airbyte/source-postgres:1.0.36" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: