From ce24d579a23c7d6dd70fe2252fb908062e74a200 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sat, 14 Feb 2026 01:58:50 +0000 Subject: [PATCH 1/4] [Dataflow Streaming] Update windmill timer clasification logic to work with windmill state tag encoding v2 --- .../worker/StreamingModeExecutionContext.java | 28 ++++----- .../worker/WindmillKeyedWorkItem.java | 15 +++-- .../worker/WindmillTimerInternals.java | 28 ++++----- ...pacePrefix.java => WindmillTimerType.java} | 17 +++--- .../windmill/state/WindmillTagEncoding.java | 20 +++---- .../windmill/state/WindmillTagEncodingV1.java | 59 ++++++++++--------- .../windmill/state/WindmillTagEncodingV2.java | 43 +++++++------- .../StreamingGroupAlsoByWindowFnsTest.java | 2 +- .../worker/WindmillKeyedWorkItemTest.java | 2 +- .../state/WindmillTagEncodingV1Test.java | 18 +++--- .../state/WindmillTagEncodingV2Test.java | 55 +++++++++-------- 11 files changed, 138 insertions(+), 149 deletions(-) rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/{WindmillNamespacePrefix.java => WindmillTimerType.java} (79%) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 2c936f88e28c..b33a55ec924a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -798,7 +798,7 @@ public void start( this.systemTimerInternals = new WindmillTimerInternals( stateFamily, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, processingTime, watermarks, windmillTagEncoding, @@ -807,7 +807,7 @@ public void start( this.userTimerInternals = new WindmillTimerInternals( stateFamily, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, processingTime, watermarks, windmillTagEncoding, @@ -832,17 +832,13 @@ public TimerData getNextFiredTimer(Coder windowCode if (cachedFiredSystemTimers == null) { cachedFiredSystemTimers = FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) - .filter( - timer -> - WindmillTimerInternals.isSystemTimer(timer) - && timer.getStateFamily().equals(stateFamily)) + .filter(timer -> timer.getStateFamily().equals(stateFamily)) .transform( timer -> windmillTagEncoding.windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + timer, windowCoder, getDrainMode())) + .filter(pair -> pair.getLeft() == WindmillTimerType.SYSTEM_TIMER) + .transform(pair -> pair.getRight()) .iterator(); } @@ -895,17 +891,13 @@ public TimerData getNextFiredUserTimer(Coder window cachedFiredUserTimers = Iterators.peekingIterator( FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) - .filter( - timer -> - WindmillTimerInternals.isUserTimer(timer) - && timer.getStateFamily().equals(stateFamily)) + .filter(timer -> timer.getStateFamily().equals(stateFamily)) .transform( timer -> windmillTagEncoding.windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + timer, windowCoder, getDrainMode())) + .filter(pair -> pair.getLeft() == WindmillTimerType.USER_TIMER) + .transform(pair -> pair.getRight()) .iterator()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index da69f1a23718..159432deb95f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -25,6 +27,7 @@ import java.util.List; import java.util.Objects; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.TimerInternals.TimerData; @@ -101,12 +104,12 @@ public Iterable timersIterable() { return eventTimers .append(nonEventTimers) .transform( - timer -> - windmillTagEncoding.windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - drainMode)); + timer -> { + Pair pair = + windmillTagEncoding.windmillTimerToTimerData(timer, windowCoder, drainMode); + checkState(pair.getLeft() == WindmillTimerType.SYSTEM_TIMER); + return pair.getRight(); + }); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index c62839333c7d..ec7a0adb7629 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -60,13 +60,13 @@ class WindmillTimerInternals implements TimerInternals { private final Watermarks watermarks; private final Instant processingTime; private final String stateFamily; - private final WindmillNamespacePrefix prefix; + private final WindmillTimerType type; private final Consumer onTimerModified; private final WindmillTagEncoding windmillTagEncoding; public WindmillTimerInternals( String stateFamily, // unique identifies a step - WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s" + WindmillTimerType type, // partitions user and system namespaces into "/u" and "/s" Instant processingTime, Watermarks watermarks, WindmillTagEncoding windmillTagEncoding, @@ -74,14 +74,14 @@ public WindmillTimerInternals( this.watermarks = watermarks; this.processingTime = checkNotNull(processingTime); this.stateFamily = stateFamily; - this.prefix = prefix; + this.type = type; this.windmillTagEncoding = windmillTagEncoding; this.onTimerModified = onTimerModified; } - public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) { + public WindmillTimerInternals withType(WindmillTimerType type) { return new WindmillTimerInternals( - stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified); + stateFamily, type, processingTime, watermarks, windmillTagEncoding, onTimerModified); } @Override @@ -197,7 +197,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { Timer.Builder timer = windmillTagEncoding.buildWindmillTimerFromTimerData( - stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder()); + stateFamily, type, timerData, outputBuilder.addOutputTimersBuilder()); if (value.getValue()) { // Setting the timer. If it is a user timer, set a hold. @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Setting a timer, clear any prior hold and set to the new value outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) + .setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true) .addTimestamps( @@ -219,7 +219,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Clear the hold in case a previous iteration of this timer set one. outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) + .setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } @@ -234,7 +234,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // We are deleting timer; clear the hold outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) + .setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } @@ -247,15 +247,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { private boolean needsWatermarkHold(TimerData timerData) { // If it is a user timer or a system timer with outputTimestamp different than timestamp - return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix) + return WindmillTimerType.USER_TIMER.equals(type) || !timerData.getTimestamp().isEqual(timerData.getOutputTimestamp()); } - - public static boolean isSystemTimer(Windmill.Timer timer) { - return timer.getTag().startsWith(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.byteString()); - } - - public static boolean isUserTimer(Windmill.Timer timer) { - return timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString()); - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java similarity index 79% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java index 4dc95aa1a0c2..c1af0acfb1a4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java @@ -20,27 +20,24 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; -/** - * A prefix for a Windmill state or timer tag to separate user state and timers from system state - * and timers. - */ +/** A type for a Windmill timer to separate user state and timers from system state and timers. */ @Internal -public enum WindmillNamespacePrefix { - USER_NAMESPACE_PREFIX { +public enum WindmillTimerType { + USER_TIMER { @Override - public ByteString byteString() { + public ByteString namespacePrefix() { return USER_NAMESPACE_BYTESTRING; } }, - SYSTEM_NAMESPACE_PREFIX { + SYSTEM_TIMER { @Override - public ByteString byteString() { + public ByteString namespacePrefix() { return SYSTEM_NAMESPACE_BYTESTRING; } }; - public abstract ByteString byteString(); + public abstract ByteString namespacePrefix(); private static final ByteString USER_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/u"); private static final ByteString SYSTEM_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/s"); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java index a979a1d982c4..00bef5e7a148 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -18,11 +18,12 @@ package org.apache.beam.runners.dataflow.worker.windmill.state; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.annotations.Internal; @@ -58,7 +59,7 @@ public abstract class WindmillTagEncoding { * @param timerTag tag of the timer that maps to the hold. */ public abstract ByteString timerHoldTag( - WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag); + WindmillTimerType windmillTimerType, TimerData timerData, ByteString timerTag); /** * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and @@ -66,14 +67,11 @@ public abstract ByteString timerHoldTag( * *

This is necessary because Windmill will deduplicate based only on this tag. */ - public abstract ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData); + public abstract ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData); /** Converts Windmill Timer to beam TimerData */ - public abstract TimerData windmillTimerToTimerData( - WindmillNamespacePrefix prefix, - Timer timer, - Coder windowCoder, - boolean draining); + public abstract Pair windmillTimerToTimerData( + Timer timer, Coder windowCoder, boolean draining); /** * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. @@ -82,11 +80,13 @@ public abstract TimerData windmillTimerToTimerData( */ public Timer.Builder buildWindmillTimerFromTimerData( @Nullable String stateFamily, - WindmillNamespacePrefix prefix, + WindmillTimerType windmillTimerType, TimerData timerData, Timer.Builder builder) { - builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); + builder + .setTag(timerTag(windmillTimerType, timerData)) + .setType(timerType(timerData.getDomain())); if (stateFamily != null) { builder.setStateFamily(stateFamily); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java index 14c3f8c01794..2a7ed6def09c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java @@ -17,16 +17,15 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.state; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; - import java.io.IOException; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; @@ -71,11 +70,11 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag address /** {@inheritDoc} */ @Override public ByteString timerHoldTag( - WindmillNamespacePrefix prefix, TimerData timerData, ByteString unusedTimerTag) { + WindmillTimerType windmillTimerType, TimerData timerData, ByteString unusedTimerTag) { String tagString; if ("".equals(timerData.getTimerFamilyId())) { tagString = - prefix.byteString().toStringUtf8() + windmillTimerType.namespacePrefix().toStringUtf8() + // this never ends with a slash TIMER_HOLD_PREFIX + // this never ends with a slash @@ -86,7 +85,7 @@ public ByteString timerHoldTag( ; } else { tagString = - prefix.byteString().toStringUtf8() + windmillTimerType.namespacePrefix().toStringUtf8() + // this never ends with a slash TIMER_HOLD_PREFIX + // this never ends with a slash @@ -105,11 +104,11 @@ public ByteString timerHoldTag( /** {@inheritDoc} */ @Override - public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { + public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData) { String tagString; if (useNewTimerTagEncoding(timerData)) { tagString = - prefix.byteString().toStringUtf8() + windmillTimerType.namespacePrefix().toStringUtf8() + // this never ends with a slash timerData.getNamespace().stringKey() + // this must begin and end with a slash @@ -121,7 +120,7 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) } else { // Timers without timerFamily would have timerFamily would be an empty string tagString = - prefix.byteString().toStringUtf8() + windmillTimerType.namespacePrefix().toStringUtf8() + // this never ends with a slash timerData.getNamespace().stringKey() + // this must begin and end with a slash @@ -134,11 +133,8 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) /** {@inheritDoc} */ @Override - public TimerData windmillTimerToTimerData( - WindmillNamespacePrefix prefix, - Timer timer, - Coder windowCoder, - boolean draining) { + public Pair windmillTimerToTimerData( + Timer timer, Coder windowCoder, boolean draining) { // The tag is a path-structure string but cheaper to parse than a proper URI. It follows // this pattern, where no component but the ID can contain a slash @@ -159,17 +155,21 @@ public TimerData windmillTimerToTimerData( // - the Global StateNamespace is different, and becomes "/" // - the id is totally arbitrary; currently unescaped though that could change - ByteString tag = timer.getTag(); - checkArgument( - tag.startsWith(prefix.byteString()), - "Expected timer tag %s to start with prefix %s", - tag, - prefix.byteString()); + ByteString tag = ByteString.copyFrom(timer.getTag().asReadOnlyByteBuffer()); + WindmillTimerType timerType; + if (tag.startsWith(WindmillTimerType.SYSTEM_TIMER.namespacePrefix())) { + timerType = WindmillTimerType.SYSTEM_TIMER; + } else if (tag.startsWith(WindmillTimerType.USER_TIMER.namespacePrefix())) { + timerType = WindmillTimerType.USER_TIMER; + } else { + throw new IllegalArgumentException("Unknown timer tag prefix: " + tag.toStringUtf8()); + } Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); // Parse the namespace. - int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash + int namespaceStart = + timerType.namespacePrefix().size(); // drop the prefix, leave the begin slash int namespaceEnd = namespaceStart; while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') { namespaceEnd++; @@ -225,15 +225,16 @@ public TimerData windmillTimerToTimerData( } StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder); - return TimerData.of( - timerId, - timerFamily, - namespace, - timestamp, - outputTimestamp, - timerTypeToTimeDomain(timer.getType())); + return Pair.of( + timerType, + TimerData.of( + timerId, + timerFamily, + namespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType()))); // todo add draining (https://github.com/apache/beam/issues/36884) - } private static boolean useNewTimerTagEncoding(TimerData timerData) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java index 0702c3752820..d25f95000472 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -17,11 +17,10 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.state; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - import java.io.IOException; import java.io.InputStream; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; @@ -30,8 +29,8 @@ import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; @@ -219,7 +218,7 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag address /** {@inheritDoc} */ @Override public ByteString timerHoldTag( - WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) { + WindmillTimerType windmillTimerType, TimerData timerData, ByteString timerTag) { // Same encoding for timer tag and timer hold tag. // They are put in different places and won't collide. return timerTag; @@ -227,16 +226,16 @@ public ByteString timerHoldTag( /** {@inheritDoc} */ @Override - public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { + public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData) { try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { ByteStringOutputStream stream = streamHandle.stream(); encodeNamespace(timerData.getNamespace(), stream); - if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) { + if (WindmillTimerType.SYSTEM_TIMER.equals(windmillTimerType)) { stream.write(SYSTEM_TIMER_BYTE); - } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) { + } else if (WindmillTimerType.USER_TIMER.equals(windmillTimerType)) { stream.write(USER_TIMER_BYTE); } else { - throw new IllegalStateException("Unexpected WindmillNamespacePrefix" + prefix); + throw new IllegalStateException("Unexpected WindmillTimerType" + windmillTimerType); } StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream); StringUtf8Coder.of().encode(timerData.getTimerId(), stream); @@ -248,21 +247,19 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) /** {@inheritDoc} */ @Override - public TimerData windmillTimerToTimerData( - WindmillNamespacePrefix prefix, - Timer timer, - Coder windowCoder, - boolean draining) { + public Pair windmillTimerToTimerData( + Timer timer, Coder windowCoder, boolean draining) { InputStream stream = timer.getTag().newInput(); try { StateNamespace stateNamespace = decodeNamespace(stream, windowCoder); int nextByte = stream.read(); + WindmillTimerType timerType; if (nextByte == SYSTEM_TIMER_BYTE) { - checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)); + timerType = WindmillTimerType.SYSTEM_TIMER; } else if (nextByte == USER_TIMER_BYTE) { - checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)); + timerType = WindmillTimerType.USER_TIMER; } else { throw new IllegalStateException("Unexpected timer tag byte: " + nextByte); } @@ -282,13 +279,15 @@ public TimerData windmillTimerToTimerData( } } - return TimerData.of( - timerId, - timerFamilyId, - stateNamespace, - timestamp, - outputTimestamp, - timerTypeToTimeDomain(timer.getType())); + return Pair.of( + timerType, + TimerData.of( + timerId, + timerFamilyId, + stateNamespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType()))); } catch (IOException e) { throw new RuntimeException(e); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 35dc19dd7816..20dec94de088 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -152,7 +152,7 @@ private void addTimer( .setTag( WindmillTagEncodingV1.instance() .timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, TimerData.of( namespace, timestamp, diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index 9062c881096f..e5de780ac4eb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -198,7 +198,7 @@ private Windmill.Timer makeSerializedTimer( return Windmill.Timer.newBuilder() .setTag( windmillTagEncoding.timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, TimerData.of( ns, new Instant(timestamp), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 48751c57754c..1290c530da07 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -23,13 +23,14 @@ import java.io.IOException; import java.util.List; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.coders.Coder; @@ -140,7 +141,7 @@ public void testTimerDataToFromTimer() { StateNamespace namespace = coderAndNamespace.getValue(); for (TimeDomain timeDomain : TimeDomain.values()) { - for (WindmillNamespacePrefix prefix : WindmillNamespacePrefix.values()) { + for (WindmillTimerType prefix : WindmillTimerType.values()) { for (Instant timestamp : TEST_TIMESTAMPS) { List anonymousTimers = ImmutableList.of( @@ -157,16 +158,17 @@ public void testTimerDataToFromTimer() { timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) ? BoundedWindow.TIMESTAMP_MIN_VALUE : timer.getOutputTimestamp(); - TimerData computed = + Pair computedPair = WindmillTagEncodingV1.instance() .windmillTimerToTimerData( - prefix, WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( stateFamily, prefix, timer, Timer.newBuilder()) .build(), coder, false); + TimerData computed = computedPair.getRight(); + assertEquals(prefix, computedPair.getLeft()); // The function itself bounds output, so we dont expect the original input as the // output, we expect it to be bounded TimerData expected = @@ -221,17 +223,17 @@ public void testTimerDataToFromTimer() { timer.getTimestamp(), expectedTimestamp, timer.getDomain()); - assertThat( + Pair computedPair = WindmillTagEncodingV1.instance() .windmillTimerToTimerData( - prefix, WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( stateFamily, prefix, timer, Timer.newBuilder()) .build(), coder, - false), - equalTo(expected)); + false); + assertEquals(prefix, computedPair.getLeft()); + assertThat(computedPair.getRight(), equalTo(expected)); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index 1284c46a99ab..2f16ea6855e0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -29,14 +29,15 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.InstantCoder; @@ -235,52 +236,52 @@ public static Collection data() { ImmutableList.of( new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }); @@ -298,7 +299,7 @@ public static Collection data() { public StateNamespace namespace; @Parameter(1) - public WindmillNamespacePrefix prefix; + public WindmillTimerType prefix; @Parameter(2) public ByteString expectedBytes; @@ -340,61 +341,61 @@ public static Collection data() { ImmutableList.of( new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), GlobalWindow.Coder.INSTANCE }, new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), GlobalWindow.Coder.INSTANCE }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }); @@ -415,7 +416,7 @@ public static Collection data() { public StateNamespace namespace; @Parameter(1) - public WindmillNamespacePrefix prefix; + public WindmillTimerType prefix; @Parameter(2) public ByteString timerTag; @@ -444,8 +445,10 @@ public void testTimerDataFromTimer() { .setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)) .setType(timerType(timeDomain)) .build(); - assertEquals( - timerData, encoding.windmillTimerToTimerData(prefix, timer, windowCoder, draining)); + Pair computedPair = + encoding.windmillTimerToTimerData(timer, windowCoder, draining); + assertEquals(prefix, computedPair.getLeft()); + assertEquals(timerData, computedPair.getRight()); } } @@ -467,7 +470,7 @@ public void testTimerHoldTagUsesTimerTag() { ByteString timerTag = ByteString.copyFrom(bytes); assertEquals( WindmillTagEncodingV2.instance() - .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timerData, timerTag), + .timerHoldTag(WindmillTimerType.SYSTEM_TIMER, timerData, timerTag), timerTag); } } From e25b3d66ae7544e1ad46140428e9d4ab469a158e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 19 Feb 2026 10:51:57 +0000 Subject: [PATCH 2/4] Address comments --- .../worker/StreamingModeExecutionContext.java | 14 ++++-- .../worker/WindmillKeyedWorkItem.java | 9 ++-- .../worker/WindmillTimerInternals.java | 2 +- .../dataflow/worker/WindmillTimerType.java | 21 +------- .../windmill/state/WindmillTagEncoding.java | 8 ++- .../windmill/state/WindmillTagEncodingV1.java | 50 +++++++++++++++---- .../windmill/state/WindmillTagEncodingV2.java | 5 +- .../windmill/state/WindmillTimerData.java | 34 +++++++++++++ .../state/WindmillTagEncodingV1Test.java | 19 ++++--- .../state/WindmillTagEncodingV2Test.java | 18 +++---- 10 files changed, 114 insertions(+), 66 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index b33a55ec924a..b45688771437 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -66,6 +66,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; @@ -837,8 +838,10 @@ public TimerData getNextFiredTimer(Coder windowCode timer -> windmillTagEncoding.windmillTimerToTimerData( timer, windowCoder, getDrainMode())) - .filter(pair -> pair.getLeft() == WindmillTimerType.SYSTEM_TIMER) - .transform(pair -> pair.getRight()) + .filter( + windmillTimerData -> + windmillTimerData.getWindmillTimerType() == WindmillTimerType.SYSTEM_TIMER) + .transform(WindmillTimerData::getTimerData) .iterator(); } @@ -896,8 +899,11 @@ public TimerData getNextFiredUserTimer(Coder window timer -> windmillTagEncoding.windmillTimerToTimerData( timer, windowCoder, getDrainMode())) - .filter(pair -> pair.getLeft() == WindmillTimerType.USER_TIMER) - .transform(pair -> pair.getRight()) + .filter( + windmillTimerData -> + windmillTimerData.getWindmillTimerType() + == WindmillTimerType.USER_TIMER) + .transform(WindmillTimerData::getTimerData) .iterator()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index 159432deb95f..59489babf0bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -27,13 +27,13 @@ import java.util.List; import java.util.Objects; import org.apache.beam.model.fnexecution.v1.BeamFnApi; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -105,10 +105,11 @@ public Iterable timersIterable() { .append(nonEventTimers) .transform( timer -> { - Pair pair = + WindmillTimerData windmillTimerData = windmillTagEncoding.windmillTimerToTimerData(timer, windowCoder, drainMode); - checkState(pair.getLeft() == WindmillTimerType.SYSTEM_TIMER); - return pair.getRight(); + checkState( + windmillTimerData.getWindmillTimerType() == WindmillTimerType.SYSTEM_TIMER); + return windmillTimerData.getTimerData(); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index ec7a0adb7629..07ce62d59339 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -66,7 +66,7 @@ class WindmillTimerInternals implements TimerInternals { public WindmillTimerInternals( String stateFamily, // unique identifies a step - WindmillTimerType type, // partitions user and system namespaces into "/u" and "/s" + WindmillTimerType type, Instant processingTime, Watermarks watermarks, WindmillTagEncoding windmillTagEncoding, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java index c1af0acfb1a4..a421d94a5827 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java @@ -18,27 +18,10 @@ package org.apache.beam.runners.dataflow.worker; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; /** A type for a Windmill timer to separate user state and timers from system state and timers. */ @Internal public enum WindmillTimerType { - USER_TIMER { - @Override - public ByteString namespacePrefix() { - return USER_NAMESPACE_BYTESTRING; - } - }, - - SYSTEM_TIMER { - @Override - public ByteString namespacePrefix() { - return SYSTEM_NAMESPACE_BYTESTRING; - } - }; - - public abstract ByteString namespacePrefix(); - - private static final ByteString USER_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/u"); - private static final ByteString SYSTEM_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/s"); + USER_TIMER, + SYSTEM_TIMER } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java index 00bef5e7a148..f9e1e7f8cab0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow.worker.windmill.state; import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.TimerInternals.TimerData; @@ -62,15 +61,14 @@ public abstract ByteString timerHoldTag( WindmillTimerType windmillTimerType, TimerData timerData, ByteString timerTag); /** - * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and - * timestamp. + * Produce a tag that is guaranteed to be unique for the given timer type and TimerData * *

This is necessary because Windmill will deduplicate based only on this tag. */ public abstract ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData); - /** Converts Windmill Timer to beam TimerData */ - public abstract Pair windmillTimerToTimerData( + /** Converts Windmill Timer to TimerData */ + public abstract WindmillTimerData windmillTimerToTimerData( Timer timer, Coder windowCoder, boolean draining); /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java index 2a7ed6def09c..9efdf5e925c1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java @@ -19,7 +19,6 @@ import java.io.IOException; import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; @@ -74,7 +73,7 @@ public ByteString timerHoldTag( String tagString; if ("".equals(timerData.getTimerFamilyId())) { tagString = - windmillTimerType.namespacePrefix().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash TIMER_HOLD_PREFIX + // this never ends with a slash @@ -85,7 +84,7 @@ public ByteString timerHoldTag( ; } else { tagString = - windmillTimerType.namespacePrefix().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash TIMER_HOLD_PREFIX + // this never ends with a slash @@ -108,7 +107,7 @@ public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerD String tagString; if (useNewTimerTagEncoding(timerData)) { tagString = - windmillTimerType.namespacePrefix().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash timerData.getNamespace().stringKey() + // this must begin and end with a slash @@ -120,7 +119,7 @@ public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerD } else { // Timers without timerFamily would have timerFamily would be an empty string tagString = - windmillTimerType.namespacePrefix().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash timerData.getNamespace().stringKey() + // this must begin and end with a slash @@ -133,7 +132,7 @@ public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerD /** {@inheritDoc} */ @Override - public Pair windmillTimerToTimerData( + public WindmillTimerData windmillTimerToTimerData( Timer timer, Coder windowCoder, boolean draining) { // The tag is a path-structure string but cheaper to parse than a proper URI. It follows @@ -155,11 +154,11 @@ public Pair windmillTimerToTimerData( // - the Global StateNamespace is different, and becomes "/" // - the id is totally arbitrary; currently unescaped though that could change - ByteString tag = ByteString.copyFrom(timer.getTag().asReadOnlyByteBuffer()); + ByteString tag = timer.getTag(); WindmillTimerType timerType; - if (tag.startsWith(WindmillTimerType.SYSTEM_TIMER.namespacePrefix())) { + if (tag.startsWith(namespacePrefixByteString(WindmillTimerType.SYSTEM_TIMER))) { timerType = WindmillTimerType.SYSTEM_TIMER; - } else if (tag.startsWith(WindmillTimerType.USER_TIMER.namespacePrefix())) { + } else if (tag.startsWith(namespacePrefixByteString(WindmillTimerType.USER_TIMER))) { timerType = WindmillTimerType.USER_TIMER; } else { throw new IllegalArgumentException("Unknown timer tag prefix: " + tag.toStringUtf8()); @@ -169,7 +168,7 @@ public Pair windmillTimerToTimerData( // Parse the namespace. int namespaceStart = - timerType.namespacePrefix().size(); // drop the prefix, leave the begin slash + namespacePrefixByteString(timerType).size(); // drop the prefix, leave the begin slash int namespaceEnd = namespaceStart; while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') { namespaceEnd++; @@ -225,7 +224,7 @@ public Pair windmillTimerToTimerData( } StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder); - return Pair.of( + return WindmillTimerData.create( timerType, TimerData.of( timerId, @@ -241,6 +240,35 @@ private static boolean useNewTimerTagEncoding(TimerData timerData) { return !timerData.getTimerFamilyId().isEmpty(); } + private static final String USER_NAMESPACE_STRING = "/u"; + private static final ByteString USER_NAMESPACE_BYTESTRING = + ByteString.copyFromUtf8(USER_NAMESPACE_STRING); + private static final String SYSTEM_NAMESPACE_STRING = "/s"; + private static final ByteString SYSTEM_NAMESPACE_BYTESTRING = + ByteString.copyFromUtf8(SYSTEM_NAMESPACE_STRING); + + private static String namespacePrefixString(WindmillTimerType windmillTimerType) { + switch (windmillTimerType) { + case USER_TIMER: + return USER_NAMESPACE_STRING; + case SYSTEM_TIMER: + return SYSTEM_NAMESPACE_STRING; + default: + throw new IllegalStateException("unexpected windmill timer type"); + } + } + + private static ByteString namespacePrefixByteString(WindmillTimerType windmillTimerType) { + switch (windmillTimerType) { + case USER_TIMER: + return USER_NAMESPACE_BYTESTRING; + case SYSTEM_TIMER: + return SYSTEM_NAMESPACE_BYTESTRING; + default: + throw new IllegalStateException("unexpected windmill timer type"); + } + } + /** @return the singleton WindmillStateTagUtil */ public static WindmillTagEncodingV1 instance() { return INSTANCE; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java index d25f95000472..c607698ad9f9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.InputStream; import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; @@ -247,7 +246,7 @@ public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerD /** {@inheritDoc} */ @Override - public Pair windmillTimerToTimerData( + public WindmillTimerData windmillTimerToTimerData( Timer timer, Coder windowCoder, boolean draining) { InputStream stream = timer.getTag().newInput(); @@ -279,7 +278,7 @@ public Pair windmillTimerToTimerData( } } - return Pair.of( + return WindmillTimerData.create( timerType, TimerData.of( timerId, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java new file mode 100644 index 000000000000..f3708f7bcfd0 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; + +@AutoValue +public abstract class WindmillTimerData { + + public abstract WindmillTimerType getWindmillTimerType(); + + public abstract TimerData getTimerData(); + + public static WindmillTimerData create(WindmillTimerType windmillTimerType, TimerData timerData) { + return new AutoValue_WindmillTimerData(windmillTimerType, timerData); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 1290c530da07..48aaa7c473b9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateNamespaces; @@ -141,7 +140,7 @@ public void testTimerDataToFromTimer() { StateNamespace namespace = coderAndNamespace.getValue(); for (TimeDomain timeDomain : TimeDomain.values()) { - for (WindmillTimerType prefix : WindmillTimerType.values()) { + for (WindmillTimerType windmillTimerType : WindmillTimerType.values()) { for (Instant timestamp : TEST_TIMESTAMPS) { List anonymousTimers = ImmutableList.of( @@ -158,17 +157,17 @@ public void testTimerDataToFromTimer() { timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) ? BoundedWindow.TIMESTAMP_MIN_VALUE : timer.getOutputTimestamp(); - Pair computedPair = + WindmillTimerData windmillTimerData = WindmillTagEncodingV1.instance() .windmillTimerToTimerData( WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( - stateFamily, prefix, timer, Timer.newBuilder()) + stateFamily, windmillTimerType, timer, Timer.newBuilder()) .build(), coder, false); - TimerData computed = computedPair.getRight(); - assertEquals(prefix, computedPair.getLeft()); + TimerData computed = windmillTimerData.getTimerData(); + assertEquals(windmillTimerData, windmillTimerData.getWindmillTimerType()); // The function itself bounds output, so we dont expect the original input as the // output, we expect it to be bounded TimerData expected = @@ -223,17 +222,17 @@ public void testTimerDataToFromTimer() { timer.getTimestamp(), expectedTimestamp, timer.getDomain()); - Pair computedPair = + WindmillTimerData windmillTimerData = WindmillTagEncodingV1.instance() .windmillTimerToTimerData( WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( - stateFamily, prefix, timer, Timer.newBuilder()) + stateFamily, windmillTimerType, timer, Timer.newBuilder()) .build(), coder, false); - assertEquals(prefix, computedPair.getLeft()); - assertThat(computedPair.getRight(), equalTo(expected)); + assertEquals(windmillTimerType, windmillTimerData.getWindmillTimerType()); + assertThat(windmillTimerData.getTimerData(), equalTo(expected)); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index 2f16ea6855e0..fbbcfed7cf0d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -29,7 +29,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; @@ -223,7 +222,7 @@ public static class TimerTagTest { @Parameters( name = - "{index}: namespace={0} prefix={1} expectedBytes={2} includeTimerId={3}" + "{index}: namespace={0} windmillTimerType={1} expectedBytes={2} includeTimerId={3}" + " includeTimerFamilyId={4} timeDomain={4}") public static Collection data() { List data = new ArrayList<>(); @@ -299,7 +298,7 @@ public static Collection data() { public StateNamespace namespace; @Parameter(1) - public WindmillTimerType prefix; + public WindmillTimerType windmillTimerType; @Parameter(2) public ByteString expectedBytes; @@ -328,14 +327,15 @@ public void testTimerTag() { new Instant(456), timeDomain, CausedByDrain.NORMAL); - assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData)); + assertEquals( + expectedBytes, WindmillTagEncodingV2.instance().timerTag(windmillTimerType, timerData)); } } @RunWith(Parameterized.class) public static class TimerDataFromTimerTest { - @Parameters(name = "{index}: namespace={0} prefix={1} draining={4} timeDomain={5}") + @Parameters(name = "{index}: namespace={0} windmillTimerType={1} draining={4} timeDomain={5}") public static Collection data() { List tests = ImmutableList.of( @@ -416,7 +416,7 @@ public static Collection data() { public StateNamespace namespace; @Parameter(1) - public WindmillTimerType prefix; + public WindmillTimerType windmillTimerType; @Parameter(2) public ByteString timerTag; @@ -445,10 +445,10 @@ public void testTimerDataFromTimer() { .setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)) .setType(timerType(timeDomain)) .build(); - Pair computedPair = + WindmillTimerData windmillTimerData = encoding.windmillTimerToTimerData(timer, windowCoder, draining); - assertEquals(prefix, computedPair.getLeft()); - assertEquals(timerData, computedPair.getRight()); + assertEquals(windmillTimerType, windmillTimerData.getWindmillTimerType()); + assertEquals(timerData, windmillTimerData.getTimerData()); } } From 600af786c7a5a33ac26d89a9a136354165ecd4a4 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 19 Feb 2026 11:05:42 +0000 Subject: [PATCH 3/4] Fix test --- .../worker/windmill/state/WindmillTagEncodingV1Test.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 48aaa7c473b9..6b5f0e7dfee8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -166,8 +166,7 @@ public void testTimerDataToFromTimer() { .build(), coder, false); - TimerData computed = windmillTimerData.getTimerData(); - assertEquals(windmillTimerData, windmillTimerData.getWindmillTimerType()); + assertEquals(windmillTimerType, windmillTimerData.getWindmillTimerType()); // The function itself bounds output, so we dont expect the original input as the // output, we expect it to be bounded TimerData expected = From 079e81d84b6c2b87547ba000b0ee85f7b714a578 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 19 Feb 2026 11:06:41 +0000 Subject: [PATCH 4/4] Fix test --- .../worker/windmill/state/WindmillTagEncodingV1Test.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 6b5f0e7dfee8..fcac966ff963 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -166,6 +166,7 @@ public void testTimerDataToFromTimer() { .build(), coder, false); + TimerData timerData = windmillTimerData.getTimerData(); assertEquals(windmillTimerType, windmillTimerData.getWindmillTimerType()); // The function itself bounds output, so we dont expect the original input as the // output, we expect it to be bounded @@ -177,7 +178,7 @@ public void testTimerDataToFromTimer() { timer.getDomain(), CausedByDrain.NORMAL); - assertThat(computed, equalTo(expected)); + assertThat(timerData, equalTo(expected)); } for (String timerId : TEST_TIMER_IDS) {