Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand Down Expand Up @@ -64,8 +64,6 @@ public class SlidingWindowedCogroupedKStreamImplTest {

private KGroupedStream<String, String> groupedStream;

private KGroupedStream<String, String> groupedStream2;
private CogroupedKStream<String, String> cogroupedStream;
private TimeWindowedCogroupedKStream<String, String> windowedCogroupedStream;

private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
Expand All @@ -78,8 +76,8 @@ public void setup() {
.with(Serdes.String(), Serdes.String()));

groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
final KGroupedStream<String, String> groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
final CogroupedKStream<String, String> cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(
WINDOW_SIZE_MS), ofMillis(2000L)));
Expand Down Expand Up @@ -169,34 +167,37 @@ public void slidingWindowAggregateStreamsTest() {
testInputTopic.pipeInput("k2", "B", 504);
testInputTopic.pipeInput("k1", "B", 504);

final Set<TestRecord<String, String>> results = new HashSet<>();
while (!testOutputTopic.isEmpty()) {
final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord();
final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
results.add(nonWindowedRecord);
}
final Set<TestRecord<String, String>> expected = new HashSet<>();
expected.add(new TestRecord<>("k1", "0+A", null, 500L));
expected.add(new TestRecord<>("k2", "0+A", null, 500L));
expected.add(new TestRecord<>("k2", "0+A", null, 501L));
expected.add(new TestRecord<>("k2", "0+A+A", null, 501L));
expected.add(new TestRecord<>("k1", "0+A", null, 502L));
expected.add(new TestRecord<>("k1", "0+A+A", null, 502L));
expected.add(new TestRecord<>("k1", "0+A+B", null, 503L));
expected.add(new TestRecord<>("k1", "0+B", null, 503L));
expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L));
expected.add(new TestRecord<>("k2", "0+A+B", null, 503L));
expected.add(new TestRecord<>("k2", "0+B", null, 503L));
expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L));
expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L));
expected.add(new TestRecord<>("k2", "0+B+B", null, 504L));
expected.add(new TestRecord<>("k2", "0+B", null, 504L));
expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+B+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+B", null, 504L));
expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L));
final List<TestRecord<Windowed<String>, String>> results = testOutputTopic.readRecordsToList();

final List<TestRecord<Windowed<String>, String>> expected = new LinkedList<>();
// k1-A-500
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(0L, 500L)), "0+A", null, 500L));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that using TimeWindows is actually incorrect... (filed https://issues.apache.org/jira/browse/KAFKA-12839)

\cc @lct45 @ableegoldman

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A {@link TimeWindow} covers a half-open time interval

I was about to say we should just make TimeWindow un-opinionated, but this is literally the first thing in the javadocs for the class. So I'd say it's pretty clear about what it's representing -- totally missed this before, I thought it was just a basic container class

// k2-A-500
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(0L, 500L)), "0+A", null, 500L));
// k2-A-501
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(501L, 1001L)), "0+A", null, 501L));
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(1L, 501L)), "0+A+A", null, 501L));
// k1-A-502
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(501L, 1001L)), "0+A", null, 502L));
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(2L, 502L)), "0+A+A", null, 502L));
// k1-B-503
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(501L, 1001L)), "0+A+B", null, 503L));
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(503L, 1003L)), "0+B", null, 503L));
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(3L, 503L)), "0+A+A+B", null, 503L));
// k2-B-503
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(501L, 1001L)), "0+A+B", null, 503L));
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(502L, 1002)), "0+B", null, 503L));
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(3L, 503L)), "0+A+A+B", null, 503L));
// k2-B-504
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(502L, 1002L)), "0+B+B", null, 504L));
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(501L, 1001L)), "0+A+B+B", null, 504L));
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(504L, 1004L)), "0+B", null, 504L));
expected.add(new TestRecord<>(new Windowed<>("k2", new TimeWindow(4L, 504L)), "0+A+A+B+B", null, 504L));
// k1-B-504
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(503L, 1003L)), "0+B+B", null, 504L));
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(501L, 1001L)), "0+A+B+B", null, 504L));
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(504L, 1004L)), "0+B", null, 504L));
expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(4L, 504L)), "0+A+A+B+B", null, 504L));

assertEquals(expected, results);
}
Expand Down