From 4b266c1b7db8c3f2a1fd48ab044fdd6f349614ca Mon Sep 17 00:00:00 2001 From: Yurii Bidiuk <35812734+yurii-bidiuk@users.noreply.github.com> Date: Mon, 6 Jun 2022 22:25:29 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Postgres:=20write=20hst?= =?UTF-8?q?ore=20as=20json=20(#13367)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * src-postgres: write hstore as json * fix PR remarks * bump version * fix PR comments * add text blocks --- .../connectors/source-postgres/Dockerfile | 2 +- .../source/postgres/PostgresSourceOperations.java | 15 +++++++++++++++ .../sources/CdcPostgresSourceDatatypeTest.java | 15 +++++++++++++++ .../sources/PostgresSourceDatatypeTest.java | 15 +++++++++++++++ docs/integrations/sources/postgres.md | 2 ++ 5 files changed, 48 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 8822f74f19c3..a3c9bb568e86 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.19 +LABEL io.airbyte.version=0.4.20 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 7610a62f6800..4d8247798a79 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -10,9 +10,12 @@ import static 
io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME; import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.db.DataTypeUtils; import io.airbyte.db.jdbc.JdbcSourceOperations; @@ -37,6 +40,7 @@ public class PostgresSourceOperations extends JdbcSourceOperations { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceOperations.class); private static final String TIMESTAMPTZ = "timestamptz"; private static final String TIMETZ = "timetz"; + private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper(); @Override public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { @@ -98,6 +102,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob case "bytea" -> putString(json, columnName, resultSet, colIndex); case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex); case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); + case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex); default -> { switch (columnType) { case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); @@ -211,6 +216,16 @@ private void putMoney(final ObjectNode node, final String columnName, final Resu node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)); } + private void putHstoreAsJson(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) + throws SQLException { + final var data = resultSet.getObject(index); + try { + node.put(columnName, OBJECT_MAPPER.writeValueAsString(data)); + 
} catch (JsonProcessingException e) { + throw new RuntimeException("Could not parse 'hstore' value:" + e); + } + } + /** * @return monetary value in numbers without the currency symbol or thousands separators. */ diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java index 5b924aa123cd..12c1305d1842 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java @@ -73,6 +73,7 @@ protected Database setupDatabase() throws Exception { database.query(ctx -> { ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;"); + ctx.execute("CREATE EXTENSION hstore;"); return null; }); @@ -541,6 +542,20 @@ protected void initTests() { .addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery") .addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'") .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("hstore") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues(""" + '"paperback" => "243","publisher" => "postgresqltutorial.com", + "language" => "English","ISBN-13" => "978-1449370000", + "weight" => "11.2 ounces"' + """, null) + .addExpectedValues(""" + {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", + null) + .build()); } } diff --git 
a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java index f6786aec2dd1..4be8859c1c53 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java @@ -68,6 +68,7 @@ protected Database setupDatabase() throws SQLException { // Set up a fixed timezone here so that timetz and timestamptz always have the same time zone // wherever the tests are running on. ctx.execute("SET TIMEZONE TO 'MST'"); + ctx.execute("CREATE EXTENSION hstore;"); return null; }); @@ -562,6 +563,20 @@ protected void initTests() { .addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null") .addExpectedValues("(\"fuzzy dice\",42,1.99)", null) .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("hstore") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues(""" + '"paperback" => "243","publisher" => "postgresqltutorial.com", + "language" => "English","ISBN-13" => "978-1449370000", + "weight" => "11.2 ounces"' + """, null) + .addExpectedValues(""" + {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", + null) + .build()); } } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 8e2fdf21f3ec..764b3f024205 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -237,6 +237,7 @@ According to Postgres 
[documentation](https://www.postgresql.org/docs/14/datatyp | `circle` | string | | | `date` | string | Parsed as ISO8601 date time at midnight. Does not support B.C. dates. Issue: [#8903](https://github.com/airbytehq/airbyte/issues/8903). | | `double precision`, `float`, `float8` | number | `Infinity`, `-Infinity`, and `NaN` are not supported and converted to `null`. Issue: [#8902](https://github.com/airbytehq/airbyte/issues/8902). | +| `hstore` | string | | | `inet` | string | | | `integer`, `int`, `int4` | number | | | `interval` | string | | @@ -274,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added conversion of hstore to json format | | 0.4.19 | 2022-05-25 | [13166](https://github.com/airbytehq/airbyte/pull/13166) | Added timezone awareness and handle BC dates | | 0.4.18 | 2022-05-25 | [13083](https://github.com/airbytehq/airbyte/pull/13083) | Add support for tsquey type | | 0.4.17 | 2022-05-19 | [13016](https://github.com/airbytehq/airbyte/pull/13016) | CDC modify schema to allow null values |