diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java index 5a06341eb86c5..96d301decade3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java @@ -23,9 +23,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Properties; -import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -64,8 +64,6 @@ public class SlidingWindowedCogroupedKStreamImplTest { private KGroupedStream groupedStream; - private KGroupedStream groupedStream2; - private CogroupedKStream cogroupedStream; private TimeWindowedCogroupedKStream windowedCogroupedStream; private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @@ -78,8 +76,8 @@ public void setup() { .with(Serdes.String(), Serdes.String())); groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); - groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); - cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) + final KGroupedStream groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final CogroupedKStream cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER); windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis( WINDOW_SIZE_MS), ofMillis(2000L))); @@ -169,34 +167,37 @@ public void slidingWindowAggregateStreamsTest() { testInputTopic.pipeInput("k2", "B", 504); testInputTopic.pipeInput("k1", "B", 504); - final Set> results = new HashSet<>(); - while (!testOutputTopic.isEmpty()) { - final TestRecord, String> realRecord = testOutputTopic.readRecord(); - final TestRecord nonWindowedRecord = new TestRecord<>( - realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp()); - results.add(nonWindowedRecord); - } - final Set> expected = new HashSet<>(); - expected.add(new TestRecord<>("k1", "0+A", null, 500L)); - expected.add(new TestRecord<>("k2", "0+A", null, 500L)); - expected.add(new TestRecord<>("k2", "0+A", null, 501L)); - expected.add(new TestRecord<>("k2", "0+A+A", null, 501L)); - expected.add(new TestRecord<>("k1", "0+A", null, 502L)); - expected.add(new TestRecord<>("k1", "0+A+A", null, 502L)); - expected.add(new TestRecord<>("k1", "0+A+B", null, 503L)); - expected.add(new TestRecord<>("k1", "0+B", null, 503L)); - expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L)); - expected.add(new TestRecord<>("k2", "0+A+B", null, 503L)); - expected.add(new TestRecord<>("k2", "0+B", null, 503L)); - expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L)); - expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L)); - expected.add(new TestRecord<>("k2", "0+B+B", null, 504L)); - expected.add(new TestRecord<>("k2", "0+B", null, 504L)); - expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L)); - expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L)); - expected.add(new TestRecord<>("k1", "0+B+B", null, 504L)); - expected.add(new TestRecord<>("k1", "0+B", null, 504L)); - expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L)); + final List, String>> results = testOutputTopic.readRecordsToList(); + + final List, String>> expected = new LinkedList<>(); + // k1-A-500 + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(0L, 500L)), "0+A", null, 500L)); + // k2-A-500 + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(0L, 500L)), "0+A", null, 500L)); + // k2-A-501 + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(501L, 1001L)), "0+A", null, 501L)); + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(1L, 501L)), "0+A+A", null, 501L)); + // k1-A-502 + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(501L, 1001L)), "0+A", null, 502L)); + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(2L, 502L)), "0+A+A", null, 502L)); + // k1-B-503 + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(501L, 1001L)), "0+A+B", null, 503L)); + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(503L, 1003L)), "0+B", null, 503L)); + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(3L, 503L)), "0+A+A+B", null, 503L)); + // k2-B-503 + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(501L, 1001L)), "0+A+B", null, 503L)); + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(502L, 1002)), "0+B", null, 503L)); + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(3L, 503L)), "0+A+A+B", null, 503L)); + // k2-B-504 + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(502L, 1002L)), "0+B+B", null, 504L)); + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(501L, 1001L)), "0+A+B+B", null, 504L)); + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(504L, 1004L)), "0+B", null, 504L)); + expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(4L, 504L)), "0+A+A+B+B", null, 504L)); + // k1-B-504 + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(503L, 1003L)), "0+B+B", null, 504L)); + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(501L, 1001L)), "0+A+B+B", null, 504L)); + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(504L, 1004L)), "0+B", null, 504L)); + expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(4L, 504L)), "0+A+A+B+B", null, 504L)); assertEquals(expected, results); }