diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java index cce8ab28b..e14a36d9f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java @@ -96,6 +96,7 @@ public class DynamicKafkaSourceEnumerator private int kafkaMetadataServiceDiscoveryFailureCount; private Map> latestClusterTopicsMap; private Set latestKafkaStreams; + private boolean firstDiscoveryComplete; public DynamicKafkaSourceEnumerator( KafkaStreamSubscriber kafkaStreamSubscriber, @@ -151,6 +152,7 @@ public DynamicKafkaSourceEnumerator( DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD, Integer::parseInt); this.kafkaMetadataServiceDiscoveryFailureCount = 0; + this.firstDiscoveryComplete = false; this.kafkaMetadataService = kafkaMetadataService; this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory; @@ -212,32 +214,27 @@ public void start() { private void handleNoMoreSplits() { if (Boundedness.BOUNDED.equals(boundedness)) { - enumContext.runInCoordinatorThread( - () -> { - boolean allEnumeratorsHaveSignalledNoMoreSplits = true; - for (StoppableKafkaEnumContextProxy context : - clusterEnumContextMap.values()) { - allEnumeratorsHaveSignalledNoMoreSplits = - allEnumeratorsHaveSignalledNoMoreSplits - && context.isNoMoreSplits(); - } - - if (allEnumeratorsHaveSignalledNoMoreSplits) { - logger.info( - "Signal no more splits to all readers: {}", - enumContext.registeredReaders().keySet()); - enumContext - .registeredReaders() - .keySet() - .forEach(enumContext::signalNoMoreSplits); - } - }); + boolean allEnumeratorsHaveSignalledNoMoreSplits = true; + for (StoppableKafkaEnumContextProxy context : clusterEnumContextMap.values()) { + allEnumeratorsHaveSignalledNoMoreSplits = + allEnumeratorsHaveSignalledNoMoreSplits && context.isNoMoreSplits(); + } + + if (firstDiscoveryComplete && allEnumeratorsHaveSignalledNoMoreSplits) { + logger.info( + "Signal no more splits to all readers: {}", + enumContext.registeredReaders().keySet()); + enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits); + } else { + logger.info("Not ready to notify no more splits to readers."); + } } } // --------------- private methods for metadata discovery --------------- private void onHandleSubscribedStreamsFetch(Set fetchedKafkaStreams, Throwable t) { + firstDiscoveryComplete = true; Set handledFetchKafkaStreams = handleFetchSubscribedStreamsError(fetchedKafkaStreams, t); @@ -370,9 +367,19 @@ private KafkaSourceEnumerator createEnumeratorWithAssignedTopicPartitions( Set topics, KafkaSourceEnumState kafkaSourceEnumState, Properties fetchedProperties) { + final Runnable signalNoMoreSplitsCallback; + if (Boundedness.BOUNDED.equals(boundedness)) { + signalNoMoreSplitsCallback = this::handleNoMoreSplits; + } else { + signalNoMoreSplitsCallback = null; + } + StoppableKafkaEnumContextProxy context = stoppableKafkaEnumContextProxyFactory.create( - enumContext, kafkaClusterId, kafkaMetadataService); + enumContext, + kafkaClusterId, + kafkaMetadataService, + signalNoMoreSplitsCallback); Properties consumerProps = new Properties(); KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java index 5506c4446..752a5d6b2 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java @@ -34,6 +34,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -69,6 +71,7 @@ public class StoppableKafkaEnumContextProxy private final KafkaMetadataService kafkaMetadataService; private final SplitEnumeratorContext enumContext; private final ScheduledExecutorService subEnumeratorWorker; + private final Runnable signalNoMoreSplitsCallback; private boolean noMoreSplits = false; private volatile boolean isClosing; @@ -79,17 +82,20 @@ public class StoppableKafkaEnumContextProxy * KafkaSourceEnumerator * @param kafkaMetadataService the Kafka metadata service to facilitate error handling * @param enumContext the underlying enumerator context + * @param signalNoMoreSplitsCallback the callback when signal no more splits is invoked */ public StoppableKafkaEnumContextProxy( String kafkaClusterId, KafkaMetadataService kafkaMetadataService, - SplitEnumeratorContext enumContext) { + SplitEnumeratorContext enumContext, + @Nullable Runnable signalNoMoreSplitsCallback) { this.kafkaClusterId = kafkaClusterId; this.kafkaMetadataService = kafkaMetadataService; this.enumContext = enumContext; this.subEnumeratorWorker = Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory(kafkaClusterId + "-enum-worker")); + this.signalNoMoreSplitsCallback = signalNoMoreSplitsCallback; this.isClosing = false; } @@ -147,8 +153,14 @@ public void assignSplits(SplitsAssignment newSplitAssignmen @Override public void signalNoMoreSplits(int subtask) { - // there are no more splits for this cluster + // There are no more splits for this cluster, but we need to wait until all clusters are + // finished with their respective split discoveries. In the Kafka Source, this is called in + // the coordinator thread, ensuring thread safety, for all source readers at the same time. noMoreSplits = true; + if (signalNoMoreSplitsCallback != null) { + // Thread safe idempotent callback + signalNoMoreSplitsCallback.run(); + } } /** Execute the one time callables in the coordinator. */ @@ -286,12 +298,19 @@ public interface StoppableKafkaEnumContextProxyFactory { StoppableKafkaEnumContextProxy create( SplitEnumeratorContext enumContext, String kafkaClusterId, - KafkaMetadataService kafkaMetadataService); + KafkaMetadataService kafkaMetadataService, + Runnable signalNoMoreSplitsCallback); static StoppableKafkaEnumContextProxyFactory getDefaultFactory() { - return (enumContext, kafkaClusterId, kafkaMetadataService) -> + return (enumContext, + kafkaClusterId, + kafkaMetadataService, + signalNoMoreSplitsCallback) -> new StoppableKafkaEnumContextProxy( - kafkaClusterId, kafkaMetadataService, enumContext); + kafkaClusterId, + kafkaMetadataService, + enumContext, + signalNoMoreSplitsCallback); } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java index f7193418d..4f307e11a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java @@ -132,8 +132,8 @@ public void start() { @Override public InputStatus pollNext(ReaderOutput readerOutput) throws Exception { - // do not return end of input if no more splits has not yet been signaled - if (!isNoMoreSplits && clusterReaderMap.isEmpty()) { + // at startup, do not return end of input if metadata event has not been received + if (clusterReaderMap.isEmpty()) { return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java index 05046d406..3c3a76e8f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java @@ -919,11 +919,13 @@ private DynamicKafkaSourceEnumState getCheckpointState() throws Throwable { private static class TestKafkaEnumContextProxyFactory implements StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory { + @Override public StoppableKafkaEnumContextProxy create( SplitEnumeratorContext enumContext, String kafkaClusterId, - KafkaMetadataService kafkaMetadataService) { + KafkaMetadataService kafkaMetadataService, + Runnable signalNoMoreSplitsCallback) { return new TestKafkaEnumContextProxy( kafkaClusterId, kafkaMetadataService, @@ -939,7 +941,7 @@ public TestKafkaEnumContextProxy( String kafkaClusterId, KafkaMetadataService kafkaMetadataService, MockSplitEnumeratorContext enumContext) { - super(kafkaClusterId, kafkaMetadataService, enumContext); + super(kafkaClusterId, kafkaMetadataService, enumContext, null); this.enumContext = enumContext; } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java index 694c95c65..e3dbf4fdf 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java @@ -150,7 +150,8 @@ private StoppableKafkaEnumContextProxy createStoppableKafkaEnumContextProxy( return new StoppableKafkaEnumContextProxy( contextKafkaCluster, new MockKafkaMetadataService(Collections.singleton(mockStream)), - enumContext); + enumContext, + null); } // this modeled after `KafkaSourceEnumerator` topic partition subscription to throw the same diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java index c6ecfd061..0ec02cc0e 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java @@ -204,6 +204,7 @@ public static void produceToKafka( throws Throwable { Properties props = new Properties(); props.putAll(clusterProperties); + props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName()); props.setProperty( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName()); @@ -219,8 +220,9 @@ public static void produceToKafka( }; try (KafkaProducer producer = new KafkaProducer<>(props)) { for (ProducerRecord record : records) { - producer.send(record, callback).get(); + producer.send(record, callback); } + producer.flush(); } if (sendingError.get() != null) { throw sendingError.get();