diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index 837891071a49a..d89f577688f35 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -196,6 +196,7 @@ protected abstract void producerSendFailed( List toSend; protected Map taskConfig; protected boolean started = false; + private volatile boolean producerClosed = false; protected AbstractWorkerSourceTask(ConnectorTaskId id, SourceTask task, @@ -315,6 +316,7 @@ protected void close() { private void closeProducer(Duration duration) { if (producer != null) { + producerClosed = true; Utils.closeQuietly(() -> producer.close(duration), "source task producer"); } } @@ -397,7 +399,11 @@ boolean sendRecords() { producerRecord, (recordMetadata, e) -> { if (e != null) { - log.debug("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e); + if (producerClosed) { + log.trace("{} failed to send record to {}; this is expected as the producer has already been closed", AbstractWorkerSourceTask.this, topic, e); + } else { + log.error("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e); + } log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord); producerSendFailed(false, producerRecord, preTransformRecord, e); } else { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 1a63cc1532d4a..37d93a3fe8685 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -174,8 +174,6 @@ protected void producerSendFailed( ); commitTaskRecord(preTransformRecord, null); } else { - log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); - log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); producerSendException.compareAndSet(null, e); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 91dd4800c80ac..f8442351b8ec0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -340,6 +340,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector, @PUT @Path("/{connector}/fence") + @Operation(hidden = true, summary = "This operation is only for inter-worker communications") public void fenceZombies(final @PathParam("connector") String connector, final @Context HttpHeaders headers, final @QueryParam("forward") Boolean forward, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java index b31455b248483..bed05fa21e41c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java @@ -112,6 +112,14 @@ public void deleteTask(String taskId) { taskHandles.remove(taskId); } + /** + * Delete all task handles for this connector. + */ + public void clearTasks() { + log.info("Clearing {} existing task handles for connector {}", taskHandles.size(), connectorName); + taskHandles.clear(); + } + /** * Set the number of expected records for this connector. * diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java new file mode 100644 index 0000000000000..25a419ed8f8f3 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -0,0 +1,1130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.storage.StringConverter; +import 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG; +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG; +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG; +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG; +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.OFFSETS_TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_INTERVAL_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG; +import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR; +import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL; +import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +@Category(IntegrationTest.class) +public class ExactlyOnceSourceIntegrationTest { + + private static final Logger log = LoggerFactory.getLogger(ExactlyOnceSourceIntegrationTest.class); + private static 
final String CLUSTER_GROUP_ID = "exactly-once-source-integration-test"; + private static final String CONNECTOR_NAME = "exactlyOnceQuestionMark"; + + private static final int CONSUME_RECORDS_TIMEOUT_MS = 60_000; + private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30_000; + private static final int DEFAULT_NUM_WORKERS = 3; + + private Properties brokerProps; + private Map workerProps; + private EmbeddedConnectCluster.Builder connectBuilder; + private EmbeddedConnectCluster connect; + private ConnectorHandle connectorHandle; + + @Before + public void setup() { + workerProps = new HashMap<>(); + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + workerProps.put(DistributedConfig.GROUP_ID_CONFIG, CLUSTER_GROUP_ID); + + brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + + // build a Connect cluster backed by Kafka and Zk + connectBuilder = new EmbeddedConnectCluster.Builder() + .numWorkers(DEFAULT_NUM_WORKERS) + .numBrokers(1) + .workerProps(workerProps) + .brokerProps(brokerProps); + + // get a handle to the connector + connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); + } + + private void startConnect() { + connect = connectBuilder.build(); + connect.start(); + } + + @After + public void close() { + try { + // stop all Connect, Kafka and Zk threads. + connect.stop(); + } finally { + // Clear the handle for the connector. Fun fact: if you don't do this, your tests become quite flaky. + RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); + } + } + + /** + * A simple test for the pre-flight validation API for connectors to provide their own delivery guarantees. + */ + @Test + public void testPreflightValidation() { + connectBuilder.numWorkers(1); + startConnect(); + + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPIC_CONFIG, "topic"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + + // Test out the "exactly.once.support" property + props.put(EXACTLY_ONCE_SUPPORT_CONFIG, "required"); + + // Connector will return null from SourceConnector::exactlyOnceSupport + props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_NULL); + ConfigInfos validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + ConfigInfo propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation); + assertFalse("Preflight validation for exactly-once support property should have at least one error message", + propertyValidation.configValue().errors().isEmpty()); + + // Connector will return UNSUPPORTED from SourceConnector::exactlyOnceSupport + props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_UNSUPPORTED); + validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation); + assertFalse("Preflight validation for exactly-once support property should have at least one error message", + 
propertyValidation.configValue().errors().isEmpty()); + + // Connector will throw an exception from SourceConnector::exactlyOnceSupport + props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_FAIL); + validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation); + assertFalse("Preflight validation for exactly-once support property should have at least one error message", + propertyValidation.configValue().errors().isEmpty()); + + // Connector will return SUPPORTED from SourceConnector::exactlyOnceSupport + props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_SUPPORTED); + validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have zero errors", 0, validation.errorCount()); + + // Test out the transaction boundary definition property + props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); + + // Connector will return null from SourceConnector::canDefineTransactionBoundaries + props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_NULL); + validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation); + assertFalse("Preflight validation for transaction boundary property should have at least one error message", + propertyValidation.configValue().errors().isEmpty()); + + // Connector will return UNSUPPORTED from SourceConnector::canDefineTransactionBoundaries + props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_UNSUPPORTED); + validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation); + assertFalse("Preflight validation for transaction boundary property should have at least one error message", + propertyValidation.configValue().errors().isEmpty()); + + // Connector will throw an exception from SourceConnector::canDefineTransactionBoundaries + props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_FAIL); + validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation); + assertFalse("Preflight validation for transaction boundary property should have at least one error message", + propertyValidation.configValue().errors().isEmpty()); + + // Connector will return SUPPORTED from SourceConnector::canDefineTransactionBoundaries + props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED); + validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); + assertEquals("Preflight validation should have zero errors", 0, validation.errorCount()); + } + + /** + * A 
simple green-path test that ensures the worker can start up a source task with exactly-once support enabled + * and write some records to Kafka that will be visible to a downstream consumer using the "READ_COMMITTED" + * isolation level. The "poll" transaction boundary is used. + */ + @Test + public void testPollBoundary() throws Exception { + // Much slower offset commit interval; should never be triggered during this test + workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000"); + connectBuilder.numWorkers(1); + startConnect(); + + String topic = "test-topic"; + connect.kafka().createTopic(topic, 3); + + int numTasks = 1; + int recordsProduced = 100; + + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString()); + props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced)); + + // expect all records to be consumed and committed by the connector + connectorHandle.expectedRecords(recordsProduced); + connectorHandle.expectedCommits(recordsProduced); + + // start a source connector + connect.configureConnector(CONNECTOR_NAME, props); + + log.info("Waiting for records to be provided to worker by task"); + // wait for the connector tasks to produce enough records + connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS); + + log.info("Waiting for records to be committed to Kafka by worker"); + // wait for the connector tasks to commit enough records + connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1)); + + StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true); + connect.deleteConnector(CONNECTOR_NAME); + assertConnectorStopped(connectorStop); + + // consume all records from the source topic or fail, to ensure that they were correctly produced + ConsumerRecords records = connect.kafka().consumeAll( + CONSUME_RECORDS_TIMEOUT_MS, + Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + null, + topic + ); + assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(), + records.count() >= recordsProduced); + assertExactlyOnceSeqnos(records, numTasks); + } + + /** + * A simple green-path test that ensures the worker can start up a source task with exactly-once support enabled + * and write some records to Kafka that will be visible to a downstream consumer using the "READ_COMMITTED" + * isolation level. The "interval" transaction boundary is used with a connector-specific override. 
+ */ + @Test + public void testIntervalBoundary() throws Exception { + // Much slower offset commit interval; should never be triggered during this test + workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000"); + connectBuilder.numWorkers(1); + startConnect(); + + String topic = "test-topic"; + connect.kafka().createTopic(topic, 3); + + int numTasks = 1; + int recordsProduced = 100; + + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString()); + props.put(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, "10000"); + props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced)); + + // expect all records to be consumed and committed by the connector + connectorHandle.expectedRecords(recordsProduced); + connectorHandle.expectedCommits(recordsProduced); + + // start a source connector + connect.configureConnector(CONNECTOR_NAME, props); + + log.info("Waiting for records to be provided to worker by task"); + // wait for the connector tasks to produce enough records + connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS); + + log.info("Waiting for records to be committed to Kafka by worker"); + // wait for the connector tasks to commit enough records + connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1)); + + StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true); + connect.deleteConnector(CONNECTOR_NAME); + assertConnectorStopped(connectorStop); + + // consume all records from the source topic or fail, to ensure that they were correctly produced + ConsumerRecords records = connect.kafka().consumeAll( + CONSUME_RECORDS_TIMEOUT_MS, + Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + null, + topic + ); + assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(), + records.count() >= recordsProduced); + assertExactlyOnceSeqnos(records, numTasks); + } + + /** + * A simple green-path test that ensures the worker can start up a source task with exactly-once support enabled + * and write some records to Kafka that will be visible to a downstream consumer using the "READ_COMMITTED" + * isolation level. The "connector" transaction boundary is used with a connector that defines transactions whose + * size correspond to successive elements of the Fibonacci sequence, where transactions with an even number of + * records are aborted, and those with an odd number of records are committed. 
+ */ + @Test + public void testConnectorBoundary() throws Exception { + String offsetsTopic = "exactly-once-source-cluster-offsets"; + workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetsTopic); + connectBuilder.numWorkers(1); + startConnect(); + + String topic = "test-topic"; + connect.kafka().createTopic(topic, 3); + + int recordsProduced = 100; + + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); + props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED); + props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced)); + + // expect all records to be consumed and committed by the connector + connectorHandle.expectedRecords(recordsProduced); + connectorHandle.expectedCommits(recordsProduced); + + // start a source connector + connect.configureConnector(CONNECTOR_NAME, props); + + log.info("Waiting for records to be provided to worker by task"); + // wait for the connector tasks to produce enough records + connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS); + + log.info("Waiting for records to be committed to Kafka by worker"); + // wait for the connector tasks to commit enough records + connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1)); + + Map consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + // consume all records from the source topic or fail, to ensure that they were correctly produced + ConsumerRecords sourceRecords = connect.kafka() + .consume( + recordsProduced, + TimeUnit.MINUTES.toMillis(1), + consumerProps, + "test-topic"); + assertTrue("Not enough records produced by source connector. 
Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(), + sourceRecords.count() >= recordsProduced); + + // also consume from the cluster's offsets topic to verify that the expected offsets (which should correspond to the connector's + // custom transaction boundaries) were committed + List expectedOffsetSeqnos = new ArrayList<>(); + long lastExpectedOffsetSeqno = 1; + long nextExpectedOffsetSeqno = 1; + while (nextExpectedOffsetSeqno <= recordsProduced) { + expectedOffsetSeqnos.add(nextExpectedOffsetSeqno); + nextExpectedOffsetSeqno += lastExpectedOffsetSeqno; + lastExpectedOffsetSeqno = nextExpectedOffsetSeqno - lastExpectedOffsetSeqno; + } + ConsumerRecords offsetRecords = connect.kafka() + .consume( + expectedOffsetSeqnos.size(), + TimeUnit.MINUTES.toMillis(1), + consumerProps, + offsetsTopic + ); + + List actualOffsetSeqnos = new ArrayList<>(); + offsetRecords.forEach(record -> actualOffsetSeqnos.add(parseAndAssertOffsetForSingleTask(record))); + + assertEquals("Committed offsets should match connector-defined transaction boundaries", + expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size())); + + List expectedRecordSeqnos = LongStream.range(1, recordsProduced + 1).boxed().collect(Collectors.toList()); + long priorBoundary = 1; + long nextBoundary = 2; + while (priorBoundary < expectedRecordSeqnos.get(expectedRecordSeqnos.size() - 1)) { + if (nextBoundary % 2 == 0) { + for (long i = priorBoundary + 1; i < nextBoundary + 1; i++) { + expectedRecordSeqnos.remove(i); + } + } + nextBoundary += priorBoundary; + priorBoundary = nextBoundary - priorBoundary; + } + List actualRecordSeqnos = parseAndAssertValuesForSingleTask(sourceRecords); + // Have to sort the records by seqno since we produce to multiple partitions and in-order consumption isn't guaranteed + Collections.sort(actualRecordSeqnos); + assertEquals("Committed records should exclude connector-aborted transactions", + expectedRecordSeqnos, actualRecordSeqnos.subList(0, expectedRecordSeqnos.size())); + } + + /** + * Brings up a one-node cluster, then intentionally fences out the transactional producer used by the leader + * for writes to the config topic to simulate a zombie leader being active in the cluster. The leader should + * automatically recover, verify that it is still the leader, and then succeed to create a connector when the + * user resends the request. 
+ */ + @Test + public void testFencedLeaderRecovery() throws Exception { + // Much slower offset commit interval; should never be triggered during this test + workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000"); + startConnect(); + + String topic = "test-topic"; + connect.kafka().createTopic(topic, 3); + + int numTasks = 1; + int recordsProduced = 100; + + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString()); + props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced)); + + // expect all records to be consumed and committed by the connector + connectorHandle.expectedRecords(recordsProduced); + connectorHandle.expectedCommits(recordsProduced); + + // make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around) + assertEquals(404, connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus()); + + // fence out the leader of the cluster + Producer zombieLeader = transactionalProducer(DistributedConfig.transactionalProducerId(CLUSTER_GROUP_ID)); + zombieLeader.initTransactions(); + zombieLeader.close(); + + // start a source connector--should fail the first time + assertThrows(ConnectRestException.class, () -> connect.configureConnector(CONNECTOR_NAME, props)); + + // the second request should succeed because the leader has reclaimed write privileges for the config topic + connect.configureConnector(CONNECTOR_NAME, props); + + log.info("Waiting for records to be provided to worker by task"); + // wait for the connector tasks to produce enough records + connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS); + + log.info("Waiting for records to be committed to Kafka by worker"); + // wait for the connector tasks to commit enough records + connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1)); + + StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true); + connect.deleteConnector(CONNECTOR_NAME); + assertConnectorStopped(connectorStop); + + // consume all records from the source topic or fail, to ensure that they were correctly produced + ConsumerRecords records = connect.kafka().consumeAll( + CONSUME_RECORDS_TIMEOUT_MS, + Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + null, + topic + ); + assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(), + records.count() >= recordsProduced); + assertExactlyOnceSeqnos(records, numTasks); + } + + /** + * A moderately-complex green-path test that ensures the worker can start up and run tasks for a source + * connector that gets reconfigured, and will fence out potential zombie tasks for older generations before + * bringing up new task instances. 
+ */ + @Test + public void testConnectorReconfiguration() throws Exception { + // Much slower offset commit interval; should never be triggered during this test + workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000"); + startConnect(); + + String topic = "test-topic"; + connect.kafka().createTopic(topic, 3); + + int recordsProduced = 100; + + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced)); + + // expect all records to be consumed and committed by the connector + connectorHandle.expectedRecords(recordsProduced); + connectorHandle.expectedCommits(recordsProduced); + + StartAndStopLatch connectorStart = connectorAndTaskStart(3); + props.put(TASKS_MAX_CONFIG, "3"); + // start a source connector + connect.configureConnector(CONNECTOR_NAME, props); + assertConnectorStarted(connectorStart); + + assertProducersAreFencedOnReconfiguration(3, 5, topic, props); + assertProducersAreFencedOnReconfiguration(5, 1, topic, props); + assertProducersAreFencedOnReconfiguration(1, 5, topic, props); + assertProducersAreFencedOnReconfiguration(5, 3, topic, props); + + // Do a final sanity check to make sure that the last generation of tasks is able to run + log.info("Waiting for records to be provided to worker by task"); + // wait for the connector tasks to produce enough records + connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS); + + log.info("Waiting for records to be committed to Kafka by worker"); + // wait for the connector tasks to commit enough records + connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1)); + + StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true); + connect.deleteConnector(CONNECTOR_NAME); + assertConnectorStopped(connectorStop); + + // consume all records from the source topic or fail, to ensure that they were correctly produced + ConsumerRecords records = connect.kafka().consumeAll( + CONSUME_RECORDS_TIMEOUT_MS, + Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + null, + topic + ); + assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(), + records.count() >= recordsProduced); + // We used at most five tasks during the tests; each of them should have been able to produce records + assertExactlyOnceSeqnos(records, 5); + } + + /** + * This test ensures that tasks are marked failed in the status API when the round of + * zombie fencing that takes place before they are brought up fails. In addition, once + * the issue with the connector config that made fencing impossible is rectified, tasks + * can be successfully restarted. + *

+ * <p>Fencing failures are induced by bringing up an ACL-secured Kafka cluster and creating + * a connector whose principal is not authorized to access the transactional IDs that Connect + * uses for its tasks.
+ * <p>When the connector is initially brought up, no fencing is necessary. However, once it is + * reconfigured and generates new task configs, a round of zombie fencing is triggered, + * and all of its tasks fail when that round of zombie fencing fails.
+ * <p>
+ * After, the connector's principal is granted access to the necessary transactional IDs, + * all of its tasks are restarted, and we verify that they are able to come up successfully + * this time. + */ + @Test + public void testTasksFailOnInabilityToFence() throws Exception { + brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer"); + brokerProps.put("sasl.enabled.mechanisms", "PLAIN"); + brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN"); + brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); + brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0"); + brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"super\" " + + "password=\"super_pwd\" " + + "user_connector=\"connector_pwd\" " + + "user_super=\"super_pwd\";"); + brokerProps.put("super.users", "User:super"); + + Map superUserClientConfig = new HashMap<>(); + superUserClientConfig.put("sasl.mechanism", "PLAIN"); + superUserClientConfig.put("security.protocol", "SASL_PLAINTEXT"); + superUserClientConfig.put("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"super\" " + + "password=\"super_pwd\";"); + // Give the worker super-user privileges + workerProps.putAll(superUserClientConfig); + + final String globalOffsetsTopic = "connect-worker-offsets-topic"; + workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic); + + startConnect(); + + String topic = "test-topic"; + Admin admin = connect.kafka().createAdminClient(Utils.mkProperties(superUserClientConfig)); + admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get(); + + Map props = new HashMap<>(); + int tasksMax = 2; // Use two tasks since single-task connectors don't require zombie fencing + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(TASKS_MAX_CONFIG, Integer.toString(tasksMax)); + // Give the connectors' consumer and producer super-user privileges + superUserClientConfig.forEach((property, value) -> { + props.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + property, value); + props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + property, value); + }); + // But limit its admin client's privileges + props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + "sasl.mechanism", "PLAIN"); + props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + "security.protocol", "SASL_PLAINTEXT"); + props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + "sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"connector\" " + + "password=\"connector_pwd\";"); + // Grant the connector's admin permissions to access the topics for its records and offsets + // Intentionally leave out permissions required for fencing + admin.createAcls(Arrays.asList( + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ), + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ) 
+ )).all().get(); + + StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax); + + log.info("Bringing up connector with fresh slate; fencing should not be necessary"); + connect.configureConnector(CONNECTOR_NAME, props); + assertConnectorStarted(connectorStart); + // Verify that the connector and its tasks have been able to start successfully + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have started successfully"); + + log.info("Reconfiguring connector; fencing should be necessary, and tasks should fail to start"); + props.put("message.in.a.bottle", "19e184427ac45bd34c8588a4e771aa1a"); + connect.configureConnector(CONNECTOR_NAME, props); + + // Verify that the task has failed, and that the failure is visible to users via the REST API + connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, tasksMax, "Task should have failed on startup"); + + // Now grant the necessary permissions for fencing to the connector's admin + admin.createAcls(Arrays.asList( + new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ), + new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ) + )); + + log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time"); + connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false); + // Verify that the connector and its tasks have been able to restart successfully + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have restarted successfully"); + } + + @Test + public void testSeparateOffsetsTopic() throws Exception { + final String globalOffsetsTopic = "connect-worker-offsets-topic"; + workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic); + + startConnect(); + EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(1, brokerProps); + try (Closeable clusterShutdown = connectorTargetedCluster::stop) { + connectorTargetedCluster.start(); + String topic = "test-topic"; + connectorTargetedCluster.createTopic(topic, 3); + + int numTasks = 1; + int recordsProduced = 100; + + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName()); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); + props.put(TOPIC_CONFIG, topic); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString()); + props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced)); + props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers()); + props.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers()); + props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers()); + String offsetsTopic = CONNECTOR_NAME + "-offsets"; + 
props.put(OFFSETS_TOPIC_CONFIG, offsetsTopic); + + // expect all records to be consumed and committed by the connector + connectorHandle.expectedRecords(recordsProduced); + connectorHandle.expectedCommits(recordsProduced); + + // start a source connector + connect.configureConnector(CONNECTOR_NAME, props); + + log.info("Waiting for records to be provided to worker by task"); + // wait for the connector tasks to produce enough records + connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS); + + log.info("Waiting for records to be committed to Kafka by worker"); + // wait for the connector tasks to commit enough records + connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1)); + + // consume all records from the source topic or fail, to ensure that they were correctly produced + int recordNum = connectorTargetedCluster + .consume( + recordsProduced, + TimeUnit.MINUTES.toMillis(1), + Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + "test-topic") + .count(); + assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + recordNum, + recordNum >= recordsProduced); + + // also consume from the connector's dedicated offsets topic; just need to read one offset record + ConsumerRecord offsetRecord = connectorTargetedCluster + .consume( + 1, + TimeUnit.MINUTES.toMillis(1), + Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + offsetsTopic + ).iterator().next(); + long seqno = parseAndAssertOffsetForSingleTask(offsetRecord); + assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records", + 0, seqno % recordsProduced); + + // also consume from the cluster's global offsets topic; again, just need to read one offset record + offsetRecord = connect.kafka() + .consume( + 1, + TimeUnit.MINUTES.toMillis(1), + globalOffsetsTopic + ).iterator().next(); + seqno = parseAndAssertOffsetForSingleTask(offsetRecord); + assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records", + 0, seqno % recordsProduced); + + // Shut down the whole cluster + connect.workers().forEach(connect::removeWorker); + // Reconfigure the cluster with exactly-once support disabled + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled"); + + // Establish new expectations for records+offsets + connectorHandle.expectedRecords(recordsProduced); + connectorHandle.expectedCommits(recordsProduced); + + // Restart the whole cluster + for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) { + connect.addWorker(); + } + + // And perform a basic sanity check that the cluster is able to come back up, our connector and its task are able to resume running, + // and the task is still able to produce source records and commit offsets + connect.assertions().assertAtLeastNumWorkersAreUp(DEFAULT_NUM_WORKERS, "cluster did not restart in time"); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + 1, + "connector and tasks did not resume running after cluster restart in time" + ); + + log.info("Waiting for records to be provided to worker by task"); + // wait for the connector tasks to produce enough records + connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS); + + log.info("Waiting for records to be committed to Kafka by worker"); + // wait for the connector tasks to commit enough records + 
connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1)); + + StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true); + connect.deleteConnector(CONNECTOR_NAME); + assertConnectorStopped(connectorStop); + + // consume all records from the source topic or fail, to ensure that they were correctly produced + ConsumerRecords records = connectorTargetedCluster.consumeAll( + CONSUME_RECORDS_TIMEOUT_MS, + Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), + null, + topic + ); + assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(), + records.count() >= recordsProduced); + assertExactlyOnceSeqnos(records, numTasks); + } + } + + /** + * A simple test to ensure that source tasks fail when trying to produce to their own offsets topic. + *

+ * <p>We fail the tasks in order to prevent deadlock that occurs when: + * <ol>
+ * <li>A task provides a record whose topic is the task's offsets topic</li>
+ * <li>That record is dispatched to the task's producer in a transaction that remains open + * at least until the worker polls the task again</li>
+ * <li>In the subsequent call to SourceTask::poll, the task requests offsets from the worker + * (which requires a read to the end of the offsets topic, and will block until any open + * transactions on the topic are either committed or aborted)</li>
+ * </ol>
+ */ + @Test + public void testPotentialDeadlockWhenProducingToOffsetsTopic() throws Exception { + connectBuilder.numWorkers(1); + startConnect(); + + String topic = "test-topic"; + connect.kafka().createTopic(topic, 3); + + int recordsProduced = 100; + + Map props = new HashMap<>(); + // See below; this connector does nothing except request offsets from the worker in SourceTask::poll + // and then return a single record targeted at its offsets topic + props.put(CONNECTOR_CLASS_CONFIG, NaughtyConnector.class.getName()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(NAME_CONFIG, CONNECTOR_NAME); + props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString()); + props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced)); + props.put(OFFSETS_TOPIC_CONFIG, "whoops"); + + // start a source connector + connect.configureConnector(CONNECTOR_NAME, props); + + connect.assertions().assertConnectorIsRunningAndTasksHaveFailed( + CONNECTOR_NAME, 1, "Task should have failed after trying to produce to its own offsets topic"); + } + + private ConfigInfo findConfigInfo(String property, ConfigInfos validationResult) { + return validationResult.values().stream() + .filter(info -> property.equals(info.configKey().name())) + .findAny() + .orElseThrow(() -> new AssertionError("Failed to find configuration validation result for property '" + property + "'")); + } + + @SuppressWarnings("unchecked") + private long parseAndAssertOffsetForSingleTask(ConsumerRecord offsetRecord) { + JsonConverter offsetsConverter = new JsonConverter(); + // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use + // separate converter instances. + + offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false); + Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value(); + Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value(); + + assertNotNull("Offset value should not be null", valueObject); + + assertEquals("Serialized source partition should match expected format", + Arrays.asList(CONNECTOR_NAME, MonitorableSourceConnector.sourcePartition(MonitorableSourceConnector.taskId(CONNECTOR_NAME, 0))), + keyObject); + + Map value = assertAndCast(valueObject, Map.class, "Value"); + + Object seqnoObject = value.get("saved"); + assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject); + return assertAndCast(seqnoObject, Long.class, "Seqno offset field"); + } + + private List parseAndAssertValuesForSingleTask(ConsumerRecords sourceRecords) { + Map> parsedValues = parseValuesForTasks(sourceRecords); + assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedValues.keySet()); + return parsedValues.get(0); + } + + private void assertExactlyOnceSeqnos(ConsumerRecords sourceRecords, int numTasks) { + Map> parsedValues = parseValuesForTasks(sourceRecords); + Set expectedKeys = IntStream.range(0, numTasks).boxed().collect(Collectors.toSet()); + assertEquals("Expected records to be produced by each task", expectedKeys, parsedValues.keySet()); + + parsedValues.forEach((taskId, seqnos) -> { + // We don't check for order here because the records may have been produced to multiple topic partitions, + // which makes in-order consumption impossible + Set expectedSeqnos = LongStream.range(1, seqnos.size() 
+ 1).boxed().collect(Collectors.toSet()); + Set actualSeqnos = new HashSet<>(seqnos); + assertEquals( + "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record", + expectedSeqnos, + actualSeqnos + ); + }); + } + + private Map> parseValuesForTasks(ConsumerRecords sourceRecords) { + Map> result = new HashMap<>(); + for (ConsumerRecord sourceRecord : sourceRecords) { + assertNotNull("Record key should not be null", sourceRecord.key()); + assertNotNull("Record value should not be null", sourceRecord.value()); + + String key = new String(sourceRecord.key()); + String value = new String(sourceRecord.value()); + + String keyPrefix = "key-"; + String valuePrefix = "value-"; + + assertTrue("Key should start with \"" + keyPrefix + "\"", key.startsWith(keyPrefix)); + assertTrue("Value should start with \"" + valuePrefix + "\"", value.startsWith(valuePrefix)); + assertEquals( + "key and value should be identical after prefix", + key.substring(keyPrefix.length()), + value.substring(valuePrefix.length()) + ); + + String[] split = key.substring(keyPrefix.length()).split("-"); + assertEquals("Key should match pattern 'key---", 3, split.length); + assertEquals("Key should match pattern 'key---", CONNECTOR_NAME, split[0]); + + int taskId; + try { + taskId = Integer.parseInt(split[1], 10); + } catch (NumberFormatException e) { + throw new AssertionError("Task ID in key should be an integer, was '" + split[1] + "'", e); + } + + long seqno; + try { + seqno = Long.parseLong(split[2], 10); + } catch (NumberFormatException e) { + throw new AssertionError("Seqno in key should be a long, was '" + split[2] + "'", e); + } + + result.computeIfAbsent(taskId, t -> new ArrayList<>()).add(seqno); + } + return result; + } + + @SuppressWarnings("unchecked") + private static T assertAndCast(Object o, Class klass, String objectDescription) { + String className = o == null ? "null" : o.getClass().getName(); + assertTrue(objectDescription + " should be " + klass.getName() + "; was " + className + " instead", klass.isInstance(o)); + return (T) o; + } + + /** + * Clear all existing task handles for the connector, then preemptively create {@code numTasks} many task handles for it, + * and return a {@link StartAndStopLatch} that can be used to {@link StartAndStopLatch#await(long, TimeUnit) await} + * the startup of that connector and the expected number of tasks. 
+ * @param numTasks the number of tasks that should be started + * @return a {@link StartAndStopLatch} that will block until the connector and the expected number of tasks have started + */ + private StartAndStopLatch connectorAndTaskStart(int numTasks) { + connectorHandle.clearTasks(); + IntStream.range(0, numTasks) + .mapToObj(i -> MonitorableSourceConnector.taskId(CONNECTOR_NAME, i)) + .forEach(connectorHandle::taskHandle); + return connectorHandle.expectedStarts(1, true); + } + + private void assertConnectorStarted(StartAndStopLatch connectorStart) throws InterruptedException { + assertTrue("Connector and tasks did not finish startup in time", + connectorStart.await( + EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS, + TimeUnit.MILLISECONDS + ) + ); + } + + private void assertConnectorStopped(StartAndStopLatch connectorStop) throws InterruptedException { + assertTrue( + "Connector and tasks did not finish shutdown in time", + connectorStop.await( + EmbeddedConnectClusterAssertions.CONNECTOR_SHUTDOWN_DURATION_MS, + TimeUnit.MILLISECONDS + ) + ); + } + + private void assertProducersAreFencedOnReconfiguration( + int currentNumTasks, + int newNumTasks, + String topic, + Map baseConnectorProps) throws InterruptedException { + + // create a collection of producers that simulate the producers used for the existing tasks + List> producers = IntStream.range(0, currentNumTasks) + .mapToObj(i -> Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, i)) + .map(this::transactionalProducer) + .collect(Collectors.toList()); + + producers.forEach(KafkaProducer::initTransactions); + + // reconfigure the connector with a new number of tasks + StartAndStopLatch connectorStart = connectorAndTaskStart(newNumTasks); + baseConnectorProps.put(TASKS_MAX_CONFIG, Integer.toString(newNumTasks)); + log.info("Reconfiguring connector from {} tasks to {}", currentNumTasks, newNumTasks); + connect.configureConnector(CONNECTOR_NAME, baseConnectorProps); + assertConnectorStarted(connectorStart); + + // validate that the old producers were fenced out + producers.forEach(producer -> assertTransactionalProducerIsFenced(producer, topic)); + } + + private KafkaProducer transactionalProducer(String transactionalId) { + Map transactionalProducerProps = new HashMap<>(); + transactionalProducerProps.put(ENABLE_IDEMPOTENCE_CONFIG, true); + transactionalProducerProps.put(TRANSACTIONAL_ID_CONFIG, transactionalId); + return connect.kafka().createProducer(transactionalProducerProps); + } + + private void assertTransactionalProducerIsFenced(KafkaProducer producer, String topic) { + producer.beginTransaction(); + assertThrows("Producer should be fenced out", + ProducerFencedException.class, + () -> { + producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96})); + producer.commitTransaction(); + } + ); + producer.close(Duration.ZERO); + } + + public static class NaughtyConnector extends SourceConnector { + private Map props; + + @Override + public void start(Map props) { + this.props = props; + } + + @Override + public Class taskClass() { + return NaughtyTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + return IntStream.range(0, maxTasks).mapToObj(i -> props).collect(Collectors.toList()); + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public String version() { + return "none"; + } + } + + public static class NaughtyTask extends SourceTask { + private String topic; + + @Override + public 
void start(Map props) { + if (!props.containsKey(OFFSETS_TOPIC_CONFIG)) { + throw new ConnectException("No offsets topic"); + } + this.topic = props.get(OFFSETS_TOPIC_CONFIG); + } + + @Override + public List poll() { + // Request a read to the end of the offsets topic + context.offsetStorageReader().offset(Collections.singletonMap("", null)); + // Produce a record to the offsets topic + return Collections.singletonList(new SourceRecord(null, null, topic, null, "", null, null)); + } + + @Override + public void stop() { + } + + @Override + public String version() { + return "none"; + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java index 4f13ad08a2d76..c2820315d6b82 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java @@ -20,8 +20,11 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.runtime.SampleSourceConnector; +import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.tools.ThroughputThrottler; @@ -32,6 +35,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -47,6 +51,20 @@ public class MonitorableSourceConnector extends SampleSourceConnector { private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class); public static final String TOPIC_CONFIG = "topic"; + public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll"; + + public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG = "custom.exactly.once.support"; + public static final String EXACTLY_ONCE_SUPPORTED = "supported"; + public static final String EXACTLY_ONCE_UNSUPPORTED = "unsupported"; + public static final String EXACTLY_ONCE_NULL = "null"; + public static final String EXACTLY_ONCE_FAIL = "fail"; + + public static final String CUSTOM_TRANSACTION_BOUNDARIES_CONFIG = "custom.transaction.boundaries"; + public static final String TRANSACTION_BOUNDARIES_SUPPORTED = "supported"; + public static final String TRANSACTION_BOUNDARIES_UNSUPPORTED = "unsupported"; + public static final String TRANSACTION_BOUNDARIES_NULL = "null"; + public static final String TRANSACTION_BOUNDARIES_FAIL = "fail"; + private String connectorName; private ConnectorHandle connectorHandle; private Map commonConfigs; @@ -74,7 +92,7 @@ public List> taskConfigs(int maxTasks) { for (int i = 0; i < maxTasks; i++) { Map config = new HashMap<>(commonConfigs); config.put("connector.name", connectorName); - config.put("task.id", connectorName + "-" + i); + config.put("task.id", taskId(connectorName, i)); configs.add(config); } return configs; @@ -92,18 +110,55 @@ public ConfigDef config() { return new ConfigDef(); } + @Override + public ExactlyOnceSupport exactlyOnceSupport(Map connectorConfig) { + String 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 4f13ad08a2d76..c2820315d6b82 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -20,8 +20,11 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.runtime.SampleSourceConnector;
+import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.tools.ThroughputThrottler;
@@ -32,6 +35,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -47,6 +51,20 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
     private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
     public static final String TOPIC_CONFIG = "topic";
+    public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
+
+    public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG = "custom.exactly.once.support";
+    public static final String EXACTLY_ONCE_SUPPORTED = "supported";
+    public static final String EXACTLY_ONCE_UNSUPPORTED = "unsupported";
+    public static final String EXACTLY_ONCE_NULL = "null";
+    public static final String EXACTLY_ONCE_FAIL = "fail";
+
+    public static final String CUSTOM_TRANSACTION_BOUNDARIES_CONFIG = "custom.transaction.boundaries";
+    public static final String TRANSACTION_BOUNDARIES_SUPPORTED = "supported";
+    public static final String TRANSACTION_BOUNDARIES_UNSUPPORTED = "unsupported";
+    public static final String TRANSACTION_BOUNDARIES_NULL = "null";
+    public static final String TRANSACTION_BOUNDARIES_FAIL = "fail";
+
     private String connectorName;
     private ConnectorHandle connectorHandle;
     private Map<String, String> commonConfigs;
@@ -74,7 +92,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
         for (int i = 0; i < maxTasks; i++) {
             Map<String, String> config = new HashMap<>(commonConfigs);
             config.put("connector.name", connectorName);
-            config.put("task.id", connectorName + "-" + i);
+            config.put("task.id", taskId(connectorName, i));
             configs.add(config);
         }
         return configs;
@@ -92,18 +110,55 @@ public ConfigDef config() {
         return new ConfigDef();
     }
 
+    @Override
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        String supportLevel = connectorConfig.getOrDefault(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, "null").toLowerCase(Locale.ROOT);
+        switch (supportLevel) {
+            case EXACTLY_ONCE_SUPPORTED:
+                return ExactlyOnceSupport.SUPPORTED;
+            case EXACTLY_ONCE_UNSUPPORTED:
+                return ExactlyOnceSupport.UNSUPPORTED;
+            case EXACTLY_ONCE_FAIL:
+                throw new ConnectException("oops");
+            default:
+            case EXACTLY_ONCE_NULL:
+                return null;
+        }
+    }
+
+    @Override
+    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
+        String supportLevel = connectorConfig.getOrDefault(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, TRANSACTION_BOUNDARIES_UNSUPPORTED).toLowerCase(Locale.ROOT);
+        switch (supportLevel) {
+            case TRANSACTION_BOUNDARIES_SUPPORTED:
+                return ConnectorTransactionBoundaries.SUPPORTED;
+            case TRANSACTION_BOUNDARIES_FAIL:
+                throw new ConnectException("oh no :(");
+            case TRANSACTION_BOUNDARIES_NULL:
+                return null;
+            default:
+            case TRANSACTION_BOUNDARIES_UNSUPPORTED:
+                return ConnectorTransactionBoundaries.UNSUPPORTED;
+        }
+    }
+
+    public static String taskId(String connectorName, int taskId) {
+        return connectorName + "-" + taskId;
+    }
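The two overrides above let integration tests dial the connector's declared exactly-once behavior purely through its configuration. A hedged sketch of connector properties that would exercise the "supported" paths; the topic and task count are illustrative, and this assumes the KIP-618 flow in which the worker consults exactlyOnceSupport() during preflight validation and canDefineTransactionBoundaries() when a connector config requests transaction.boundary=connector:

    Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put("connector.class", MonitorableSourceConnector.class.getName());
    connectorProps.put("tasks.max", "1"); // illustrative
    connectorProps.put(MonitorableSourceConnector.TOPIC_CONFIG, "test-topic"); // assumed topic
    connectorProps.put(MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG,
            MonitorableSourceConnector.EXACTLY_ONCE_SUPPORTED);
    connectorProps.put(MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG,
            MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED);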
+
     public static class MonitorableSourceTask extends SourceTask {
-        private String connectorName;
         private String taskId;
         private String topicName;
         private TaskHandle taskHandle;
         private volatile boolean stopped;
         private long startingSeqno;
         private long seqno;
-        private long throughput;
         private int batchSize;
         private ThroughputThrottler throttler;
+        private long priorTransactionBoundary;
+        private long nextTransactionBoundary;
+
         @Override
         public String version() {
             return "unknown";
@@ -112,21 +167,24 @@ public String version() {
         @Override
         public void start(Map<String, String> props) {
             taskId = props.get("task.id");
-            connectorName = props.get("connector.name");
+            String connectorName = props.get("connector.name");
             topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
-            throughput = Long.parseLong(props.getOrDefault("throughput", "-1"));
-            batchSize = Integer.parseInt(props.getOrDefault("messages.per.poll", "1"));
+            batchSize = Integer.parseInt(props.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
             taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
             Map<String, Object> offset = Optional.ofNullable(
                     context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
                     .orElse(Collections.emptyMap());
             startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
+            seqno = startingSeqno;
             log.info("Started {} task {} with properties {}", this.getClass().getSimpleName(), taskId, props);
-            throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+            throttler = new ThroughputThrottler(Long.parseLong(props.getOrDefault("throughput", "-1")), System.currentTimeMillis());
             taskHandle.recordTaskStart();
+            priorTransactionBoundary = 0;
+            nextTransactionBoundary = 1;
             if (Boolean.parseBoolean(props.getOrDefault("task-" + taskId + ".start.inject.error", "false"))) {
                 throw new RuntimeException("Injecting errors during task start");
             }
+            calculateNextBoundary();
         }
@@ -136,19 +194,24 @@ public List<SourceRecord> poll() {
                     throttler.throttle();
                 }
                 taskHandle.record(batchSize);
-                log.info("Returning batch of {} records", batchSize);
+                log.trace("Returning batch of {} records", batchSize);
                 return LongStream.range(0, batchSize)
-                        .mapToObj(i -> new SourceRecord(
-                                Collections.singletonMap("task.id", taskId),
-                                Collections.singletonMap("saved", ++seqno),
-                                topicName,
-                                null,
-                                Schema.STRING_SCHEMA,
-                                "key-" + taskId + "-" + seqno,
-                                Schema.STRING_SCHEMA,
-                                "value-" + taskId + "-" + seqno,
-                                null,
-                                new ConnectHeaders().addLong("header-" + seqno, seqno)))
+                        .mapToObj(i -> {
+                            seqno++;
+                            SourceRecord record = new SourceRecord(
+                                    sourcePartition(taskId),
+                                    sourceOffset(seqno),
+                                    topicName,
+                                    null,
+                                    Schema.STRING_SCHEMA,
+                                    "key-" + taskId + "-" + seqno,
+                                    Schema.STRING_SCHEMA,
+                                    "value-" + taskId + "-" + seqno,
+                                    null,
+                                    new ConnectHeaders().addLong("header-" + seqno, seqno));
+                            maybeDefineTransactionBoundary(record);
+                            return record;
+                        })
                         .collect(Collectors.toList());
             }
             return null;
@@ -172,5 +235,43 @@ public void stop() {
             stopped = true;
             taskHandle.recordTaskStop();
         }
+
+        /**
+         * Calculate the next transaction boundary, i.e., the seqno whose corresponding source record should be used to
+         * either {@link org.apache.kafka.connect.source.TransactionContext#commitTransaction(SourceRecord) commit}
+         * or {@link org.apache.kafka.connect.source.TransactionContext#abortTransaction(SourceRecord) abort} the next transaction.
+         * <p>
+         * This connector defines transactions whose sizes correspond to successive elements of the Fibonacci sequence,
+         * where transactions ending on an even-numbered seqno are aborted, and those ending on an odd-numbered seqno are committed.
+         */
+        private void calculateNextBoundary() {
+            while (nextTransactionBoundary <= seqno) {
+                nextTransactionBoundary += priorTransactionBoundary;
+                priorTransactionBoundary = nextTransactionBoundary - priorTransactionBoundary;
+            }
+        }
+
+        private void maybeDefineTransactionBoundary(SourceRecord record) {
+            if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
+                return;
+            }
+            // If the transaction boundary ends on an even-numbered offset, abort it
+            // Otherwise, commit
+            boolean abort = nextTransactionBoundary % 2 == 0;
+            calculateNextBoundary();
+            if (abort) {
+                context.transactionContext().abortTransaction(record);
+            } else {
+                context.transactionContext().commitTransaction(record);
+            }
+        }
+    }
+
+    public static Map<String, Object> sourcePartition(String taskId) {
+        return Collections.singletonMap("task.id", taskId);
+    }
+
+    public static Map<String, Object> sourceOffset(long seqno) {
+        return Collections.singletonMap("saved", seqno);
+    }
 }
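To see concretely which transactions the task above commits or aborts, here is a small self-contained sketch (not part of the patch; the class name is illustrative) that mirrors the update rule in calculateNextBoundary() and the parity check in maybeDefineTransactionBoundary():

    public class FibonacciBoundaryDemo {
        public static void main(String[] args) {
            long prior = 0;
            long next = 1;
            for (int i = 0; i < 7; i++) {
                // A transaction ends at each Fibonacci seqno; even boundaries abort, odd ones commit
                System.out.println("boundary at seqno " + next + " -> " + (next % 2 == 0 ? "abort" : "commit"));
                long seqno = next;
                while (next <= seqno) { // same update rule as calculateNextBoundary()
                    next += prior;
                    prior = next - prior;
                }
            }
            // prints boundaries 1, 2, 3, 5, 8, 13, 21 -> commit, abort, commit, commit, abort, commit, commit
        }
    }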
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index 44b12eb6e97bb..c026cb72903da 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -47,6 +47,9 @@ public class EmbeddedConnectClusterAssertions {
     public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5);
     public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
     public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2);
+    // Creating a connector requires two rounds of rebalance; destroying one only requires one
+    // Assume it'll take ~half the time to destroy a connector as it does to create one
+    public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1);
     private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
 
     private final EmbeddedConnectCluster connect;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index f1a63a4615caa..5bbbc684c2ecb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -26,14 +26,18 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
@@ -45,6 +49,7 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.metadata.BrokerState;
 import org.slf4j.Logger;
@@ -55,9 +60,11 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -66,6 +73,8 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -75,6 +84,9 @@
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.Assert.assertFalse;
 
 /**
  * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
@@ -439,9 +451,23 @@ public Admin createAdminClient() {
      * @return a {@link ConsumerRecords} collection containing at least n records.
      */
    public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, String... topics) {
+        return consume(n, maxDuration, Collections.emptyMap(), topics);
+    }
+
+    /**
+     * Consume at least n records in a given duration or throw an exception.
+     *
+     * @param n the number of expected records in this topic.
+     * @param maxDuration the max duration to wait for these records (in milliseconds).
+     * @param consumerProps overrides to the default properties the consumer is constructed with; may not be null
+     * @param topics the topics to subscribe and consume records from.
+     * @return a {@link ConsumerRecords} collection containing at least n records.
+     */
+    public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, Map<String, Object> consumerProps, String... topics) {
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
         int consumedRecords = 0;
-        try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(Collections.emptyMap(), topics)) {
+        try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(consumerProps, topics)) {
             final long startMillis = System.currentTimeMillis();
             long allowedDuration = maxDuration;
             while (allowedDuration > 0) {
@@ -466,6 +492,108 @@ public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, String... topics) {
         throw new RuntimeException("Could not find enough records. found " + consumedRecords + ", expected " + n);
     }
 
+    /**
+     * Consume all currently-available records for the specified topics in a given duration, or throw an exception.
+     * @param maxDurationMs the max duration to wait for these records (in milliseconds).
+     * @param consumerProps overrides to the default properties the consumer is constructed with; may be null
+     * @param adminProps overrides to the default properties the admin used to query Kafka cluster metadata is constructed with; may be null
+     * @param topics the topics to consume from
+     * @return a {@link ConsumerRecords} collection containing the records for all partitions of the given topics
+     */
+    public ConsumerRecords<byte[], byte[]> consumeAll(
+            long maxDurationMs,
+            Map<String, Object> consumerProps,
+            Map<String, Object> adminProps,
+            String... topics
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        long endTimeMs = System.currentTimeMillis() + maxDurationMs;
+
+        Consumer<byte[], byte[]> consumer = createConsumer(consumerProps != null ? consumerProps : Collections.emptyMap());
+        Admin admin = createAdminClient(Utils.mkObjectProperties(adminProps != null ? adminProps : Collections.emptyMap()));
+
+        long remainingTimeMs = endTimeMs - System.currentTimeMillis();
+        Set<TopicPartition> topicPartitions = listPartitions(remainingTimeMs, admin, Arrays.asList(topics));
+
+        remainingTimeMs = endTimeMs - System.currentTimeMillis();
+        Map<TopicPartition, Long> endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = topicPartitions.stream()
+                .collect(Collectors.toMap(
+                        Function.identity(),
+                        tp -> new ArrayList<>()
+                ));
+        consumer.assign(topicPartitions);
+
+        while (!endOffsets.isEmpty()) {
+            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<TopicPartition, Long> entry = it.next();
+                TopicPartition topicPartition = entry.getKey();
+                long endOffset = entry.getValue();
+                long lastConsumedOffset = consumer.position(topicPartition);
+                if (lastConsumedOffset >= endOffset) {
+                    // We've reached the end offset for the topic partition; can stop polling it now
+                    it.remove();
+                } else {
+                    remainingTimeMs = endTimeMs - System.currentTimeMillis();
+                    if (remainingTimeMs <= 0) {
+                        throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms");
+                    }
+                    // We haven't reached the end offset yet; need to keep polling
+                    ConsumerRecords<byte[], byte[]> recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs));
+                    recordBatch.partitions().forEach(tp -> records.get(tp)
+                            .addAll(recordBatch.records(tp))
+                    );
+                }
+            }
+        }
+
+        return new ConsumerRecords<>(records);
+    }
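Because consumeAll accepts consumer overrides, callers can control the isolation level when reading transactional topics. A hedged usage sketch from a test context like the one above; the timeout and topic are assumptions, and `connect` is the embedded Connect cluster handle used throughout these tests:

    Map<String, Object> consumerProps = Collections.singletonMap(
            ConsumerConfig.ISOLATION_LEVEL_CONFIG,
            IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
    ConsumerRecords<byte[], byte[]> committed = connect.kafka().consumeAll(
            TimeUnit.MINUTES.toMillis(1), // assumed timeout
            consumerProps,
            null,                         // default admin properties
            "test-topic");                // assumed topic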
+
+    /**
+     * List all the known partitions for the given {@link Collection} of topics
+     * @param maxDurationMs the max duration to wait while fetching metadata from Kafka (in milliseconds)
+     * @param admin the admin client to use for fetching metadata from the Kafka cluster
+     * @param topics the topics whose partitions should be listed
+     * @return a {@link Set} of {@link TopicPartition topic partitions} for the given topics; never null, and never empty
+     */
+    private Set<TopicPartition> listPartitions(
+            long maxDurationMs,
+            Admin admin,
+            Collection<String> topics
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        assertFalse("collection of topics may not be empty", topics.isEmpty());
+        return admin.describeTopics(topics)
+                .allTopicNames().get(maxDurationMs, TimeUnit.MILLISECONDS)
+                .entrySet().stream()
+                .flatMap(e -> e.getValue().partitions().stream().map(p -> new TopicPartition(e.getKey(), p.partition())))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * List the latest current offsets for the given {@link Collection} of {@link TopicPartition topic partitions}
+     * @param maxDurationMs the max duration to wait while fetching metadata from Kafka (in milliseconds)
+     * @param admin the admin client to use for fetching metadata from the Kafka cluster
+     * @param topicPartitions the topic partitions to list end offsets for
+     * @return a {@link Map} containing the latest offset for each requested {@link TopicPartition topic partition}; never null, and never empty
+     */
+    private Map<TopicPartition, Long> readEndOffsets(
+            long maxDurationMs,
+            Admin admin,
+            Collection<TopicPartition> topicPartitions
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        assertFalse("collection of topic partitions may not be empty", topicPartitions.isEmpty());
+        Map<TopicPartition, OffsetSpec> offsetSpecMap = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
+        return admin.listOffsets(offsetSpecMap, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
+                .all().get(maxDurationMs, TimeUnit.MILLISECONDS)
+                .entrySet().stream()
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        e -> e.getValue().offset()
+                ));
+    }
+
     public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> consumerProps) {
         Map<String, Object> props = new HashMap<>(consumerProps);
@@ -495,6 +623,26 @@ public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object>
+    public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producerProps) {
+        Map<String, Object> props = new HashMap<>(producerProps);
+
+        putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        putIfAbsent(props, KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        putIfAbsent(props, VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        if (sslEnabled()) {
+            putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+            putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+            putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        }
+        KafkaProducer<byte[], byte[]> producer;
+        try {
+            producer = new KafkaProducer<>(props);
+        } catch (Throwable t) {
+            throw new ConnectException("Failed to create producer", t);
+        }
+        return producer;
+    }
+
     private static void putIfAbsent(final Map<String, Object> props, final String propertyKey, final Object propertyValue) {
         if (!props.containsKey(propertyKey)) {
             props.put(propertyKey, propertyValue);