Skip to content

Commit

Permalink
test(connector): add test cases for postgres validation permission ch…
Browse files Browse the repository at this point in the history
  • Loading branch information
WillyKidd authored Mar 22, 2023
1 parent 4738ee9 commit b474059
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.risingwave.proto.ConnectorServiceProto;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import com.risingwave.proto.Data;
import io.grpc.*;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -41,7 +40,7 @@ public class PostgresSourceTest {
private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceTest.class.getName());

private static final PostgreSQLContainer<?> pg =
new PostgreSQLContainer<>("postgres:12.3-alpine")
new PostgreSQLContainer<>("postgres:15-alpine")
.withDatabaseName("test")
.withUsername("postgres")
.withCommand("postgres -c wal_level=logical -c max_wal_senders=10");
Expand Down Expand Up @@ -145,6 +144,69 @@ public void testLines() throws InterruptedException, SQLException {
connection.close();
}

// test whether validation catches permission errors
@Test
public void testPermissionCheck() {
Connection connection = SourceTestClient.connect(pgDataSource);
String query =
"CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))";
SourceTestClient.performQuery(connection, query);
// create a partial publication, check whether error is reported
query = "CREATE PUBLICATION dbz_publication FOR TABLE orders (o_key)";
SourceTestClient.performQuery(connection, query);
ConnectorServiceProto.TableSchema tableSchema =
ConnectorServiceProto.TableSchema.newBuilder()
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("o_key")
.setDataType(Data.DataType.TypeName.INT64)
.build())
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("o_val")
.setDataType(Data.DataType.TypeName.INT32)
.build())
.addPkIndices(0)
.build();
Iterator<ConnectorServiceProto.GetEventStreamResponse> eventStream1 =
testClient.getEventStreamValidate(
pg,
ConnectorServiceProto.SourceType.POSTGRES,
tableSchema,
"test",
"orders");
StatusRuntimeException exception1 =
assertThrows(
StatusRuntimeException.class,
() -> {
eventStream1.hasNext();
});
assertEquals(
exception1.getMessage(),
"INVALID_ARGUMENT: INTERNAL: The publication 'dbz_publication' does not cover all necessary columns in table orders");
query = "DROP PUBLICATION dbz_publication";
SourceTestClient.performQuery(connection, query);
// revoke superuser and replication, check if reports error
query = "ALTER USER " + pg.getUsername() + " nosuperuser noreplication";
SourceTestClient.performQuery(connection, query);
Iterator<ConnectorServiceProto.GetEventStreamResponse> eventStream2 =
testClient.getEventStreamValidate(
pg,
ConnectorServiceProto.SourceType.POSTGRES,
tableSchema,
"test",
"orders");
StatusRuntimeException exception2 =
assertThrows(
StatusRuntimeException.class,
() -> {
eventStream2.hasNext();
});
assertEquals(
exception2.getMessage(),
"INVALID_ARGUMENT: INTERNAL: Postgres user must be superuser or replication role to start walsender.");
}

// generates test cases for the risingwave debezium parser
@Ignore
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,40 @@ protected static DataSource getDataSource(JdbcDatabaseContainer<?> container) {
return new HikariDataSource(hikariConfig);
}

protected Iterator<ConnectorServiceProto.GetEventStreamResponse> getEventStreamValidate(
JdbcDatabaseContainer<?> container,
ConnectorServiceProto.SourceType sourceType,
ConnectorServiceProto.TableSchema tableSchema,
String databaseName,
String tableName) {
String port = String.valueOf(URI.create(container.getJdbcUrl().substring(5)).getPort());
ConnectorServiceProto.GetEventStreamRequest req =
ConnectorServiceProto.GetEventStreamRequest.newBuilder()
.setValidate(
ConnectorServiceProto.GetEventStreamRequest.ValidateProperties
.newBuilder()
.setSourceId(0)
.setSourceType(sourceType)
.setTableSchema(tableSchema)
.putProperties("hostname", container.getHost())
.putProperties("port", port)
.putProperties("username", container.getUsername())
.putProperties("password", container.getPassword())
.putProperties("database.name", databaseName)
.putProperties("table.name", tableName)
.putProperties("schema.name", "public") // pg only
.putProperties("slot.name", "orders") // pg only
.putProperties("server.id", "1")) // mysql only
.build();
Iterator<ConnectorServiceProto.GetEventStreamResponse> responses = null;
try {
responses = blockingStub.getEventStream(req);
} catch (StatusRuntimeException e) {
fail("RPC failed: {}", e.getStatus());
}
return responses;
}

protected Iterator<ConnectorServiceProto.GetEventStreamResponse> getEventStreamStart(
JdbcDatabaseContainer<?> container,
ConnectorServiceProto.SourceType sourceType,
Expand Down

0 comments on commit b474059

Please sign in to comment.