Skip to content

Commit

Permalink
Source postgres: fix schema permission issue (#19024)
Browse files Browse the repository at this point in the history
* Source postgres: fix schema permission issue

* Source postgres: add test for schema permission issue

* Source postgres: format code

* Source postgres: added unit test

* Source postgres: bump version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 2, 2022
1 parent b72cd71 commit 1fd4a03
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.28
dockerImageTag: 1.0.30
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11313,7 +11313,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.28"
- dockerImage: "airbyte/source-postgres:1.0.30"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.28
LABEL io.airbyte.version=1.0.30
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.28
LABEL io.airbyte.version=1.0.30
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase
FROM pg_class c
JOIN pg_namespace n on c.relnamespace = n.oid
WHERE has_table_privilege(c.oid, 'SELECT')
AND has_schema_privilege(current_user, nspname, 'USAGE')
-- r = ordinary table, i = index, S = sequence, t = TOAST table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table, I = partitioned index
AND relkind in ('r', 'm', 'v', 't', 'f', 'p')
and ((? is null) OR nspname = ?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -16,6 +19,8 @@
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -24,39 +29,36 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";
public static final String LIMIT_PERMISSION_SCHEMA = "limit_perm_schema";
public static final String LIMIT_PERMISSION_ROLE = "limit_perm_role";
public static final String LIMIT_PERMISSION_ROLE_PASSWORD = "test";

private PostgreSQLContainer<?> container;
private JsonNode config;
private Database database;
private ConfiguredAirbyteCatalog configCatalog;

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new PostgreSQLContainer<>("postgres:13-alpine");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container))
.put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(container))
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.SCHEMAS_KEY, Jsons.jsonNode(List.of("public")))
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, false)
.put("replication_method", replicationMethod)
.build());

String username = container.getUsername();
String password = container.getPassword();
List<String> schemas = List.of("public");
config = getConfig(username, password, schemas);
try (final DSLContext dslContext = DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
config.get(JdbcUtils.PASSWORD_KEY).asText(),
Expand All @@ -66,7 +68,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
container.getFirstMappedPort(),
config.get(JdbcUtils.DATABASE_KEY).asText()),
SQLDialect.POSTGRES)) {
final Database database = new Database(dslContext);
database = new Database(dslContext);

database.query(ctx -> {
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
Expand All @@ -76,9 +78,26 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
ctx.fetch("CREATE MATERIALIZED VIEW testview AS select * from id_and_name where id = '2';");
return null;
});
configCatalog = getCommonConfigCatalog();
}
}

private JsonNode getConfig(String username, String password, List<String> schemas) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
.build());
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container))
.put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(container))
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.SCHEMAS_KEY, Jsons.jsonNode(schemas))
.put(JdbcUtils.USERNAME_KEY, username)
.put(JdbcUtils.PASSWORD_KEY, password)
.put(JdbcUtils.SSL_KEY, false)
.put("replication_method", replicationMethod)
.build());
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
Expand All @@ -101,6 +120,71 @@ protected JsonNode getConfig() {

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return configCatalog;
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

@Test
public void testFullRefreshWithRevokingSchemaPermissions() throws Exception {
prepareEnvForUserWithoutPermissions(database);

config = getConfig(LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD, List.of(LIMIT_PERMISSION_SCHEMA));
final ConfiguredAirbyteCatalog configuredCatalog = getLimitPermissionConfiguredCatalog();

final List<AirbyteRecordMessage> fullRefreshRecords = filterRecords(runRead(configuredCatalog));
final String assertionMessage = "Expected records after full refresh sync for user with schema permission";
assertFalse(fullRefreshRecords.isEmpty(), assertionMessage);

revokeSchemaPermissions(database);

final List<AirbyteRecordMessage> lessPermFullRefreshRecords = filterRecords(runRead(configuredCatalog));
final String assertionMessageWithoutPermission = "Expected no records after full refresh sync for user without schema permission";
assertTrue(lessPermFullRefreshRecords.isEmpty(), assertionMessageWithoutPermission);

}

@Test
public void testDiscoverWithRevokingSchemaPermissions() throws Exception {
prepareEnvForUserWithoutPermissions(database);
revokeSchemaPermissions(database);
config = getConfig(LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD, List.of(LIMIT_PERMISSION_SCHEMA));

runDiscover();
AirbyteCatalog lastPersistedCatalogSecond = getLastPersistedCatalog();
final String assertionMessageWithoutPermission = "Expected no streams after discover for user without schema permissions";
assertTrue(lastPersistedCatalogSecond.getStreams().isEmpty(), assertionMessageWithoutPermission);
}

private void revokeSchemaPermissions(Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("REVOKE USAGE ON schema %s FROM %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
return null;
});
}

private void prepareEnvForUserWithoutPermissions(Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("CREATE ROLE %s WITH LOGIN PASSWORD '%s';", LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD));
ctx.fetch(String.format("CREATE SCHEMA %s;", LIMIT_PERMISSION_SCHEMA));
ctx.fetch(String.format("GRANT CONNECT ON DATABASE test TO %s;", LIMIT_PERMISSION_ROLE));
ctx.fetch(String.format("GRANT USAGE ON schema %s TO %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
ctx.fetch(String.format("CREATE TABLE %s.id_and_name(id INTEGER, name VARCHAR(200));", LIMIT_PERMISSION_SCHEMA));
ctx.fetch(String.format("INSERT INTO %s.id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');", LIMIT_PERMISSION_SCHEMA));
ctx.fetch(String.format("GRANT SELECT ON table %s.id_and_name TO %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
return null;
});
}

private ConfiguredAirbyteCatalog getCommonConfigCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
Expand Down Expand Up @@ -131,14 +215,17 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
private ConfiguredAirbyteCatalog getLimitPermissionConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
LIMIT_PERMISSION_SCHEMA + "." + "id_and_name",
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

}
4 changes: 3 additions & 1 deletion docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,9 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.28 | 2022-11-22 | [19514](https://github.com/airbytehq/airbyte/pull/19514) | Adjust batch selection memory limits databases. |
| 1.0.30 | 2022-11-29 | [19024](https://github.com/airbytehq/airbyte/pull/19024) | Skip tables from schema where user do not have Usage permission during discovery. |
| 1.0.29 | 2022-11-29 | [19623](https://github.com/airbytehq/airbyte/pull/19623) | Mark PSQLException related to using replica that is configured as a hot standby server as config error. |
| 1.0.28 | 2022-11-28 | [19514](https://github.com/airbytehq/airbyte/pull/19514) | Adjust batch selection memory limits databases. |
| 1.0.27 | 2022-11-28 | [16990](https://github.com/airbytehq/airbyte/pull/16990) | Handle arrays data types |
| 1.0.26 | 2022-11-18 | [19551](https://github.com/airbytehq/airbyte/pull/19551) | Fixes bug with ssl modes |
| 1.0.25 | 2022-11-16 | [19004](https://github.com/airbytehq/airbyte/pull/19004) | Use Debezium heartbeats to improve CDC replication of large databases. |
Expand Down

0 comments on commit 1fd4a03

Please sign in to comment.