Skip to content

Commit

Permalink
[FLINK-32416] Fix flaky tests by ensuring test utilities produce records with consistency and cleanup notify-no-more-splits to ensure it is sent
Browse files Browse the repository at this point in the history
  • Loading branch information
mas-chen committed Jan 17, 2024
1 parent eaeb781 commit 72bd59c
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class DynamicKafkaSourceEnumerator
private int kafkaMetadataServiceDiscoveryFailureCount;
private Map<String, Set<String>> latestClusterTopicsMap;
private Set<KafkaStream> latestKafkaStreams;
private boolean firstDiscoveryComplete;

public DynamicKafkaSourceEnumerator(
KafkaStreamSubscriber kafkaStreamSubscriber,
Expand Down Expand Up @@ -151,6 +152,7 @@ public DynamicKafkaSourceEnumerator(
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD,
Integer::parseInt);
this.kafkaMetadataServiceDiscoveryFailureCount = 0;
this.firstDiscoveryComplete = false;

this.kafkaMetadataService = kafkaMetadataService;
this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory;
Expand Down Expand Up @@ -212,32 +214,27 @@ public void start() {

/**
 * Signals "no more splits" to every registered reader, but only once it is safe to do so.
 *
 * <p>NOTE(review): the scraped diff merged the removed and the added hunk bodies into one
 * stream, declaring {@code allEnumeratorsHaveSignalledNoMoreSplits} twice (a compile error).
 * This is the reconstructed post-change version only.
 *
 * <p>Only relevant for {@link Boundedness#BOUNDED} sources. Two gates must both hold before
 * readers are notified:
 * <ol>
 *   <li>{@code firstDiscoveryComplete} — the first stream-metadata discovery has finished,
 *       so the set of clusters/enumerators is known (prevents a premature signal when the
 *       cluster map is still empty at startup);
 *   <li>every per-cluster {@link StoppableKafkaEnumContextProxy} has itself observed
 *       no-more-splits ({@code isNoMoreSplits()}).
 * </ol>
 */
private void handleNoMoreSplits() {
    if (Boundedness.BOUNDED.equals(boundedness)) {
        // AND together the per-cluster flags; vacuously true for an empty map, which is why
        // the firstDiscoveryComplete gate below is also required.
        boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
        for (StoppableKafkaEnumContextProxy context : clusterEnumContextMap.values()) {
            allEnumeratorsHaveSignalledNoMoreSplits =
                    allEnumeratorsHaveSignalledNoMoreSplits && context.isNoMoreSplits();
        }

        if (firstDiscoveryComplete && allEnumeratorsHaveSignalledNoMoreSplits) {
            logger.info(
                    "Signal no more splits to all readers: {}",
                    enumContext.registeredReaders().keySet());
            enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits);
        } else {
            logger.info("Not ready to notify no more splits to readers.");
        }
    }
}

// --------------- private methods for metadata discovery ---------------

private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams, Throwable t) {
firstDiscoveryComplete = true;
Set<KafkaStream> handledFetchKafkaStreams =
handleFetchSubscribedStreamsError(fetchedKafkaStreams, t);

Expand Down Expand Up @@ -370,9 +367,19 @@ private KafkaSourceEnumerator createEnumeratorWithAssignedTopicPartitions(
Set<String> topics,
KafkaSourceEnumState kafkaSourceEnumState,
Properties fetchedProperties) {
final Runnable signalNoMoreSplitsCallback;
if (Boundedness.BOUNDED.equals(boundedness)) {
signalNoMoreSplitsCallback = this::handleNoMoreSplits;
} else {
signalNoMoreSplitsCallback = null;
}

StoppableKafkaEnumContextProxy context =
stoppableKafkaEnumContextProxyFactory.create(
enumContext, kafkaClusterId, kafkaMetadataService);
enumContext,
kafkaClusterId,
kafkaMetadataService,
signalNoMoreSplitsCallback);

Properties consumerProps = new Properties();
KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class StoppableKafkaEnumContextProxy
private final KafkaMetadataService kafkaMetadataService;
private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
private final ScheduledExecutorService subEnumeratorWorker;
private final Runnable signalNoMoreSplitsCallback;
private boolean noMoreSplits = false;
private volatile boolean isClosing;

Expand All @@ -79,17 +82,20 @@ public class StoppableKafkaEnumContextProxy
* KafkaSourceEnumerator
* @param kafkaMetadataService the Kafka metadata service to facilitate error handling
* @param enumContext the underlying enumerator context
* @param signalNoMoreSplitsCallback the callback when signal no more splits is invoked
*/
public StoppableKafkaEnumContextProxy(
String kafkaClusterId,
KafkaMetadataService kafkaMetadataService,
SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
@Nullable Runnable signalNoMoreSplitsCallback) {
this.kafkaClusterId = kafkaClusterId;
this.kafkaMetadataService = kafkaMetadataService;
this.enumContext = enumContext;
this.subEnumeratorWorker =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory(kafkaClusterId + "-enum-worker"));
this.signalNoMoreSplitsCallback = signalNoMoreSplitsCallback;
this.isClosing = false;
}

Expand Down Expand Up @@ -147,8 +153,14 @@ public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignmen

/**
 * Records that this cluster's sub-enumerator has exhausted its splits.
 *
 * <p>The per-cluster flag is set here, but readers are NOT signalled directly: all clusters
 * must finish their respective split discoveries first, so the optional callback (wired to the
 * parent enumerator's aggregation logic) is invoked instead. In the Kafka source this runs in
 * the coordinator thread, ensuring thread safety for all source readers at the same time.
 *
 * @param subtask the subtask id reported by the wrapped enumerator (unused here; aggregation
 *     happens at the parent level)
 */
@Override
public void signalNoMoreSplits(int subtask) {
// There are no more splits for this cluster, but we need to wait until all clusters are
// finished with their respective split discoveries before notifying readers.
noMoreSplits = true;
if (signalNoMoreSplitsCallback != null) {
// Thread safe idempotent callback; null when the source is unbounded — TODO confirm
signalNoMoreSplitsCallback.run();
}
}

/** Execute the one time callables in the coordinator. */
Expand Down Expand Up @@ -286,12 +298,19 @@ public interface StoppableKafkaEnumContextProxyFactory {
StoppableKafkaEnumContextProxy create(
SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
String kafkaClusterId,
KafkaMetadataService kafkaMetadataService);
KafkaMetadataService kafkaMetadataService,
Runnable signalNoMoreSplitsCallback);

static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
return (enumContext, kafkaClusterId, kafkaMetadataService) ->
return (enumContext,
kafkaClusterId,
kafkaMetadataService,
signalNoMoreSplitsCallback) ->
new StoppableKafkaEnumContextProxy(
kafkaClusterId, kafkaMetadataService, enumContext);
kafkaClusterId,
kafkaMetadataService,
enumContext,
signalNoMoreSplitsCallback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public void start() {

@Override
public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
// do not return end of input if no more splits has not yet been signaled
if (!isNoMoreSplits && clusterReaderMap.isEmpty()) {
// at startup, do not return end of input if metadata event has not been received
if (clusterReaderMap.isEmpty()) {
return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,11 +919,13 @@ private DynamicKafkaSourceEnumState getCheckpointState() throws Throwable {

private static class TestKafkaEnumContextProxyFactory
implements StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory {

@Override
public StoppableKafkaEnumContextProxy create(
SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
String kafkaClusterId,
KafkaMetadataService kafkaMetadataService) {
KafkaMetadataService kafkaMetadataService,
Runnable signalNoMoreSplitsCallback) {
return new TestKafkaEnumContextProxy(
kafkaClusterId,
kafkaMetadataService,
Expand All @@ -939,7 +941,7 @@ public TestKafkaEnumContextProxy(
String kafkaClusterId,
KafkaMetadataService kafkaMetadataService,
MockSplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
super(kafkaClusterId, kafkaMetadataService, enumContext);
super(kafkaClusterId, kafkaMetadataService, enumContext, null);
this.enumContext = enumContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ private StoppableKafkaEnumContextProxy createStoppableKafkaEnumContextProxy(
return new StoppableKafkaEnumContextProxy(
contextKafkaCluster,
new MockKafkaMetadataService(Collections.singleton(mockStream)),
enumContext);
enumContext,
null);
}

// this modeled after `KafkaSourceEnumerator` topic partition subscription to throw the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public static <K, V> void produceToKafka(
throws Throwable {
Properties props = new Properties();
props.putAll(clusterProperties);
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
props.setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
Expand All @@ -219,8 +220,9 @@ public static <K, V> void produceToKafka(
};
try (KafkaProducer<K, V> producer = new KafkaProducer<>(props)) {
for (ProducerRecord<K, V> record : records) {
producer.send(record, callback).get();
producer.send(record, callback);
}
producer.flush();
}
if (sendingError.get() != null) {
throw sendingError.get();
Expand Down

0 comments on commit 72bd59c

Please sign in to comment.