From e9d048f579eafe3518684ad30fb8251311fc0a3e Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Mon, 5 Jan 2026 20:15:46 +0530 Subject: [PATCH 1/5] additional metadata --- .../worker/UngroupedWindmillReader.java | 95 +++++++++++-------- 1 file changed, 54 insertions(+), 41 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index c248259a12de..fbf510da049e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -22,6 +22,7 @@ import com.google.auto.service.AutoService; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Map; import org.apache.beam.model.fnexecution.v1.BeamFnApi; @@ -38,6 +39,7 @@ import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -108,49 +110,60 @@ public boolean advance() throws IOException { } return super.advance(); } +@Override +protected WindowedValue decodeMessage(Windmill.Message message) throws IOException { + Instant timestampMillis = + WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp()); + InputStream data = message.getData().newInput(); + InputStream metadata = message.getMetadata().newInput(); + Collection windows = + WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata()); + PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata()); - @Override - protected WindowedValue decodeMessage(Windmill.Message message) throws IOException { - Instant timestampMillis = - WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp()); - InputStream data = message.getData().newInput(); - InputStream metadata = message.getMetadata().newInput(); - Collection windows = - WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata()); - PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata()); - /** - * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry induced by - * drain happened upstream - */ - boolean drainingValueFromUpstream = false; - if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { - BeamFnApi.Elements.ElementMetadata elementMetadata = - WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); - drainingValueFromUpstream = - elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; - } - if (valueCoder instanceof KvCoder) { - KvCoder kvCoder = (KvCoder) valueCoder; - InputStream key = context.getSerializedKey().newInput(); - notifyElementRead(key.available() + data.available() + metadata.available()); - - @SuppressWarnings("unchecked") - T result = - (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data)); - return WindowedValues.of( - result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream); - } else { - notifyElementRead(data.available() + metadata.available()); - return WindowedValues.of( - decode(valueCoder, data), - timestampMillis, - windows, - paneInfo, - null, - null, - drainingValueFromUpstream); - } + // Existing drain propagation from current master + boolean drainingValueFromUpstream = false; + if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { + BeamFnApi.Elements.ElementMetadata elementMetadata = + WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); + drainingValueFromUpstream = + elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; + } + + // New: propagate record ID and offset + String recordId = null; + Long recordOffset = null; + if (context.offsetBasedDeduplicationSupported()) { + byte[] rawId = context.getCurrentRecordId(); + if (rawId != null) { + recordId = new String(rawId, StandardCharsets.UTF_8); + } + byte[] rawOffset = context.getCurrentRecordOffset(); + if (rawOffset != null && rawOffset.length == Longs.BYTES) { + recordOffset = Longs.fromByteArray(rawOffset); } + } + + if (valueCoder instanceof KvCoder) { + KvCoder kvCoder = (KvCoder) valueCoder; + InputStream key = context.getSerializedKey().newInput(); + notifyElementRead(key.available() + data.available() + metadata.available()); + @SuppressWarnings("unchecked") + T result = + (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data)); + + return WindowedValues.of( + result, timestampMillis, windows, paneInfo, + recordId, recordOffset, drainingValueFromUpstream); + } else { + notifyElementRead(data.available() + metadata.available()); + return WindowedValues.of( + decode(valueCoder, data), + timestampMillis, + windows, + paneInfo, + recordId, recordOffset, drainingValueFromUpstream); + } +} private X decode(Coder coder, InputStream input) throws IOException { return coder.decode(input, Coder.Context.OUTER); From eba0eac03819350f10542521da1b68a6c7a2d739 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Tue, 6 Jan 2026 19:05:06 +0530 Subject: [PATCH 2/5] minor import fix --- .../worker/UngroupedWindmillReader.java | 120 +++++++++--------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index fbf510da049e..ac8c0efcd47a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -1,13 +1,13 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -47,8 +47,8 @@ * A Reader that receives input data from a Windmill server, and returns it as individual elements. */ @SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "rawtypes", // TOD[](https://github.com/apache/beam/issues/20447) + "nullness" // TOD[](https://github.com/apache/beam/issues/20497) }) class UngroupedWindmillReader extends NativeReader> { private final Coder valueCoder; @@ -65,7 +65,6 @@ class UngroupedWindmillReader extends NativeReader> { /** A {@link ReaderFactory.Registrar} for ungrouped windmill sources. */ @AutoService(ReaderFactory.Registrar.class) public static class Registrar implements ReaderFactory.Registrar { - @Override public Map factories() { Factory factory = new Factory(); @@ -110,60 +109,61 @@ public boolean advance() throws IOException { } return super.advance(); } -@Override -protected WindowedValue decodeMessage(Windmill.Message message) throws IOException { - Instant timestampMillis = - WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp()); - InputStream data = message.getData().newInput(); - InputStream metadata = message.getMetadata().newInput(); - Collection windows = - WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata()); - PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata()); - - // Existing drain propagation from current master - boolean drainingValueFromUpstream = false; - if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { - BeamFnApi.Elements.ElementMetadata elementMetadata = - WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); - drainingValueFromUpstream = - elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; - } - // New: propagate record ID and offset - String recordId = null; - Long recordOffset = null; - if (context.offsetBasedDeduplicationSupported()) { - byte[] rawId = context.getCurrentRecordId(); - if (rawId != null) { - recordId = new String(rawId, StandardCharsets.UTF_8); - } - byte[] rawOffset = context.getCurrentRecordOffset(); - if (rawOffset != null && rawOffset.length == Longs.BYTES) { - recordOffset = Longs.fromByteArray(rawOffset); - } - } + @Override + protected WindowedValue decodeMessage(Windmill.Message message) throws IOException { + Instant timestampMillis = + WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp()); + InputStream data = message.getData().newInput(); + InputStream metadata = message.getMetadata().newInput(); + Collection windows = + WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata()); + PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata()); + + // Propagate drain bit (existing in current master) + boolean drainingValueFromUpstream = false; + if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { + BeamFnApi.Elements.ElementMetadata elementMetadata = + WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); + drainingValueFromUpstream = + elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; + } - if (valueCoder instanceof KvCoder) { - KvCoder kvCoder = (KvCoder) valueCoder; - InputStream key = context.getSerializedKey().newInput(); - notifyElementRead(key.available() + data.available() + metadata.available()); - @SuppressWarnings("unchecked") - T result = - (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data)); - - return WindowedValues.of( - result, timestampMillis, windows, paneInfo, - recordId, recordOffset, drainingValueFromUpstream); - } else { - notifyElementRead(data.available() + metadata.available()); - return WindowedValues.of( - decode(valueCoder, data), - timestampMillis, - windows, - paneInfo, - recordId, recordOffset, drainingValueFromUpstream); - } -} + // Propagate record ID and offset + String recordId = null; + Long recordOffset = null; + if (context.offsetBasedDeduplicationSupported()) { + byte[] rawId = context.getCurrentRecordId(); + if (rawId != null) { + recordId = new String(rawId, StandardCharsets.UTF_8); + } + byte[] rawOffset = context.getCurrentRecordOffset(); + if (rawOffset != null && rawOffset.length == Longs.BYTES) { + recordOffset = Longs.fromByteArray(rawOffset); + } + } + + if (valueCoder instanceof KvCoder) { + KvCoder kvCoder = (KvCoder) valueCoder; + InputStream key = context.getSerializedKey().newInput(); + notifyElementRead(key.available() + data.available() + metadata.available()); + @SuppressWarnings("unchecked") + T result = + (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data)); + + return WindowedValues.of( + result, timestampMillis, windows, paneInfo, + recordId, recordOffset, drainingValueFromUpstream); + } else { + notifyElementRead(data.available() + metadata.available()); + return WindowedValues.of( + decode(valueCoder, data), + timestampMillis, + windows, + paneInfo, + recordId, recordOffset, drainingValueFromUpstream); + } + } private X decode(Coder coder, InputStream input) throws IOException { return coder.decode(input, Coder.Context.OUTER); @@ -174,4 +174,4 @@ private X decode(Coder coder, InputStream input) throws IOException { public boolean supportsRestart() { return true; } -} +} \ No newline at end of file From f718f9caf0b493782bddb94cccaca1bc510cc0c0 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Tue, 6 Jan 2026 21:32:37 +0530 Subject: [PATCH 3/5] fix --- .../worker/UngroupedWindmillReader.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index ac8c0efcd47a..efdb9517d952 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -1,13 +1,13 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -152,8 +152,13 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data)); return WindowedValues.of( - result, timestampMillis, windows, paneInfo, - recordId, recordOffset, drainingValueFromUpstream); + result, + timestampMillis, + windows, + paneInfo, + recordId, + recordOffset, + drainingValueFromUpstream); } else { notifyElementRead(data.available() + metadata.available()); return WindowedValues.of( @@ -161,7 +166,9 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce timestampMillis, windows, paneInfo, - recordId, recordOffset, drainingValueFromUpstream); + recordId, + recordOffset, + drainingValueFromUpstream); } } @@ -174,4 +181,4 @@ private X decode(Coder coder, InputStream input) throws IOException { public boolean supportsRestart() { return true; } -} \ No newline at end of file +} From 300f50c0cae8b6f77ea87b2e38fa66525321c8b7 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Tue, 6 Jan 2026 21:35:17 +0530 Subject: [PATCH 4/5] minor changes --- .../runners/dataflow/worker/UngroupedWindmillReader.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index efdb9517d952..60c9582ad1d5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -47,8 +47,8 @@ * A Reader that receives input data from a Windmill server, and returns it as individual elements. */ @SuppressWarnings({ - "rawtypes", // TOD[](https://github.com/apache/beam/issues/20447) - "nullness" // TOD[](https://github.com/apache/beam/issues/20497) + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) class UngroupedWindmillReader extends NativeReader> { private final Coder valueCoder; @@ -120,7 +120,10 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata()); PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata()); - // Propagate drain bit (existing in current master) + /** + * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry induced by + * drain happened upstream + */ boolean drainingValueFromUpstream = false; if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { BeamFnApi.Elements.ElementMetadata elementMetadata = From 3e97f03e07b9eed2da1b589eb820203bcbfb7c22 Mon Sep 17 00:00:00 2001 From: Suvrat Acharya <140749446+Suvrat1629@users.noreply.github.com> Date: Wed, 7 Jan 2026 09:49:44 +0530 Subject: [PATCH 5/5] addressing gemini comments Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../beam/runners/dataflow/worker/UngroupedWindmillReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index 60c9582ad1d5..b63413b4f9b1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -137,7 +137,7 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce Long recordOffset = null; if (context.offsetBasedDeduplicationSupported()) { byte[] rawId = context.getCurrentRecordId(); - if (rawId != null) { + if (rawId != null && rawId.length > 0) { recordId = new String(rawId, StandardCharsets.UTF_8); } byte[] rawOffset = context.getCurrentRecordOffset();