fix-postgres-source: get rid of short lived objects (#21634)
* fix-postgres-source: get rid of short lived objects

* cache column info as well

* get rid of constructor

* upgrade version

* auto-bump connector version

* update definition

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
subodh1810 and octavia-squidington-iii authored Feb 6, 2023
1 parent b87647d commit df614d4
Showing 10 changed files with 67 additions and 46 deletions.
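All three code changes below apply the same idea: move work out of the per-row path. rowToJson and copyToJsonField run once for every synced row, so a linear scan over the PostgresType enum constants and repeated column-metadata lookups allocate short-lived objects millions of times on a large table. A minimal standalone sketch of the enum-lookup half of the pattern, using a hypothetical SqlKind enum rather than the connector's real classes:

import java.util.HashMap;
import java.util.Map;

enum SqlKind {

  BOOLEAN(16), INTEGER(4), VARCHAR(12);

  final int jdbcCode;

  SqlKind(final int jdbcCode) {
    this.jdbcCode = jdbcCode;
  }

  // Before: every call scans the constants; values() also clones the backing
  // array, so each per-row lookup allocates a throwaway SqlKind[].
  static SqlKind slowValueOf(final int code) {
    for (final SqlKind kind : values()) {
      if (kind.jdbcCode == code) {
        return kind;
      }
    }
    throw new IllegalArgumentException("Type:" + code + " is not a valid value.");
  }

  // After: index the constants once in a static map, then look up in O(1)
  // with no per-call allocation.
  private static final Map<Integer, SqlKind> BY_CODE = new HashMap<>();

  static {
    for (final SqlKind kind : values()) {
      BY_CODE.put(kind.jdbcCode, kind);
    }
  }

  static SqlKind fastValueOf(final int code) {
    final SqlKind kind = BY_CODE.get(code);
    if (kind == null) {
      throw new IllegalArgumentException("Type:" + code + " is not a valid value.");
    }
    return kind;
  }

}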
airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -45,7 +45,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
-  dockerImageTag: 1.0.36
+  dockerImageTag: 1.0.43
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
@@ -1362,7 +1362,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
-  dockerImageTag: 1.0.42
+  dockerImageTag: 1.0.43
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -370,7 +370,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-alloydb:1.0.36"
+- dockerImage: "airbyte/source-alloydb:1.0.43"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
@@ -11606,7 +11606,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-postgres:1.0.42"
+- dockerImage: "airbyte/source-postgres:1.0.43"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.36
+LABEL io.airbyte.version=1.0.43
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.36
+LABEL io.airbyte.version=1.0.43
LABEL io.airbyte.name=airbyte/source-alloydb
airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.42
+LABEL io.airbyte.version=1.0.43
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.42
+LABEL io.airbyte.version=1.0.43
LABEL io.airbyte.name=airbyte/source-postgres
airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java
@@ -40,7 +40,10 @@
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.format.DateTimeParseException;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.postgresql.geometric.PGbox;
import org.postgresql.geometric.PGcircle;
import org.postgresql.geometric.PGline;
@@ -60,6 +63,12 @@ public class PostgresSourceOperations extends AbstractJdbcCompatibleSourceOperat
private static final String TIMESTAMPTZ = "timestamptz";
private static final String TIMETZ = "timetz";
private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper();
+  private static final Map<Integer, PostgresType> POSTGRES_TYPE_DICT = new HashMap<>();
+  private final Map<String, Map<String, ColumnInfo>> streamColumnInfo = new HashMap<>();
+
+  static {
+    Arrays.stream(PostgresType.class.getEnumConstants()).forEach(c -> POSTGRES_TYPE_DICT.put(c.type, c));
+  }

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
@@ -69,27 +78,6 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
-      final String columnType = metadata.getColumnTypeName(i);
-      // attempt to access the column. this allows us to know if it is null before we do type-specific
-      // parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of
-      // checking for null values with jdbc.
-
-      if (columnType.equalsIgnoreCase("money")) {
-        // when a column is of type MONEY, getObject will throw exception
-        // this is a bug that will not be fixed:
-        // https://github.com/pgjdbc/pgjdbc/issues/425
-        // https://github.com/pgjdbc/pgjdbc/issues/1835
-        queryContext.getString(i);
-      } else if (columnType.equalsIgnoreCase("bit")) {
-        // getObject will fail as it tries to parse the value as boolean
-        queryContext.getString(i);
-      } else if (columnType.equalsIgnoreCase("numeric") || columnType.equalsIgnoreCase("decimal")) {
-        // getObject will fail when the value is 'infinity'
-        queryContext.getDouble(i);
-      } else {
-        queryContext.getObject(i);
-      }

// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode);
}
@@ -166,14 +154,14 @@ protected void setDate(final PreparedStatement preparedStatement, final int para
public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();
final String columnName = metadata.getColumnName(colIndex);
-    final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase();
-    final PostgresType columnType = safeGetJdbcType(metadata.getColumnType(colIndex));
-    if (resultSet.getString(colIndex) == null) {
+    final ColumnInfo columnInfo = getColumnInfo(colIndex, metadata, columnName);
+    final String value = resultSet.getString(colIndex);
+    if (value == null) {
json.putNull(columnName);
} else {
-      switch (columnTypeName) {
+      switch (columnInfo.columnTypeName) {
case "bool", "boolean" -> putBoolean(json, columnName, resultSet, colIndex);
case "bytea" -> putString(json, columnName, resultSet, colIndex);
case "bytea" -> json.put(columnName, value);
case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);
@@ -199,22 +187,22 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "_timetz" -> putTimeTzArray(json, columnName, resultSet, colIndex);
case "_time" -> putTimeArray(json, columnName, resultSet, colIndex);
default -> {
-          switch (columnType) {
-            case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
+          switch (columnInfo.columnType) {
+            case BOOLEAN -> json.put(columnName, value.equalsIgnoreCase("t"));
case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex);
case INTEGER -> putInteger(json, columnName, resultSet, colIndex);
case BIGINT -> putBigInt(json, columnName, resultSet, colIndex);
case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex);
case REAL -> putFloat(json, columnName, resultSet, colIndex);
case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex);
// BIT is a bit string in Postgres, e.g. '0100'
-            case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex);
+            case BIT, CHAR, VARCHAR, LONGVARCHAR -> json.put(columnName, value);
case DATE -> putDate(json, columnName, resultSet, colIndex);
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
-            default -> putDefault(json, columnName, resultSet, colIndex);
+            default -> json.put(columnName, value);
}
}
}
@@ -412,7 +400,7 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
case "bytea" -> PostgresType.VARCHAR;
case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE;
case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE;
-      default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
+      default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
};
} catch (final IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
@@ -582,4 +570,35 @@ public boolean isCursorType(final PostgresType type) {
return PostgresUtils.ALLOWED_CURSOR_TYPES.contains(type);
}

+  private ColumnInfo getColumnInfo(final int colIndex, final PgResultSetMetaData metadata, final String columnName) throws SQLException {
+    final String tableName = metadata.getBaseTableName(colIndex);
+    final String schemaName = metadata.getBaseSchemaName(colIndex);
+    final String key = schemaName + tableName;
+    if (!streamColumnInfo.containsKey(key)) {
+      streamColumnInfo.clear();
+      streamColumnInfo.put(key, new HashMap<>(metadata.getColumnCount()));
+    }
+
+    final Map<String, ColumnInfo> stringColumnInfoMap = streamColumnInfo.get(key);
+    if (stringColumnInfoMap.containsKey(columnName)) {
+      return stringColumnInfoMap.get(columnName);
+    } else {
+      final PostgresType columnType = safeGetJdbcType(metadata.getColumnType(colIndex), POSTGRES_TYPE_DICT);
+      final ColumnInfo columnInfo = new ColumnInfo(metadata.getColumnTypeName(colIndex).toLowerCase(), columnType);
+      stringColumnInfoMap.put(columnName, columnInfo);
+      return columnInfo;
+    }
+  }
+
+  private static class ColumnInfo {
+    public String columnTypeName;
+    public PostgresType columnType;
+
+    public ColumnInfo(final String columnTypeName, final PostgresType columnType) {
+      this.columnTypeName = columnTypeName;
+      this.columnType = columnType;
+    }
+
+  }

}
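The getColumnInfo cache added above is deliberately scoped to a single stream: the outer map is keyed by schemaName + tableName, and it is cleared whenever an unseen key arrives, so the connector holds column metadata for at most one table at a time while still answering every per-row lookup from memory. A minimal sketch of that clear-on-new-stream pattern (PerStreamCache is a hypothetical name, not connector code):

import java.util.HashMap;
import java.util.Map;

final class PerStreamCache<V> {

  private final Map<String, Map<String, V>> cache = new HashMap<>();

  // Returns the per-column map for the given stream. Seeing a new stream key
  // drops the previous stream's entries, keeping memory bounded to one table.
  Map<String, V> forStream(final String streamKey) {
    if (!cache.containsKey(streamKey)) {
      cache.clear();
      cache.put(streamKey, new HashMap<>());
    }
    return cache.get(streamKey);
  }

}

This works because a JDBC ResultSet is consumed one stream at a time: the key only changes when a new table's sync begins, so within a stream every row after the first is served from the cache.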
airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java
@@ -6,6 +6,7 @@

import java.sql.SQLType;
import java.sql.Types;
+import java.util.Map;

public enum PostgresType implements SQLType {

@@ -74,7 +75,7 @@ public enum PostgresType implements SQLType {
/**
* The Integer value for the JDBCType. It maps to a value in {@code Types.java}
*/
-  private Integer type;
+  protected Integer type;

/**
* Constructor to specify the data type value from {@code Types) for
@@ -121,18 +122,17 @@ public Integer getVendorTypeNumber() {
* {@code Types} value
* @see Types
*/
-  public static PostgresType valueOf(int type) {
-    for (PostgresType sqlType : PostgresType.class.getEnumConstants()) {
-      if (type == sqlType.type)
-        return sqlType;
+  public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
+    if (postgresTypeMap.containsKey(type)) {
+      return postgresTypeMap.get(type);
}
throw new IllegalArgumentException("Type:" + type + " is not a valid "
+ "Types.java value.");
}

-  public static PostgresType safeGetJdbcType(final int columnTypeInt) {
+  public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
try {
-      return PostgresType.valueOf(columnTypeInt);
+      return PostgresType.valueOf(columnTypeInt, postgresTypeMap);
} catch (final Exception e) {
return PostgresType.VARCHAR;
}
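With the dictionary threaded through as a parameter, callers build it once and reuse it for every lookup; the visibility change from private to protected on the type field is what lets a same-package caller like PostgresSourceOperations populate the map. A hedged usage sketch (assumes the PostgresType enum from this diff and same-package access; PostgresTypeLookupExample is a hypothetical class):

import java.sql.Types;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PostgresTypeLookupExample {

  public static void main(final String[] args) {
    // Build the dictionary once, the same way PostgresSourceOperations does.
    final Map<Integer, PostgresType> dict = new HashMap<>();
    Arrays.stream(PostgresType.class.getEnumConstants()).forEach(c -> dict.put(c.type, c));

    // O(1) lookups; safeGetJdbcType falls back to VARCHAR for unknown codes
    // instead of propagating the IllegalArgumentException thrown by valueOf.
    System.out.println(PostgresType.safeGetJdbcType(Types.VARCHAR, dict)); // VARCHAR (map hit)
    System.out.println(PostgresType.safeGetJdbcType(-999, dict)); // VARCHAR (fallback)
  }

}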
1 change: 1 addition & 0 deletions docs/integrations/sources/alloydb.md
@@ -327,6 +327,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------|
+| 1.0.43 | 2023-02-06 | [21634](https://github.com/airbytehq/airbyte/pull/21634) | Improve Standard sync performance by caching objects. |
| 1.0.36 | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825) | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column.|
| 1.0.35 | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346) | Consolidate date/time values mapping for JDBC sources |
| 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions |
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
@@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 1.0.43 | 2023-02-06 | [21634](https://github.com/airbytehq/airbyte/pull/21634) | Improve Standard sync performance by caching objects. |
| 1.0.42 | 2022-01-23 | [21523](https://github.com/airbytehq/airbyte/pull/21523) | Check for null in cursor values before replacing. |
| 1.0.41 | 2022-01-25 | [20939](https://github.com/airbytehq/airbyte/pull/20939) | Adjust batch selection memory limits databases. |
| 1.0.40 | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825) | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column. |
