Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@
public final class SessionWindows {

private final long gapMs;
private long maintainDurationMs;
private final long maintainDurationMs;

private SessionWindows(final long gapMs) {
private SessionWindows(final long gapMs, final long maintainDurationMs) {
this.gapMs = gapMs;
maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
this.maintainDurationMs = maintainDurationMs;
}

/**
Expand All @@ -85,7 +85,8 @@ public static SessionWindows with(final long inactivityGapMs) {
if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
return new SessionWindows(inactivityGapMs);
final long oneDayMs = 24 * 60 * 60_000L;
return new SessionWindows(inactivityGapMs, oneDayMs);
}

/**
Expand All @@ -99,9 +100,8 @@ public SessionWindows until(final long durationMs) throws IllegalArgumentExcepti
if (durationMs < gapMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
}
maintainDurationMs = durationMs;

return this;
return new SessionWindows(gapMs, durationMs);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixes a lurking potential bug: using a mutable field in equals/hashcode.

}

/**
Expand Down
29 changes: 18 additions & 11 deletions streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,10 @@
*/
public abstract class Windows<W extends Window> {

private static final int DEFAULT_NUM_SEGMENTS = 3;
private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
@Deprecated public int segments = 3;

static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // one day

private long maintainDurationMs;

public int segments;

protected Windows() {
segments = DEFAULT_NUM_SEGMENTS;
maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS;
}
protected Windows() {}

/**
* Set the window maintain duration (retention time) in milliseconds.
Expand Down Expand Up @@ -76,14 +68,29 @@ public long maintainMs() {
return maintainDurationMs;
}

/**
* Return the segment interval in milliseconds.
*
* @return the segment interval
*/
@SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later.
public long segmentInterval() {
// Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
final long minimumSegmentInterval = 60_000L;
// Scaled to the (possibly overridden) retention period
return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
}

/**
* Set the number of segments to be used for rolling the window store.
* This function is not exposed to users but can be called by developers that extend this class.
*
* @param segments the number of segments to be used
* @return itself
* @throws IllegalArgumentException if specified segments is small than 2
* @deprecated since 2.1 Override segmentInterval() instead.
*/
@Deprecated
protected Windows<W> segments(final int segments) throws IllegalArgumentException {
if (segments < 2) {
throw new IllegalArgumentException("Number of segments must be at least 2.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -844,12 +844,17 @@ private static <K, V> StoreBuilder<WindowStore<K, V>> createWindowedStateStore(f
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String storeName) {
return Stores.windowStoreBuilder(Stores.persistentWindowStore(storeName,
windows.maintainMs(),
windows.segments,
windows.size(),
true), keySerde, valueSerde);

return Stores.windowStoreBuilder(
Stores.persistentWindowStore(
storeName,
windows.maintainMs(),
windows.size(),
true,
windows.segmentInterval()
),
keySerde,
valueSerde
);
}

private class KStreamImplJoin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,13 @@ public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialize
private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
supplier = Stores.persistentWindowStore(materialized.storeName(),
windows.maintainMs(),
windows.segments,
windows.size(),
false);
supplier = Stores.persistentWindowStore(
materialized.storeName(),
windows.maintainMs(),
windows.size(),
false,
windows.segmentInterval()
);
}
final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
materialized.keySerde(),
Expand Down
61 changes: 54 additions & 7 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class Stores {
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a persistent store
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
Expand All @@ -90,7 +90,7 @@ public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String na
/**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
* @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
* build an in-memory store
*/
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
Expand Down Expand Up @@ -151,25 +151,72 @@ public String metricsScope() {
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
*/
@Deprecated
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final int numSegments,
final long windowSize,
final boolean retainDuplicates) {
if (numSegments < 2) {
throw new IllegalArgumentException("numSegments cannot must smaller than 2");
}

final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L);

return persistentWindowStore(
name,
retentionPeriod,
windowSize,
retainDuplicates,
legacySegmentInterval
);
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
*/
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates) {
// we're arbitrarily defaulting to segments no smaller than one minute.
final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* @param segmentInterval size of segments in ms (must be at least one minute)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
*/
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
final long segmentInterval) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (numSegments < 2) {
throw new IllegalArgumentException("numSegments cannot must smaller than 2");
}
if (windowSize < 0) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
final long segmentIntervalMs = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
if (segmentInterval < 60_000) {
throw new IllegalArgumentException("segmentInterval must be at least one minute");
}

return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentIntervalMs, windowSize, retainDuplicates);
return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,22 @@ public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Byte
* It is also used to reduce the amount of data that is scanned when caching is enabled.
*
* @return number of segments
* @deprecated since 2.1. Use {@link WindowBytesStoreSupplier#segmentIntervalMs()} instead.
*/
@Deprecated
int segments();

/**
* The size of the windows any store created from this supplier is creating.
* The size of the segments (in milliseconds) the store has.
* If your store is segmented then this should be the size of segments in the underlying store.
* It is also used to reduce the amount of data that is scanned when caching is enabled.
*
* @return size of the segments (in milliseconds)
*/
long segmentIntervalMs();

/**
* The size of the windows (in milliseconds) any store created from this supplier is creating.
*
* @return window size
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public String metricsScope() {

@Override
public long segmentIntervalMs() {
// Selected somewhat arbitrarily. Profiling may reveal a different value is preferable.
return Math.max(retentionPeriod / 2, 60_000L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ public String metricsScope() {
return "rocksdb-window";
}

@Deprecated
@Override
public int segments() {
return (int) (retentionPeriod / segmentInterval) + 1;
}

@Override
public long segmentIntervalMs() {
return segmentInterval;
}

@Override
public long windowSize() {
return windowSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byt
keySerde,
valueSerde,
storeSupplier.windowSize(),
storeSupplier.segments());
storeSupplier.segmentIntervalMs());
}

private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,6 @@ public void untilShouldSetMaintainDuration() {
assertEquals(windowSize, windowSpec.until(windowSize).maintainMs());
}

@Test
public void shouldUseWindowSizeForMaintainDurationWhenSizeLargerThanDefaultMaintainMs() {
final long size = Windows.DEFAULT_MAINTAIN_DURATION_MS;

final JoinWindows windowSpec = JoinWindows.of(size);
final long windowSize = windowSpec.size();

assertEquals(windowSize, windowSpec.maintainMs());
}

Copy link
Contributor Author

@vvcephei vvcephei Jun 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test didn't actually test... anything.

@Test
public void retentionTimeMustNoBeSmallerThanWindowSize() {
final JoinWindows windowSpec = JoinWindows.of(anySize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void windowSizeMustNotBeZero() {

@Test
public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
final long windowGap = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
final long windowGap = 2 * SessionWindows.with(1).maintainMs();
assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void shouldSetWindowRetentionTime() {

@Test
public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
final long windowSize = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
final long windowSize = 2 * TimeWindows.of(1).maintainMs();
assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ public long size() {
@Test
public void shouldSetNumberOfSegments() {
final int anySegmentSizeLargerThanOne = 5;
assertEquals(anySegmentSizeLargerThanOne, new TestWindows().segments(anySegmentSizeLargerThanOne).segments);
final TestWindows testWindow = new TestWindows();
final long maintainMs = testWindow.maintainMs();

assertEquals(
maintainMs / (anySegmentSizeLargerThanOne - 1),
testWindow.segments(anySegmentSizeLargerThanOne).segmentInterval()
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
Expand Down Expand Up @@ -469,13 +469,18 @@ private void processStreamWithWindowStore(final String topic) {
setStreamProperties("simple-benchmark-streams-with-store");

final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder
= Stores.windowStoreBuilder(Stores.persistentWindowStore("store",

final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(
"store",
AGGREGATE_WINDOW_SIZE * 3,
3,
AGGREGATE_WINDOW_SIZE,
false),
INTEGER_SERDE, BYTE_SERDE);
false,
60_000L
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the arithmetic solution of the previous formula max((retention / (numSegments - 1)), 60,000) = max((1000 * 3 / (3 - 1)), 60,000) = max(3,000/2, 60,000) = 60,000. I point this out because it's important not to change any parameters on the benchmark.

),
INTEGER_SERDE,
BYTE_SERDE
);
builder.addStateStore(storeBuilder.withCachingEnabled());

final KStream<Integer, byte[]> source = builder.stream(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,15 @@ public void shouldAddInternalTopicConfigForWindowStores() {
builder.setApplicationId("appId");
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor");

builder.addStateStore(
Stores.windowStoreBuilder(
Stores.persistentWindowStore("store", 30_000L, 10_000L, false),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in cases like this where we didn't actually care how many segments or what segment interval, I substituted in the new persistentWindowStore that doesn't take any of the segment parameters.

Serdes.String(),
Serdes.String()
),
"processor"
);
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
Expand Down
Loading