Skip to content

Commit

Permalink
KAFKA-16331 Remove EOSv1 from Kafka Streams integration tests (#17110)
Browse files Browse the repository at this point in the history
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
mjsax authored Sep 16, 2024
1 parent d0f4d69 commit 21e67b3
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 1,310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.io.IOException;
Expand All @@ -58,7 +57,6 @@
/**
* Test the unclean shutdown behavior around state store cleanup.
*/
@SuppressWarnings("deprecation")
@Tag("integration")
@Timeout(600)
public class EOSUncleanShutdownIntegrationTest {
Expand All @@ -71,8 +69,8 @@ public static void startCluster() throws IOException {
CLUSTER.start();
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getPath());
}
Expand All @@ -88,12 +86,11 @@ public static void closeCluster() throws IOException {

private static final int RECORD_TOTAL = 3;

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldWorkWithUncleanShutdownWipeOutStateStore(final String eosConfig) throws InterruptedException {
@Test
public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
final String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

final String input = "input-topic";
cleanStateBeforeTest(CLUSTER, input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ public void createTopics() throws Exception {
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldBeAbleToRunWithEosEnabled(final String eosConfig) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldCommitCorrectOffsetIfInputTopicIsTransactional(final String eosConfig) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, eosConfig);

Expand Down Expand Up @@ -210,31 +210,31 @@ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional(final String eo
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws Exception {
runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldBeAbleToCommitToMultiplePartitions(final String eosConfig) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldBeAbleToCommitMultiplePartitionOffsets(final String eosConfig) throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldBeAbleToRunWithTwoSubtopologies(final String eosConfig) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions(final String eosConfig) throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
}
Expand Down Expand Up @@ -327,7 +327,7 @@ private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key, final List
}

@ParameterizedTest
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldBeAbleToPerformMultipleTransactions(final String eosConfig) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
Expand Down Expand Up @@ -378,8 +378,6 @@ public void shouldBeAbleToPerformMultipleTransactions(final String eosConfig) th
@CsvSource({
StreamsConfig.AT_LEAST_ONCE + ",true",
StreamsConfig.AT_LEAST_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE + ",true",
StreamsConfig.EXACTLY_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE_V2 + ",true",
StreamsConfig.EXACTLY_ONCE_V2 + ",false"
})
Expand Down Expand Up @@ -489,8 +487,6 @@ public void shouldNotViolateEosIfOneTaskFails(final String eosConfig, final bool
@CsvSource({
StreamsConfig.AT_LEAST_ONCE + ",true",
StreamsConfig.AT_LEAST_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE + ",true",
StreamsConfig.EXACTLY_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE_V2 + ",true",
StreamsConfig.EXACTLY_ONCE_V2 + ",false"
})
Expand Down Expand Up @@ -615,8 +611,6 @@ public void shouldNotViolateEosIfOneTaskFailsWithState(final String eosConfig, f
@CsvSource({
StreamsConfig.AT_LEAST_ONCE + ",true",
StreamsConfig.AT_LEAST_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE + ",true",
StreamsConfig.EXACTLY_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE_V2 + ",true",
StreamsConfig.EXACTLY_ONCE_V2 + ",false"
})
Expand Down Expand Up @@ -785,8 +779,6 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
@CsvSource({
StreamsConfig.AT_LEAST_ONCE + ",true",
StreamsConfig.AT_LEAST_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE + ",true",
StreamsConfig.EXACTLY_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE_V2 + ",true",
StreamsConfig.EXACTLY_ONCE_V2 + ",false"
})
Expand Down Expand Up @@ -826,8 +818,6 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String eosConfi
@CsvSource({
StreamsConfig.AT_LEAST_ONCE + ",true",
StreamsConfig.AT_LEAST_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE + ",true",
StreamsConfig.EXACTLY_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE_V2 + ",true",
StreamsConfig.EXACTLY_ONCE_V2 + ",false"
})
Expand All @@ -840,8 +830,6 @@ public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateU
@CsvSource({
StreamsConfig.AT_LEAST_ONCE + ",true",
StreamsConfig.AT_LEAST_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE + ",true",
StreamsConfig.EXACTLY_ONCE + ",false",
StreamsConfig.EXACTLY_ONCE_V2 + ",true",
StreamsConfig.EXACTLY_ONCE_V2 + ",false"
})
Expand All @@ -856,7 +844,7 @@ private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
final String eosConfig,
final boolean processingThreadsEnabled,
final boolean stateUpdaterEnabled) throws Exception {
if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
return;
}
final Properties streamsConfiguration = new Properties();
Expand Down
Loading

0 comments on commit 21e67b3

Please sign in to comment.