Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ public void assertNoTopicStatusInStatusTopic() {
}
}
}
verifiableConsumer.close();
}

private Map<String, String> defaultSourceConnectorProps(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,9 @@ public void testTasksFailOnInabilityToFence() throws Exception {
startConnect();

String topic = "test-topic";
Admin admin = connect.kafka().createAdminClient();
admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get();
try (Admin admin = connect.kafka().createAdminClient()) {
admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get();
}

Map<String, String> props = new HashMap<>();
int tasksMax = 2; // Use two tasks since single-task connectors don't require zombie fencing
Expand All @@ -680,16 +681,18 @@ public void testTasksFailOnInabilityToFence() throws Exception {
+ "password=\"connector_pwd\";");
// Grant the connector's admin permissions to access the topics for its records and offsets
// Intentionally leave out permissions required for fencing
admin.createAcls(Arrays.asList(
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
),
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
)
)).all().get();
try (Admin admin = connect.kafka().createAdminClient()) {
admin.createAcls(Arrays.asList(
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
),
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
)
)).all().get();
}

StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);

Expand All @@ -707,16 +710,18 @@ public void testTasksFailOnInabilityToFence() throws Exception {
connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, tasksMax, "Task should have failed on startup");

// Now grant the necessary permissions for fencing to the connector's admin
admin.createAcls(Arrays.asList(
new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
),
new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
)
));
try (Admin admin = connect.kafka().createAdminClient()) {
admin.createAcls(Arrays.asList(
new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
),
new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
)
));
}

log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time");
connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false);
Expand Down
Loading