diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java index f865a36197774..c94ab2677873e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java @@ -263,6 +263,159 @@ private void validateDbProperties( } } } + // check whether user is superuser or replication role + try (var stmt = + conn.prepareStatement(sqlStmts.getProperty("postgres.role.check"))) { + stmt.setString(1, props.get(DbzConnectorConfig.USER)); + var res = stmt.executeQuery(); + while (res.next()) { + if (!res.getBoolean(1)) { + throw new StatusException( + Status.INTERNAL.withDescription( + "Postgres user must be superuser or replication role to start walsender.")); + } + } + } + // check whether user has select privilege on table for initial snapshot + try (var stmt = + conn.prepareStatement( + sqlStmts.getProperty("postgres.table_privilege.check"))) { + stmt.setString(1, props.get(DbzConnectorConfig.TABLE_NAME)); + stmt.setString(2, props.get(DbzConnectorConfig.USER)); + var res = stmt.executeQuery(); + while (res.next()) { + if (!res.getBoolean(1)) { + throw new StatusException( + Status.INTERNAL.withDescription( + "Postgres user must have select privilege on table " + + props.get( + DbzConnectorConfig.TABLE_NAME))); + } + } + } + // check whether publication exists + boolean publicationExists = false; + boolean partialPublication = false; + try (var stmt = conn.createStatement()) { + var res = + stmt.executeQuery( + sqlStmts.getProperty("postgres.publication_att_exists")); + while (res.next()) { + partialPublication = res.getBoolean(1); + } + } + // pg 15 and up supports partial publication of table + // check whether publication covers all columns + if (partialPublication) { + try (var stmt = + conn.prepareStatement( + sqlStmts.getProperty("postgres.publication_att"))) { + stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME)); + stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME)); + var res = stmt.executeQuery(); + while (res.next()) { + String[] columnsPub = + (String[]) res.getArray("attnames").getArray(); + var sourceSchema = validate.getTableSchema(); + for (int i = 0; i < sourceSchema.getColumnsCount(); i++) { + String columnName = sourceSchema.getColumns(i).getName(); + if (Arrays.stream(columnsPub).noneMatch(columnName::equals)) { + throw new StatusException( + Status.INTERNAL.withDescription( + "The publication 'dbz_publication' does not cover all necessary columns in table " + + props.get( + DbzConnectorConfig + .TABLE_NAME))); + } + if (i == sourceSchema.getColumnsCount() - 1) { + publicationExists = true; + } + } + if (publicationExists) { + LOG.info("publication exists"); + break; + } + } + } + } else { // check directly whether publication exists + try (var stmt = + conn.prepareStatement( + sqlStmts.getProperty("postgres.publication_cnt"))) { + stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME)); + stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME)); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getInt("count") > 0) { + publicationExists = true; + LOG.info("publication exists"); + break; + } + } + } + } + // if publication does not exist, check permission to create publication + if (!publicationExists) { + // check create privilege on database + try (var stmt = + conn.prepareStatement( + sqlStmts.getProperty( + "postgres.database_privilege.check"))) { + stmt.setString(1, props.get(DbzConnectorConfig.USER)); + stmt.setString(2, props.get(DbzConnectorConfig.DB_NAME)); + stmt.setString(3, props.get(DbzConnectorConfig.USER)); + var res = stmt.executeQuery(); + while (res.next()) { + if (!res.getBoolean(1)) { + throw new StatusException( + Status.INTERNAL.withDescription( + "Postgres user must have create privilege on database" + + props.get( + DbzConnectorConfig.DB_NAME))); + } + } + } + // check ownership on table + boolean isTableOwner = false; + String owner = null; + // check if user is owner + try (var stmt = + conn.prepareStatement( + sqlStmts.getProperty("postgres.table_owner"))) { + stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME)); + stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME)); + var res = stmt.executeQuery(); + while (res.next()) { + owner = res.getString("tableowner"); + if (owner.equals(props.get(DbzConnectorConfig.USER))) { + isTableOwner = true; + break; + } + } + } + // if user is not owner, check if user belongs to owner group + if (!isTableOwner && !owner.isEmpty()) { + try (var stmt = + conn.prepareStatement( + sqlStmts.getProperty("postgres.users_of_group"))) { + stmt.setString(1, owner); + var res = stmt.executeQuery(); + while (res.next()) { + String[] users = (String[]) res.getArray("members").getArray(); + if (Arrays.stream(users) + .anyMatch(props.get(DbzConnectorConfig.USER)::equals)) { + isTableOwner = true; + break; + } + } + } + } + if (!isTableOwner) { + throw new StatusException( + Status.INTERNAL.withDescription( + "Postgres user must be owner of table " + + props.get(DbzConnectorConfig.TABLE_NAME))); + } + } break; default: break; diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 0f84c4401baa9..bff8753a85d06 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -8,3 +8,28 @@ postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND ta postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary postgres.table_schema=SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = ? +postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ? +postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ? +postgres.table_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_name = ? AND grantee = ? and privilege_type = 'SELECT' +postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ? +postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames' +postgres.publication_att=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = 'dbz_publication' +postgres.publication_cnt=SELECT COUNT(*) AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = 'dbz_publication' +postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \ +SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \ +INNER JOIN pg_roles r1 ON r1.oid = am.roleid \ +INNER JOIN pg_roles r2 ON r2.oid = am.member \ +WHERE r1.rolname = ? \ +GROUP BY r1.rolname \ +) \ +UNION ALL ( \ +WITH groups AS (SELECT DISTINCT(UNNEST(m)) AS g FROM base) \ +SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \ +INNER JOIN pg_roles r1 ON r1.oid = am.roleid \ +INNER JOIN pg_roles r2 ON r2.oid = am.member \ +INNER JOIN groups ON r1.rolname = groups.g \ +GROUP BY r1.rolname \ +) \ +), \ +tmp AS (SELECT DISTINCT(UNNEST(m)) AS members FROM base) \ +SELECT ARRAY_AGG(members) AS members FROM tmp diff --git a/java/connector-node/risingwave-source-test/pom.xml b/java/connector-node/risingwave-source-test/pom.xml index 6da046213366b..8959ebb39467a 100644 --- a/java/connector-node/risingwave-source-test/pom.xml +++ b/java/connector-node/risingwave-source-test/pom.xml @@ -17,6 +17,18 @@ + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-core + junit junit