Skip to content

Commit

Permalink
feat: Filter out system views out of system namespaces (#22221)
Browse files Browse the repository at this point in the history
* This change allows filtering out system views that are created outside of system namespaces

* Add extra view

* Fix issue

* Bump Postgres source version
  • Loading branch information
sergio-ropero authored Feb 6, 2023
1 parent ac370c9 commit 3bf87a9
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.42
dockerImageTag: 1.0.43
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11606,7 +11606,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.42"
- dockerImage: "airbyte/source-postgres:1.0.43"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public Set<String> getExcludedInternalNameSpaces() {
return Collections.emptySet();
}

/**
 * Views to drop from the discover result. This source declares none, so the returned set is
 * always empty.
 *
 * @return an empty set
 */
@Override
protected Set<String> getExcludedViews() {
  final Set<String> noExcludedViews = Collections.emptySet();
  return noExcludedViews;
}

@Override
protected List<TableInfo<CommonField<StandardSQLTypeName>>> discoverInternal(final BigQueryDatabase database) throws Exception {
return discoverInternal(database, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

/**
 * Names of the views that must not appear in the discover result: the two views associated with
 * the {@code pg_stat_statements} extension.
 *
 * @return set of view names to be excluded
 */
@Override
protected Set<String> getExcludedViews() {
  final Set<String> excludedViews = Set.of("pg_stat_statements", "pg_stat_statements_info");
  return excludedViews;
}

public static void main(final String[] args) throws Exception {
final Source source = new JdbcSource();
LOGGER.info("starting source: {}", JdbcSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public Set<String> getExcludedInternalNameSpaces() {
return Collections.emptySet();
}

/**
 * Views to drop from the discover result. This source declares none, so the returned set is
 * always empty.
 *
 * @return an empty set
 */
@Override
protected Set<String> getExcludedViews() {
  final Set<String> noExcludedViews = Collections.emptySet();
  return noExcludedViews;
}

@Override
protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDatabase database)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.42
LABEL io.airbyte.version=1.0.43
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.42
LABEL io.airbyte.version=1.0.43
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

/**
 * Names of the views that must not appear in the discover result: the two views associated with
 * the {@code pg_stat_statements} extension.
 *
 * @return set of view names to be excluded
 */
@Override
protected Set<String> getExcludedViews() {
  final Set<String> excludedViews = Set.of("pg_stat_statements", "pg_stat_statements_info");
  return excludedViews;
}

@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
final AirbyteCatalog catalog = super.discover(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

/**
 * Names of the views that must not appear in the discover result: the two views associated with
 * the {@code pg_stat_statements} extension.
 *
 * @return set of view names to be excluded
 */
@Override
protected Set<String> getExcludedViews() {
  final Set<String> excludedViews = Set.of("pg_stat_statements", "pg_stat_statements_info");
  return excludedViews;
}

public static void main(final String[] args) throws Exception {
final Source source = new PostgresTestSource();
LOGGER.info("starting source: {}", PostgresTestSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,11 @@ private List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
final Database database)
throws Exception {
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
final Set<String> systemViews = getExcludedViews();
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);

// The two exclusion sets are independent: a source may declare excluded views without declaring
// excluded namespaces (and vice versa). A null or empty set means "nothing to exclude".
final boolean hasExcludedNameSpaces = systemNameSpaces != null && !systemNameSpaces.isEmpty();
final boolean hasExcludedViews = systemViews != null && !systemViews.isEmpty();

// Nothing to filter: hand back the discovered list untouched, as before.
if (!hasExcludedNameSpaces && !hasExcludedViews) {
  return discoveredTables;
}

// Guarding each contains() call with its flag ensures a null set is never dereferenced.
return discoveredTables.stream()
    .filter(table -> !(hasExcludedNameSpaces && systemNameSpaces.contains(table.getNameSpace())))
    .filter(table -> !(hasExcludedViews && systemViews.contains(table.getName())))
    .collect(Collectors.toList());
}

Expand Down Expand Up @@ -634,12 +635,19 @@ protected abstract List<CheckedConsumer<Database, Exception>> getCheckOperations
protected abstract JsonSchemaType getAirbyteType(DataType columnType);

/**
* Get list of system namespaces(schemas) in order to exclude them from the discover result list.
* Get list of system namespaces(schemas) in order to exclude them from the `discover` result list.
*
* @return set of system namespaces(schemas) to be excluded
*/
protected abstract Set<String> getExcludedInternalNameSpaces();

/**
* Get the set of system view names to exclude from the `discover` result list. Entries are
* compared against the discovered table's name only (not its namespace), so each entry should be
* the bare view name.
*
* @return set of view names to be excluded; an empty set when there is nothing to exclude
*/
protected abstract Set<String> getExcludedViews();

/**
* Discover all available tables in the source database.
*
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.43 | 2023-02-06 | [22221](https://github.com/airbytehq/airbyte/pull/22221) | Exclude new set of system tables when using `pg_stat_statements` extension. |
| 1.0.42 | 2023-01-23 | [21523](https://github.com/airbytehq/airbyte/pull/21523) | Check for null in cursor values before replacing. |
| 1.0.41 | 2023-01-25 | [20939](https://github.com/airbytehq/airbyte/pull/20939) | Adjust batch selection memory limits databases. |
| 1.0.40 | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825) | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column. |
Expand Down

0 comments on commit 3bf87a9

Please sign in to comment.