Skip to content

[Dataflow Streaming] Update windmill timer clasification logic to work with windmill state tag encoding v2#37604

Open
arunpandianp wants to merge 1 commit intoapache:masterfrom
arunpandianp:fixTagEncodingv2
Open

[Dataflow Streaming] Update windmill timer clasification logic to work with windmill state tag encoding v2#37604
arunpandianp wants to merge 1 commit intoapache:masterfrom
arunpandianp:fixTagEncodingv2

Conversation

@arunpandianp
Copy link
Contributor

@arunpandianp arunpandianp commented Feb 14, 2026

Timers were not firing due to the filtering which was based on encoding v1. Also adding a post submit test to catch such bugs early in #37603

The feature is not enabled anywhere yet and the bug does not affect real workloads.

@arunpandianp
Copy link
Contributor Author

R: @scwhittle

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @arunpandianp, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue in Dataflow Streaming where timers were failing to fire due to an outdated classification logic tied to encoding v1. The core change involves refactoring the timer classification mechanism from WindmillNamespacePrefix to a more explicit WindmillTimerType enum. This update propagates through the timer encoding, decoding, and filtering components, ensuring that timers are correctly identified and processed according to windmill state tag encoding v2. The fix prevents future timer-related malfunctions and improves the reliability of streaming pipelines.

Highlights

  • Refactored Timer Classification: The WindmillNamespacePrefix enum has been replaced with a new WindmillTimerType enum to provide a clearer and more robust classification of system and user timers within the Windmill state tag encoding.
  • Updated Timer Encoding and Decoding Logic: The WindmillTagEncoding abstract class and its concrete implementations (V1 and V2) have been modified to correctly encode and decode timer tags using the new WindmillTimerType. The windmillTimerToTimerData method now returns a Pair containing both the WindmillTimerType and the TimerData.
  • Adjusted Timer Filtering in Execution Context: The StreamingModeExecutionContext has been updated to correctly filter fired system and user timers based on the new WindmillTimerType returned during timer decoding, resolving an issue where timers were not firing due to incorrect filtering.
  • Improved WindmillKeyedWorkItem Handling: The WindmillKeyedWorkItem now correctly processes timers by adapting to the new Pair return type from windmillTimerToTimerData and includes a checkState assertion for system timers.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
    • Updated WindmillTimerInternals initialization to use WindmillTimerType instead of WindmillNamespacePrefix.
    • Modified timer filtering logic for both system and user timers to correctly extract and filter by WindmillTimerType.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
    • Imported checkState and Pair classes.
    • Updated the timersIterable method to correctly handle the Pair return type from windmillTagEncoding.windmillTimerToTimerData and added a state check for system timers.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java
    • Renamed file to WindmillTimerType.java.
    • Renamed enum WindmillNamespacePrefix to WindmillTimerType.
    • Renamed enum members USER_NAMESPACE_PREFIX to USER_TIMER and SYSTEM_NAMESPACE_PREFIX to SYSTEM_TIMER.
    • Renamed abstract method byteString() to namespacePrefix().
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
    • Replaced WindmillNamespacePrefix with WindmillTimerType in class fields, constructor, and withType method.
    • Updated persistTo and needsWatermarkHold methods to use WindmillTimerType.
    • Removed static helper methods isSystemTimer and isUserTimer.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
    • Imported Pair and WindmillTimerType.
    • Updated method signatures for timerHoldTag, timerTag, and windmillTimerToTimerData to use WindmillTimerType.
    • Modified windmillTimerToTimerData to return a Pair<WindmillTimerType, TimerData>.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
    • Updated imports to use Pair and WindmillTimerType.
    • Adapted timerHoldTag, timerTag, and windmillTimerToTimerData methods to the new WindmillTimerType and Pair return type.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
    • Updated imports to use Pair and WindmillTimerType.
    • Adapted timerHoldTag, timerTag, and windmillTimerToTimerData methods to the new WindmillTimerType and Pair return type, including internal logic for determining timer type.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
    • Updated test method addTimer to use WindmillTimerType.SYSTEM_TIMER.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
    • Updated test method makeSerializedTimer to use WindmillTimerType.SYSTEM_TIMER.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
    • Updated imports to use Pair and WindmillTimerType.
    • Modified test loops and assertions to accommodate WindmillTimerType and the Pair return type from windmillTimerToTimerData.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
    • Updated imports to use Pair and WindmillTimerType.
    • Modified test data, test loops, and assertions to accommodate WindmillTimerType and the Pair return type from windmillTimerToTimerData.
Activity
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

WindmillTimerType windmillTimerType, TimerData timerData, ByteString timerTag);

/**
* Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update comment

if ("".equals(timerData.getTimerFamilyId())) {
tagString =
prefix.byteString().toStringUtf8()
windmillTimerType.namespacePrefix().toStringUtf8()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could have namespacePrefixString method on the timertype to avoid doing this same conversion many times

"Expected timer tag %s to start with prefix %s",
tag,
prefix.byteString());
ByteString tag = ByteString.copyFrom(timer.getTag().asReadOnlyByteBuffer());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have teh namespaceprefixstring method could we avoid this copy to bytestring?

timerTypeToTimeDomain(timer.getType()));
return Pair.of(
timerType,
TimerData.of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about putting the timertype in TimerData? The timedomain could be a function instead to keep the size same but then it is one less allocation than this Pair

windowCoder,
getDrainMode()))
timer, windowCoder, getDrainMode()))
.filter(pair -> pair.getLeft() == WindmillTimerType.SYSTEM_TIMER)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this remain above the transform since it's cheaper?

windowCoder,
getDrainMode()))
timer, windowCoder, getDrainMode()))
.filter(pair -> pair.getLeft() == WindmillTimerType.USER_TIMER)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

public WindmillTimerInternals(
String stateFamily, // unique identifies a step
WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s"
WindmillTimerType type, // partitions user and system namespaces into "/u" and "/s"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the /u and /s specific comment?

};

public abstract ByteString byteString();
public abstract ByteString namespacePrefix();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should not be a method on the enum but static function within the v1 tag class?
That enforces that elsewhere we are always using the encoding agnostic type


@Parameter(1)
public WindmillNamespacePrefix prefix;
public WindmillTimerType prefix;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants