Source Postgres: Fast query for estimate messages #21683

Merged: 7 commits, Jan 23, 2023

Changes from all commits:
@@ -1338,7 +1338,7 @@
 - name: Postgres
   sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
   dockerRepository: airbyte/source-postgres
-  dockerImageTag: 1.0.38
+  dockerImageTag: 1.0.39
   documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
   icon: postgresql.svg
   sourceType: database

@@ -11476,7 +11476,7 @@
     supportsNormalization: false
     supportsDBT: false
     supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-postgres:1.0.38"
+- dockerImage: "airbyte/source-postgres:1.0.39"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
     connectionSpecification:

@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=1.0.38
+LABEL io.airbyte.version=1.0.39
 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=1.0.38
+LABEL io.airbyte.version=1.0.39
 LABEL io.airbyte.name=airbyte/source-postgres

@@ -26,7 +26,7 @@ public class PostgresQueryUtils
 
   public static final String TABLE_ESTIMATE_QUERY =
       """
-      SELECT (SELECT COUNT(*) FROM %s) AS %s,
+      SELECT (select reltuples::int8 as count from pg_class c JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace where nspname='%s' AND relname='%s') AS %s,
       pg_relation_size('%s') AS %s;
       """;

Review comment (Contributor), on the new SELECT line: "Maybe both versions should be kept and done in a try/catch... I wonder if there are certain older versions of postgres that don't have a pg_catalog table?"


@@ -557,13 +557,20 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database,
     final String fullTableName =
         getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());
 
-    final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName);
+    final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName, schemaName, tableName);
 
     if (!tableEstimateResult.isEmpty() && tableEstimateResult.get(0).has(ROW_COUNT_RESULT_COL) &&
         tableEstimateResult.get(0).has(TOTAL_BYTES_RESULT_COL)) {
       final long syncRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong();
       final long syncByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong();
 
+      // The fast count query can return negative or otherwise invalid results for small tables. In this
+      // case, we can skip emitting an
+      // estimate trace altogether since the sync will likely complete quickly.
+      if (syncRowCount <= 0) {
+        return;
+      }
+
       // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this
       // is to
       // read a row and Stringify it to better understand the accurate volume of data sent over the wire.
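Worth noting for context on the new guard: pg_class.reltuples is only a planner estimate, and it is 0 for a table that has never been analyzed (or -1 on PostgreSQL 13+ until the first VACUUM/ANALYZE), so small or fresh tables can legitimately produce non-positive counts. A tiny self-contained sketch with made-up numbers, not part of the diff:

// Illustrative only: how a never-analyzed table flows through the new guard.
public class FastCountGuardExample {

  public static void main(final String[] args) {
    final long syncRowCount = -1L;     // reltuples for a never-analyzed table on PostgreSQL 13+
    final long syncByteCount = 8_192L; // pg_relation_size of the same small table

    if (syncRowCount <= 0) {
      // Mirrors the new behaviour: no estimate trace is emitted; the sync should finish quickly anyway.
      System.out.println("Skipping estimate trace for non-positive row count.");
      return;
    }

    // Otherwise the connector doubles the byte estimate to account for JSON serialization.
    System.out.println("Estimated bytes over the wire: " + syncByteCount * 2);
  }
}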

@@ -588,20 +595,23 @@ protected void estimateIncrementalSyncSize(final JdbcDatabase database,
     final String fullTableName =
         getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());
 
-    final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName);
+    final List<JsonNode> tableEstimateResult = getFullTableEstimate(database, fullTableName, schemaName, tableName);
 
     final long tableRowCount = tableEstimateResult.get(0).get(ROW_COUNT_RESULT_COL).asLong();
     final long tableByteCount = tableEstimateResult.get(0).get(TOTAL_BYTES_RESULT_COL).asLong();
 
+    // The fast count query can return negative or otherwise invalid results for small tables. In this
+    // case, we can skip emitting an
+    // estimate trace altogether since the sync will likely complete quickly.
+    if (tableRowCount <= 0) {
+      return;
+    }
+
     final long syncRowCount;
     final long syncByteCount;
 
     syncRowCount = getIncrementalTableRowCount(database, fullTableName, cursorInfo, cursorFieldType);
-    if (tableRowCount == 0) {
-      syncByteCount = 0;
-    } else {
-      syncByteCount = (tableByteCount / tableRowCount) * syncRowCount;
-    }
+    syncByteCount = (tableByteCount / tableRowCount) * syncRowCount;
 
     // Here, we double the bytes estimate to account for serialization. Perhaps a better way to do this
     // is to
     // read a row and Stringify it to better understand the accurate volume of data sent over the wire.

@@ -615,10 +625,14 @@ }
     }
   }
 
-  private List<JsonNode> getFullTableEstimate(final JdbcDatabase database, final String fullTableName) throws SQLException {
+  private List<JsonNode> getFullTableEstimate(final JdbcDatabase database,
+                                              final String fullTableName,
+                                              final String schemaName,
+                                              final String tableName)
+      throws SQLException {
     // Construct the table estimate query.
     final String tableEstimateQuery =
-        String.format(TABLE_ESTIMATE_QUERY, fullTableName, ROW_COUNT_RESULT_COL, fullTableName, TOTAL_BYTES_RESULT_COL);
+        String.format(TABLE_ESTIMATE_QUERY, schemaName, tableName, ROW_COUNT_RESULT_COL, fullTableName, TOTAL_BYTES_RESULT_COL);
     LOGGER.debug("table estimate query: {}", tableEstimateQuery);
     final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
         resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
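
For reference, here is roughly what the formatted query looks like for a hypothetical public.users table (the column aliases below are made up; the real ones come from ROW_COUNT_RESULT_COL and TOTAL_BYTES_RESULT_COL):

// Illustration only: how the five format arguments land in the fast estimate query.
public class FormattedEstimateQueryExample {

  public static void main(final String[] args) {
    final String tableEstimateQuery =
        """
        SELECT (select reltuples::int8 as count from pg_class c JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace where nspname='%s' AND relname='%s') AS %s,
        pg_relation_size('%s') AS %s;
        """;
    // Argument order: schema name, table name, row-count alias, quoted fully qualified name, byte-count alias.
    final String formatted = String.format(tableEstimateQuery,
        "public", "users", "estimated_rows", "\"public\".\"users\"", "table_bytes");
    System.out.println(formatted);
    // reltuples returns the planner's row estimate without scanning the table, and
    // pg_relation_size accepts the quoted, fully qualified name through its regclass argument.
  }
}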