Skip to content

Commit

Permalink
🐛 Source Postgres: write hstore as json (#13367)
Browse files Browse the repository at this point in the history
* src-postgres: write hstore as json

* fix PR remarks

* bump version

* fix PR comments

* add text blocks
  • Loading branch information
yurii-bidiuk authored Jun 6, 2022
1 parent 40cb78e commit 4b266c1
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.19
LABEL io.airbyte.version=0.4.20
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcSourceOperations;
Expand All @@ -37,6 +40,7 @@ public class PostgresSourceOperations extends JdbcSourceOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceOperations.class);
private static final String TIMESTAMPTZ = "timestamptz";
private static final String TIMETZ = "timetz";
private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper();

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
Expand Down Expand Up @@ -98,6 +102,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case "bytea" -> putString(json, columnName, resultSet, colIndex);
case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);
default -> {
switch (columnType) {
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
Expand Down Expand Up @@ -211,6 +216,16 @@ private void putMoney(final ObjectNode node, final String columnName, final Resu
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite));
}

private void putHstoreAsJson(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
throws SQLException {
final var data = resultSet.getObject(index);
try {
node.put(columnName, OBJECT_MAPPER.writeValueAsString(data));
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not parse 'hstore' value:" + e);
}
}

/**
* @return monetary value in numbers without the currency symbol or thousands separators.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected Database setupDatabase() throws Exception {
database.query(ctx -> {
ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');");
ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;");
ctx.execute("CREATE EXTENSION hstore;");

return null;
});
Expand Down Expand Up @@ -541,6 +542,20 @@ protected void initTests() {
.addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery")
.addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'")
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("hstore")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("""
'"paperback" => "243","publisher" => "postgresqltutorial.com",
"language" => "English","ISBN-13" => "978-1449370000",
"weight" => "11.2 ounces"'
""", null)
.addExpectedValues("""
{"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""",
null)
.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ protected Database setupDatabase() throws SQLException {
// Set up a fixed timezone here so that timetz and timestamptz always have the same time zone
// wherever the tests are running on.
ctx.execute("SET TIMEZONE TO 'MST'");
ctx.execute("CREATE EXTENSION hstore;");
return null;
});

Expand Down Expand Up @@ -562,6 +563,20 @@ protected void initTests() {
.addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null")
.addExpectedValues("(\"fuzzy dice\",42,1.99)", null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("hstore")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("""
'"paperback" => "243","publisher" => "postgresqltutorial.com",
"language" => "English","ISBN-13" => "978-1449370000",
"weight" => "11.2 ounces"'
""", null)
.addExpectedValues("""
{"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""",
null)
.build());
}

}
2 changes: 2 additions & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
| `circle` | string | |
| `date` | string | Parsed as ISO8601 date time at midnight. Does not support B.C. dates. Issue: [#8903](https://github.com/airbytehq/airbyte/issues/8903). |
| `double precision`, `float`, `float8` | number | `Infinity`, `-Infinity`, and `NaN` are not supported and converted to `null`. Issue: [#8902](https://github.com/airbytehq/airbyte/issues/8902). |
| `hstore` | string | |
| `inet` | string | |
| `integer`, `int`, `int4` | number | |
| `interval` | string | |
Expand Down Expand Up @@ -274,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format |
| 0.4.19 | 2022-05-25 | [13166](https://github.com/airbytehq/airbyte/pull/13166) | Added timezone awareness and handle BC dates |
| 0.4.18 | 2022-05-25 | [13083](https://github.com/airbytehq/airbyte/pull/13083) | Add support for tsquey type |
| 0.4.17 | 2022-05-19 | [13016](https://github.com/airbytehq/airbyte/pull/13016) | CDC modify schema to allow null values |
Expand Down

0 comments on commit 4b266c1

Please sign in to comment.