diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
index bee233975..9c4d8387c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
@@ -24,6 +24,7 @@
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -142,6 +143,12 @@ public void runFetchLoop() throws Exception {
                     partitionConsumerRecordsHandler(partitionRecords, partition);
                 }
             }
+        } catch (Handover.ClosedException ex) {
+            if (running) {
+                // Rethrow only if the fetcher is still running; if it is not,
+                // the ClosedException is expected, because we are stopping gracefully.
+                ExceptionUtils.rethrowException(ex);
+            }
         } finally {
             // this signals the consumer thread that no more work is to be done
             consumerThread.shutdown();
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerITCase.java
new file mode 100644
index 000000000..90c773730
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.ClassRule;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/** Integration tests for {@link FlinkKafkaConsumer}. */
+@TestInstance(Lifecycle.PER_CLASS)
+public class FlinkKafkaConsumerITCase {
+    private static final String TOPIC1 = "FlinkKafkaConsumerITCase_topic1";
+
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(new Configuration())
+                            .build());
+
+    @BeforeAll
+    public void setup() throws Throwable {
+        KafkaSourceTestEnv.setup();
+        KafkaSourceTestEnv.setupTopic(
+                TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopicWithoutTimestamp);
+    }
+
+    @AfterAll
+    public void tearDown() throws Exception {
+        KafkaSourceTestEnv.tearDown();
+    }
+
+    @Test
+    public void testStopWithSavepoint(@TempDir Path savepointsDir) throws Exception {
+        Configuration config =
+                new Configuration()
+                        .set(
+                                CheckpointingOptions.SAVEPOINT_DIRECTORY,
+                                savepointsDir.toUri().toString());
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(1);
+
+        Properties properties = new Properties();
+        properties.setProperty(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                KafkaSourceTestEnv.brokerConnectionStrings);
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testStopWithSavepoint");
+        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        FlinkKafkaConsumer<Integer> kafkaConsumer =
+                new FlinkKafkaConsumer<>(
+                        TOPIC1,
+                        new TypeInformationSerializationSchema<>(
+                                BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()),
+                        properties);
+        DataStreamSource<Integer> stream = env.addSource(kafkaConsumer);
+
+        ProgressLatchingIdentityFunction.resetBeforeUse();
+        stream.map(new ProgressLatchingIdentityFunction()).addSink(new DiscardingSink<>());
+
+        JobClient jobClient = env.executeAsync();
+
+        ProgressLatchingIdentityFunction.getProgressLatch().await();
+
+        // Check that stopWithSavepoint completes successfully
+        jobClient.stopWithSavepoint(false, null, SavepointFormatType.CANONICAL).get();
+        // TODO: Ideally we should also test recovery (that there is no data loss, etc.),
+        // but this class is already deprecated, so we are not adding new tests for it now.
+    }
+
+    private static class ProgressLatchingIdentityFunction implements MapFunction<Integer, Integer> {
+
+        static CountDownLatch progressLatch;
+
+        static void resetBeforeUse() {
+            progressLatch = new CountDownLatch(1);
+        }
+
+        public static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        @Override
+        public Integer map(Integer integer) throws Exception {
+            progressLatch.countDown();
+            return integer;
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/resources/log4j2-test.properties b/flink-connector-kafka/src/test/resources/log4j2-test.properties
index 863665cf4..aa3967318 100644
--- a/flink-connector-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-connector-kafka/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
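
For context, below is a minimal, self-contained sketch (not part of the patch) of the shutdown pattern the KafkaFetcher change implements: the fetch loop blocks on a handover, and the handover's ClosedException is swallowed when the fetcher was stopped gracefully (running == false) but rethrown if the handover closed while the loop was still supposed to be running. The class names MiniHandover and HandoverShutdownSketch are hypothetical stand-ins for illustration only, not Flink API.

import java.util.concurrent.TimeUnit;

public class HandoverShutdownSketch {

    /** Hypothetical stand-in for Flink's internal Handover. */
    static final class MiniHandover {
        static final class ClosedException extends RuntimeException {}

        private volatile boolean closed;

        /** In the real Handover this returns the next batch of records; here it just blocks. */
        void pollNext() throws InterruptedException {
            while (!closed) {
                TimeUnit.MILLISECONDS.sleep(10); // simulate waiting for the consumer thread
            }
            throw new ClosedException();
        }

        void close() {
            closed = true;
        }
    }

    private final MiniHandover handover = new MiniHandover();
    private volatile boolean running = true;

    /** Mirrors the control flow of the patched runFetchLoop(). */
    void runFetchLoop() throws Exception {
        try {
            while (running) {
                handover.pollNext();
                // ... emit the records of the polled batch ...
            }
        } catch (MiniHandover.ClosedException ex) {
            // Rethrow only if we were still supposed to be running; during a
            // graceful stop (running == false) the ClosedException is expected.
            if (running) {
                throw ex;
            }
        }
    }

    /** A stop-with-savepoint style cancel path: flip the flag first, then close. */
    void cancel() {
        running = false;
        handover.close();
    }

    public static void main(String[] args) throws Exception {
        HandoverShutdownSketch fetcher = new HandoverShutdownSketch();
        Thread loop =
                new Thread(
                        () -> {
                            try {
                                fetcher.runFetchLoop();
                                System.out.println("fetch loop exited cleanly");
                            } catch (Exception e) {
                                System.out.println("fetch loop failed: " + e);
                            }
                        });
        loop.start();
        TimeUnit.MILLISECONDS.sleep(100);
        fetcher.cancel(); // graceful stop: the loop exits without rethrowing
        loop.join();
    }
}

Without the added catch block, the ClosedException raised by the handover during a graceful stop propagates out of runFetchLoop and fails the job, which is why stop-with-savepoint on FlinkKafkaConsumer could fail before this patch; the testStopWithSavepoint case above exercises exactly that path.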