Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
private static final String TOPIC = "topic";
private static final String TOPIC2 = "topic2";
private static final String OUTPUT = "output";
private static final long WINDOW_SIZE_MS = 500L;
private final StreamsBuilder builder = new StreamsBuilder();

private KGroupedStream<String, String> groupedStream;
Expand All @@ -80,7 +81,8 @@ public void setup() {
groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)));
windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(
WINDOW_SIZE_MS), ofMillis(2000L)));
}

@Test
Expand Down Expand Up @@ -130,7 +132,7 @@ public void namedParamShouldSetName() {
.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L)))
.aggregate(MockInitializer.STRING_INIT, Named.as("foo"));

assertThat(builder.build().describe().toString(), equalTo(
Expand All @@ -156,7 +158,7 @@ public void slidingWindowAggregateStreamsTest() {
final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());

testInputTopic.pipeInput("k1", "A", 500);
testInputTopic.pipeInput("k2", "A", 500);
Expand Down Expand Up @@ -204,15 +206,15 @@ public void slidingWindowAggregateStreamsTest() {
public void slidingWindowAggregateOverlappingWindowsTest() {

final KTable<Windowed<String>, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L))).aggregate(
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L))).aggregate(
MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());

testInputTopic.pipeInput("k1", "A", 500);
testInputTopic.pipeInput("k2", "A", 500);
Expand Down