Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(connector): add postgres permission checks in validation phase #8525

Merged
merged 10 commits into from
Mar 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,158 @@ private void validateDbProperties(
}
}
}
// check whether user is superuser or replication role
try (var stmt =
conn.prepareStatement(sqlStmts.getProperty("postgres.role.check"))) {
stmt.setString(1, props.get(DbzConnectorConfig.USER));
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must be superuser or replication role to start walsender."));
}
}
}
// check whether user has select privilege on table for initial snapshot
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.table_privilege.check"))) {
stmt.setString(1, props.get(DbzConnectorConfig.TABLE_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.USER));
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must have select privilege on table "
+ props.get(
DbzConnectorConfig.TABLE_NAME)));
}
}
}
// check whether publication exists
boolean publicationExists = false;
float version = 0;
try (var stmt = conn.createStatement()) {
var res = stmt.executeQuery(sqlStmts.getProperty("postgres.version"));
while (res.next()) {
version = res.getFloat("setting");
}
}
if (version == 0) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Failed to get postgres version"
+ props.get(DbzConnectorConfig.TABLE_NAME)));
}
// pg 15 and up supports partial publication of table
// check whether publication covers all columns
if (version >= 15) {
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.publication_att"))) {
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
String[] columnsPub =
(String[]) res.getArray("attnames").getArray();
var sourceSchema = validate.getTableSchema();
for (int i = 0; i < sourceSchema.getColumnsCount(); i++) {
String columnName = sourceSchema.getColumns(i).getName();
if (Arrays.stream(columnsPub).noneMatch(columnName::equals)) {
break;
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
}
if (i == sourceSchema.getColumnsCount() - 1) {
publicationExists = true;
}
}
if (publicationExists) {
LOG.info("publication exists");
break;
}
}
}
} else { // check directly whether publication exists
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.publication_cnt"))) {
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
if (res.getInt("count") > 0) {
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
publicationExists = true;
LOG.info("publication exists");
break;
}
}
}
}
// if publication does not exist, check permission to create publication
if (!publicationExists) {
// check create privilege on database
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty(
"postgres.database_privilege.check"))) {
stmt.setString(1, props.get(DbzConnectorConfig.USER));
stmt.setString(2, props.get(DbzConnectorConfig.DB_NAME));
stmt.setString(3, props.get(DbzConnectorConfig.USER));
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must have create privilege on database"
+ props.get(
DbzConnectorConfig.DB_NAME)));
}
}
}
// check ownership on table
boolean isTableOwner = false;
String owner = null;
// check if user is owner
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.table_owner"))) {
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
owner = res.getString("tableowner");
if (owner.equals(props.get(DbzConnectorConfig.USER))) {
isTableOwner = true;
break;
}
}
}
// if user is not owner, check if user belongs to owner group
if (!isTableOwner && !owner.isEmpty()) {
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.users_of_group"))) {
stmt.setString(1, owner);
var res = stmt.executeQuery();
while (res.next()) {
String[] users = (String[]) res.getArray("members").getArray();
if (Arrays.stream(users)
.anyMatch(props.get(DbzConnectorConfig.USER)::equals)) {
isTableOwner = true;
break;
}
}
}
}
if (!isTableOwner) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must be owner of table "
+ props.get(DbzConnectorConfig.TABLE_NAME)));
}
}
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,28 @@ postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND ta
postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary
postgres.table_schema=SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position
postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?
postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ?
postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ?
postgres.table_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_name = ? AND grantee = ? and privilege_type = 'SELECT'
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.version=SELECT setting FROM pg_settings WHERE name = 'server_version'
postgres.publication_cnt=SELECT COUNT(*) FROM pg_publication_tables WHERE schemaname = ? AND tablename = ?
postgres.publication_att=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ?
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
INNER JOIN pg_roles r2 ON r2.oid = am.member \
WHERE r1.rolname = ? \
GROUP BY r1.rolname \
) \
UNION ALL ( \
WITH groups AS (SELECT DISTINCT(UNNEST(m)) AS g FROM base) \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
INNER JOIN pg_roles r2 ON r2.oid = am.member \
INNER JOIN groups ON r1.rolname = groups.g \
GROUP BY r1.rolname \
) \
), \
tmp AS (SELECT DISTINCT(UNNEST(m)) AS members FROM base) \
SELECT ARRAY_AGG(members) AS members FROM tmp
12 changes: 12 additions & 0 deletions java/connector-node/risingwave-source-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down