diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 4b87f1e73457a..7825ee1a0819a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -45,7 +45,7 @@ - name: AlloyDB for PostgreSQL sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002 dockerRepository: airbyte/source-alloydb - dockerImageTag: 1.0.36 + dockerImageTag: 1.0.43 documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb icon: alloydb.svg sourceType: database @@ -1362,7 +1362,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 1.0.42 + dockerImageTag: 1.0.43 documentationUrl: https://docs.airbyte.com/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 6d93b5ef780f9..1888f7a4ce9bf 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -370,7 +370,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-alloydb:1.0.36" +- dockerImage: "airbyte/source-alloydb:1.0.43" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: @@ -11606,7 +11606,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:1.0.42" +- dockerImage: "airbyte/source-postgres:1.0.43" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile index e02b1482975ad..610d5185acdec 100644 --- a/airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.43 LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt diff --git a/airbyte-integrations/connectors/source-alloydb/Dockerfile b/airbyte-integrations/connectors/source-alloydb/Dockerfile index c18c20b855713..1c03fb04184f2 100644 --- a/airbyte-integrations/connectors/source-alloydb/Dockerfile +++ b/airbyte-integrations/connectors/source-alloydb/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.36 +LABEL io.airbyte.version=1.0.43 LABEL io.airbyte.name=airbyte/source-alloydb diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 972cbb614037e..71c118d54497f 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.42 +LABEL io.airbyte.version=1.0.43 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index f05fd2e03c1f8..bae41a4ddc261 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.42 +LABEL io.airbyte.version=1.0.43 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 6d491c8df10a7..aca4706e9600a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -40,7 +40,10 @@ import java.time.OffsetDateTime; import java.time.OffsetTime; import java.time.format.DateTimeParseException; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.postgresql.geometric.PGbox; import org.postgresql.geometric.PGcircle; import org.postgresql.geometric.PGline; @@ -60,6 +63,12 @@ public class PostgresSourceOperations extends AbstractJdbcCompatibleSourceOperat private static final String TIMESTAMPTZ = "timestamptz"; private static final String TIMETZ = "timetz"; private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper(); + private static final Map POSTGRES_TYPE_DICT = new HashMap<>(); + private final Map> streamColumnInfo = new HashMap<>(); + + static { + Arrays.stream(PostgresType.class.getEnumConstants()).forEach(c -> POSTGRES_TYPE_DICT.put(c.type, c)); + } @Override public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { @@ -69,27 +78,6 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= columnCount; i++) { - final String columnType = metadata.getColumnTypeName(i); - // attempt to access the column. this allows us to know if it is null before we do type-specific - // parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of - // checking for null values with jdbc. - - if (columnType.equalsIgnoreCase("money")) { - // when a column is of type MONEY, getObject will throw exception - // this is a bug that will not be fixed: - // https://github.com/pgjdbc/pgjdbc/issues/425 - // https://github.com/pgjdbc/pgjdbc/issues/1835 - queryContext.getString(i); - } else if (columnType.equalsIgnoreCase("bit")) { - // getObject will fail as it tries to parse the value as boolean - queryContext.getString(i); - } else if (columnType.equalsIgnoreCase("numeric") || columnType.equalsIgnoreCase("decimal")) { - // getObject will fail when the value is 'infinity' - queryContext.getDouble(i); - } else { - queryContext.getObject(i); - } - // convert to java types that will convert into reasonable json. copyToJsonField(queryContext, i, jsonNode); } @@ -166,14 +154,14 @@ protected void setDate(final PreparedStatement preparedStatement, final int para public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); final String columnName = metadata.getColumnName(colIndex); - final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase(); - final PostgresType columnType = safeGetJdbcType(metadata.getColumnType(colIndex)); - if (resultSet.getString(colIndex) == null) { + final ColumnInfo columnInfo = getColumnInfo(colIndex, metadata, columnName); + final String value = resultSet.getString(colIndex); + if (value == null) { json.putNull(columnName); } else { - switch (columnTypeName) { + switch (columnInfo.columnTypeName) { case "bool", "boolean" -> putBoolean(json, columnName, resultSet, colIndex); - case "bytea" -> putString(json, columnName, resultSet, colIndex); + case "bytea" -> json.put(columnName, value); case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex); case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex); @@ -199,8 +187,8 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final case "_timetz" -> putTimeTzArray(json, columnName, resultSet, colIndex); case "_time" -> putTimeArray(json, columnName, resultSet, colIndex); default -> { - switch (columnType) { - case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); + switch (columnInfo.columnType) { + case BOOLEAN -> json.put(columnName, value.equalsIgnoreCase("t")); case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex); case INTEGER -> putInteger(json, columnName, resultSet, colIndex); case BIGINT -> putBigInt(json, columnName, resultSet, colIndex); @@ -208,13 +196,13 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final case REAL -> putFloat(json, columnName, resultSet, colIndex); case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex); // BIT is a bit string in Postgres, e.g. '0100' - case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex); + case BIT, CHAR, VARCHAR, LONGVARCHAR -> json.put(columnName, value); case DATE -> putDate(json, columnName, resultSet, colIndex); case TIME -> putTime(json, columnName, resultSet, colIndex); case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); case ARRAY -> putArray(json, columnName, resultSet, colIndex); - default -> putDefault(json, columnName, resultSet, colIndex); + default -> json.put(columnName, value); } } } @@ -412,7 +400,7 @@ public PostgresType getDatabaseFieldType(final JsonNode field) { case "bytea" -> PostgresType.VARCHAR; case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE; case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE; - default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); + default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT); }; } catch (final IllegalArgumentException ex) { LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.", @@ -582,4 +570,35 @@ public boolean isCursorType(final PostgresType type) { return PostgresUtils.ALLOWED_CURSOR_TYPES.contains(type); } + private ColumnInfo getColumnInfo(final int colIndex, final PgResultSetMetaData metadata, final String columnName) throws SQLException { + final String tableName = metadata.getBaseTableName(colIndex); + final String schemaName = metadata.getBaseSchemaName(colIndex); + final String key = schemaName + tableName; + if (!streamColumnInfo.containsKey(key)) { + streamColumnInfo.clear(); + streamColumnInfo.put(key, new HashMap<>(metadata.getColumnCount())); + } + + final Map stringColumnInfoMap = streamColumnInfo.get(key); + if (stringColumnInfoMap.containsKey(columnName)) { + return stringColumnInfoMap.get(columnName); + } else { + final PostgresType columnType = safeGetJdbcType(metadata.getColumnType(colIndex), POSTGRES_TYPE_DICT); + final ColumnInfo columnInfo = new ColumnInfo(metadata.getColumnTypeName(colIndex).toLowerCase(), columnType); + stringColumnInfoMap.put(columnName, columnInfo); + return columnInfo; + } + } + + private static class ColumnInfo { + public String columnTypeName; + public PostgresType columnType; + + public ColumnInfo(final String columnTypeName, final PostgresType columnType) { + this.columnTypeName = columnTypeName; + this.columnType = columnType; + } + + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java index 43d61299cf5f8..f3a4c5f249cef 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java @@ -6,6 +6,7 @@ import java.sql.SQLType; import java.sql.Types; +import java.util.Map; public enum PostgresType implements SQLType { @@ -74,7 +75,7 @@ public enum PostgresType implements SQLType { /** * The Integer value for the JDBCType. It maps to a value in {@code Types.java} */ - private Integer type; + protected Integer type; /** * Constructor to specify the data type value from {@code Types) for @@ -121,18 +122,17 @@ public Integer getVendorTypeNumber() { * {@code Types} value * @see Types */ - public static PostgresType valueOf(int type) { - for (PostgresType sqlType : PostgresType.class.getEnumConstants()) { - if (type == sqlType.type) - return sqlType; + public static PostgresType valueOf(final int type, final Map postgresTypeMap) { + if (postgresTypeMap.containsKey(type)) { + return postgresTypeMap.get(type); } throw new IllegalArgumentException("Type:" + type + " is not a valid " + "Types.java value."); } - public static PostgresType safeGetJdbcType(final int columnTypeInt) { + public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map postgresTypeMap) { try { - return PostgresType.valueOf(columnTypeInt); + return PostgresType.valueOf(columnTypeInt, postgresTypeMap); } catch (final Exception e) { return PostgresType.VARCHAR; } diff --git a/docs/integrations/sources/alloydb.md b/docs/integrations/sources/alloydb.md index be51e0187a219..9ed74fb9f3508 100644 --- a/docs/integrations/sources/alloydb.md +++ b/docs/integrations/sources/alloydb.md @@ -327,6 +327,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------| +| 1.0.43 | 2022-02-06 | [21634](https://github.com/airbytehq/airbyte/pull/21634) | Improve Standard sync performance by caching objects.| | 1.0.36 | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825) | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column.| | 1.0.35 | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346) | Consolidate date/time values mapping for JDBC sources | | 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions | diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index a92717f6f3978..ee9de3b54624d 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.43 | 2022-02-06 | [21634](https://github.com/airbytehq/airbyte/pull/21634) | Improve Standard sync performance by caching objects. | | 1.0.42 | 2022-01-23 | [21523](https://github.com/airbytehq/airbyte/pull/21523) | Check for null in cursor values before replacing. | | 1.0.41 | 2022-01-25 | [20939](https://github.com/airbytehq/airbyte/pull/20939) | Adjust batch selection memory limits databases. | | 1.0.40 | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825) | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column. |