diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java index 6be96b9f61c9d..5a06341eb86c5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java @@ -59,6 +59,7 @@ public class SlidingWindowedCogroupedKStreamImplTest { private static final String TOPIC = "topic"; private static final String TOPIC2 = "topic2"; private static final String OUTPUT = "output"; + private static final long WINDOW_SIZE_MS = 500L; private final StreamsBuilder builder = new StreamsBuilder(); private KGroupedStream groupedStream; @@ -80,7 +81,8 @@ public void setup() { groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER); - windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L))); + windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis( + WINDOW_SIZE_MS), ofMillis(2000L))); } @Test @@ -130,7 +132,7 @@ public void namedParamShouldSetName() { .with(Serdes.String(), Serdes.String())); groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) - .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L))) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L))) .aggregate(MockInitializer.STRING_INIT, Named.as("foo")); assertThat(builder.build().describe().toString(), equalTo( @@ -156,7 +158,7 @@ public void slidingWindowAggregateStreamsTest() { final TestInputTopic testInputTopic = driver.createInputTopic( TOPIC, new StringSerializer(), new StringSerializer()); final TestOutputTopic, String> testOutputTopic = driver.createOutputTopic( - OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); + OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer()); testInputTopic.pipeInput("k1", "A", 500); testInputTopic.pipeInput("k2", "A", 500); @@ -204,7 +206,7 @@ public void slidingWindowAggregateStreamsTest() { public void slidingWindowAggregateOverlappingWindowsTest() { final KTable, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) - .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L))).aggregate( + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L))).aggregate( MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String())); customers.toStream().to(OUTPUT); @@ -212,7 +214,7 @@ public void slidingWindowAggregateOverlappingWindowsTest() { final TestInputTopic testInputTopic = driver.createInputTopic( TOPIC, new StringSerializer(), new StringSerializer()); final TestOutputTopic, String> testOutputTopic = driver.createOutputTopic( - OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); + OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer()); testInputTopic.pipeInput("k1", "A", 500); testInputTopic.pipeInput("k2", "A", 500);