diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index 0614ba8a9f741..61be2769f3b24 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -296,6 +296,7 @@ public void assertNoTopicStatusInStatusTopic() { } } } + verifiableConsumer.close(); } private Map defaultSourceConnectorProps(String topic) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 4eac236810a80..b793cbf020911 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -655,8 +655,9 @@ public void testTasksFailOnInabilityToFence() throws Exception { startConnect(); String topic = "test-topic"; - Admin admin = connect.kafka().createAdminClient(); - admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get(); + try (Admin admin = connect.kafka().createAdminClient()) { + admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get(); + } Map props = new HashMap<>(); int tasksMax = 2; // Use two tasks since single-task connectors don't require zombie fencing @@ -680,16 +681,18 @@ public void testTasksFailOnInabilityToFence() throws Exception { + "password=\"connector_pwd\";"); // Grant the connector's admin permissions to access the topics for its records and offsets // Intentionally leave out permissions required for fencing - admin.createAcls(Arrays.asList( - new AclBinding( - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ), - new AclBinding( - new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ) - )).all().get(); + try (Admin admin = connect.kafka().createAdminClient()) { + admin.createAcls(Arrays.asList( + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ), + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ) + )).all().get(); + } StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax); @@ -707,16 +710,18 @@ public void testTasksFailOnInabilityToFence() throws Exception { connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, tasksMax, "Task should have failed on startup"); // Now grant the necessary permissions for fencing to the connector's admin - admin.createAcls(Arrays.asList( - new AclBinding( - new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ), - new AclBinding( - new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ) - )); + try (Admin admin = connect.kafka().createAdminClient()) { + admin.createAcls(Arrays.asList( + new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ), + new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ) + )); + } log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time"); connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 0de444be2025e..6a34fad6e9383 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; @@ -97,6 +98,7 @@ import org.mockito.MockedConstruction; import org.mockito.Mockito; import org.mockito.MockitoSession; +import org.mockito.invocation.InvocationOnMock; import org.mockito.quality.Strictness; import javax.management.MBeanServer; @@ -162,7 +164,6 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstructionWithAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -235,6 +236,8 @@ public class WorkerTest { private final boolean enableTopicCreation; private MockedConstruction sourceTaskMockedConstruction; + private MockedConstruction eosSourceTaskMockedConstruction; + private MockedConstruction sinkTaskMockedConstruction; private MockitoSession mockitoSession; @ParameterizedTest.Parameters @@ -293,20 +296,18 @@ public void setup() { connectorProps = anyConnectorConfigMap(); // Make calls to new WorkerSourceTask() return a mock to avoid the source task trying to connect to a broker. - sourceTaskMockedConstruction = mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> { - - // provide implementations of three methods used during testing - switch (invocation.getMethod().getName()) { - case "id": - return TASK_ID; - case "loader": - return pluginLoader; - case "awaitStop": - return true; - default: - return null; - } - }); + sourceTaskMockedConstruction = Mockito.mockConstruction( + WorkerSourceTask.class, + context -> Mockito.withSettings().defaultAnswer(this::workerTaskMethod), + WorkerTest::workerTaskConstructor); + eosSourceTaskMockedConstruction = Mockito.mockConstruction( + ExactlyOnceWorkerSourceTask.class, + context -> Mockito.withSettings().defaultAnswer(this::workerTaskMethod), + WorkerTest::workerTaskConstructor); + sinkTaskMockedConstruction = Mockito.mockConstruction( + WorkerSinkTask.class, + context -> Mockito.withSettings().defaultAnswer(this::workerTaskMethod), + WorkerTest::workerTaskConstructor); } @After @@ -315,6 +316,8 @@ public void teardown() { // Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of // indentation of most test bodies, hence sticking with setup() / teardown() sourceTaskMockedConstruction.close(); + eosSourceTaskMockedConstruction.close(); + sinkTaskMockedConstruction.close(); mockitoSession.finishMocking(); } @@ -1309,48 +1312,60 @@ public void testOffsetStoreForRegularSourceConnector() { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config, we should only use the worker-global offsets store ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific offsets store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific offsets store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config, even with an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should still only use the worker-global offsets store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1384,48 +1399,60 @@ public void testOffsetStoreForExactlyOnceSourceConnector() { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config, we should only use a connector-specific offsets store ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1467,14 +1494,17 @@ public void testOffsetStoreForRegularSourceTask() { // With no connector-specific offsets topic in the config, we should only use the worker-global store // Pass in a null topic admin to make sure that with these parameters, the method doesn't require a topic admin ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithoutOffsetsTopic, sourceConnector.getClass(), producer, producerProps, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); final SourceConnectorConfig sourceConfigWithOffsetsTopic = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithOffsetsTopic, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows(NullPointerException.class, @@ -1482,12 +1512,14 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithOffsetsTopic, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); final SourceConnectorConfig sourceConfigWithSameOffsetsTopicAsWorker = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows( @@ -1496,11 +1528,13 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows( @@ -1509,11 +1543,13 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows( @@ -1522,14 +1558,17 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); // With no connector-specific offsets topic in the config and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should still only use the worker-global store // Pass in a null topic admin to make sure that with these parameters, the method doesn't require a topic admin connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithoutOffsetsTopic, sourceConnector.getClass(), producer, producerProps, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1569,48 +1608,60 @@ public void testOffsetStoreForExactlyOnceSourceTask() { producerProps.put(BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); // With no connector-specific offsets topic in the config, we should only use a connector-specific offsets store ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1703,6 +1754,8 @@ public void testExecutorServiceShutdownWhenTerminationThrowsException() throws I assertEquals(Collections.emptySet(), worker.connectorNames()); worker.stop(); + // Clear the interrupted status so that the test infrastructure doesn't hit an unexpected interrupt. + assertTrue(Thread.interrupted()); verifyKafkaClusterId(); verify(executorService, times(1)).shutdown(); verify(executorService, times(1)).shutdownNow(); @@ -2643,6 +2696,39 @@ private Map anyConnectorConfigMap() { return props; } + /** + * This method is called in place of the constructor of WorkerTask subclasses. + * All AutoClosable objects (producers, consumers, admin clients, etc.) are closed, as their lifetimes + * are managed by the WorkerTask. While the worker task is mocked, it cannot manage the lifetimes itself. + */ + private static void workerTaskConstructor(WorkerTask mock, MockedConstruction.Context context) { + for (Object argument : context.arguments()) { + if (argument instanceof AutoCloseable) { + Utils.closeQuietly((AutoCloseable) argument, "worker task client"); + } + if (argument instanceof OffsetBackingStore) { + Utils.closeQuietly(((OffsetBackingStore) argument)::stop, "offset backing store"); + } + } + } + + /** + * This method is called in place of methods on WorkerTask subclasses. + */ + private Object workerTaskMethod(InvocationOnMock invocation) { + // provide implementations of three methods used during testing + switch (invocation.getMethod().getName()) { + case "id": + return TASK_ID; + case "loader": + return pluginLoader; + case "awaitStop": + return true; + default: + return null; + } + } + private static class TestSourceTask extends SourceTask { public TestSourceTask() { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java index ea7ee186f807a..77d70d8fd9a7b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java @@ -91,6 +91,7 @@ public void testMetrics() throws Exception { MBeanServer server = ManagementFactory.getPlatformMBeanServer(); //verify metric exists with correct prefix assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1,client-id=client-1"))); + member.stop(); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index 19b8090f69db0..e3694cae2911a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -554,42 +554,47 @@ public ConsumerRecords consumeAll( ) throws TimeoutException, InterruptedException, ExecutionException { long endTimeMs = System.currentTimeMillis() + maxDurationMs; - Consumer consumer = createConsumer(consumerProps != null ? consumerProps : Collections.emptyMap()); - Admin admin = createAdminClient(adminProps != null ? adminProps : Collections.emptyMap()); + long remainingTimeMs; + Set topicPartitions; + Map endOffsets; + try (Admin admin = createAdminClient(adminProps != null ? adminProps : Collections.emptyMap())) { - long remainingTimeMs = endTimeMs - System.currentTimeMillis(); - Set topicPartitions = listPartitions(remainingTimeMs, admin, Arrays.asList(topics)); + remainingTimeMs = endTimeMs - System.currentTimeMillis(); + topicPartitions = listPartitions(remainingTimeMs, admin, Arrays.asList(topics)); - remainingTimeMs = endTimeMs - System.currentTimeMillis(); - Map endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions); + remainingTimeMs = endTimeMs - System.currentTimeMillis(); + endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions); + } Map>> records = topicPartitions.stream() .collect(Collectors.toMap( Function.identity(), tp -> new ArrayList<>() )); - consumer.assign(topicPartitions); - - while (!endOffsets.isEmpty()) { - Iterator> it = endOffsets.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry entry = it.next(); - TopicPartition topicPartition = entry.getKey(); - long endOffset = entry.getValue(); - long lastConsumedOffset = consumer.position(topicPartition); - if (lastConsumedOffset >= endOffset) { - // We've reached the end offset for the topic partition; can stop polling it now - it.remove(); - } else { - remainingTimeMs = endTimeMs - System.currentTimeMillis(); - if (remainingTimeMs <= 0) { - throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms"); + try (Consumer consumer = createConsumer(consumerProps != null ? consumerProps : Collections.emptyMap())) { + consumer.assign(topicPartitions); + + while (!endOffsets.isEmpty()) { + Iterator> it = endOffsets.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + TopicPartition topicPartition = entry.getKey(); + long endOffset = entry.getValue(); + long lastConsumedOffset = consumer.position(topicPartition); + if (lastConsumedOffset >= endOffset) { + // We've reached the end offset for the topic partition; can stop polling it now + it.remove(); + } else { + remainingTimeMs = endTimeMs - System.currentTimeMillis(); + if (remainingTimeMs <= 0) { + throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms"); + } + // We haven't reached the end offset yet; need to keep polling + ConsumerRecords recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs)); + recordBatch.partitions().forEach(tp -> records.get(tp) + .addAll(recordBatch.records(tp)) + ); } - // We haven't reached the end offset yet; need to keep polling - ConsumerRecords recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs)); - recordBatch.partitions().forEach(tp -> records.get(tp) - .addAll(recordBatch.records(tp)) - ); } } }