Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
Expand Down Expand Up @@ -798,7 +799,7 @@ public void start(
this.systemTimerInternals =
new WindmillTimerInternals(
stateFamily,
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
WindmillTimerType.SYSTEM_TIMER,
processingTime,
watermarks,
windmillTagEncoding,
Expand All @@ -807,7 +808,7 @@ public void start(
this.userTimerInternals =
new WindmillTimerInternals(
stateFamily,
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
WindmillTimerType.USER_TIMER,
processingTime,
watermarks,
windmillTagEncoding,
Expand All @@ -832,17 +833,15 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
if (cachedFiredSystemTimers == null) {
cachedFiredSystemTimers =
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
.filter(
timer ->
WindmillTimerInternals.isSystemTimer(timer)
&& timer.getStateFamily().equals(stateFamily))
.filter(timer -> timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
timer, windowCoder, getDrainMode()))
.filter(
windmillTimerData ->
windmillTimerData.getWindmillTimerType() == WindmillTimerType.SYSTEM_TIMER)
.transform(WindmillTimerData::getTimerData)
.iterator();
}

Expand Down Expand Up @@ -895,17 +894,16 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
cachedFiredUserTimers =
Iterators.peekingIterator(
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
.filter(
timer ->
WindmillTimerInternals.isUserTimer(timer)
&& timer.getStateFamily().equals(stateFamily))
.filter(timer -> timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
timer, windowCoder, getDrainMode()))
.filter(
windmillTimerData ->
windmillTimerData.getWindmillTimerType()
== WindmillTimerType.USER_TIMER)
.transform(WindmillTimerData::getTimerData)
.iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -31,6 +33,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
Expand Down Expand Up @@ -101,12 +104,13 @@ public Iterable<TimerData> timersIterable() {
return eventTimers
.append(nonEventTimers)
.transform(
timer ->
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
timer -> {
WindmillTimerData windmillTimerData =
windmillTagEncoding.windmillTimerToTimerData(timer, windowCoder, drainMode);
checkState(
windmillTimerData.getWindmillTimerType() == WindmillTimerType.SYSTEM_TIMER);
return windmillTimerData.getTimerData();
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,28 @@ class WindmillTimerInternals implements TimerInternals {
private final Watermarks watermarks;
private final Instant processingTime;
private final String stateFamily;
private final WindmillNamespacePrefix prefix;
private final WindmillTimerType type;
private final Consumer<TimerData> onTimerModified;
private final WindmillTagEncoding windmillTagEncoding;

public WindmillTimerInternals(
String stateFamily, // unique identifies a step
WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s"
WindmillTimerType type,
Instant processingTime,
Watermarks watermarks,
WindmillTagEncoding windmillTagEncoding,
Consumer<TimerData> onTimerModified) {
this.watermarks = watermarks;
this.processingTime = checkNotNull(processingTime);
this.stateFamily = stateFamily;
this.prefix = prefix;
this.type = type;
this.windmillTagEncoding = windmillTagEncoding;
this.onTimerModified = onTimerModified;
}

public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
public WindmillTimerInternals withType(WindmillTimerType type) {
return new WindmillTimerInternals(
stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified);
stateFamily, type, processingTime, watermarks, windmillTagEncoding, onTimerModified);
}

@Override
Expand Down Expand Up @@ -197,7 +197,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {

Timer.Builder timer =
windmillTagEncoding.buildWindmillTimerFromTimerData(
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());
stateFamily, type, timerData, outputBuilder.addOutputTimersBuilder());

if (value.getValue()) {
// Setting the timer. If it is a user timer, set a hold.
Expand All @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true)
.addTimestamps(
Expand All @@ -219,7 +219,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Clear the hold in case a previous iteration of this timer set one.
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand All @@ -234,7 +234,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// We are deleting timer; clear the hold
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand All @@ -247,15 +247,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {

private boolean needsWatermarkHold(TimerData timerData) {
// If it is a user timer or a system timer with outputTimestamp different than timestamp
return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
return WindmillTimerType.USER_TIMER.equals(type)
|| !timerData.getTimestamp().isEqual(timerData.getOutputTimestamp());
}

public static boolean isSystemTimer(Windmill.Timer timer) {
return timer.getTag().startsWith(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.byteString());
}

public static boolean isUserTimer(Windmill.Timer timer) {
return timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,10 @@
package org.apache.beam.runners.dataflow.worker;

import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;

/**
* A prefix for a Windmill state or timer tag to separate user state and timers from system state
* and timers.
*/
/** A type for a Windmill timer to separate user state and timers from system state and timers. */
@Internal
public enum WindmillNamespacePrefix {
USER_NAMESPACE_PREFIX {
@Override
public ByteString byteString() {
return USER_NAMESPACE_BYTESTRING;
}
},

SYSTEM_NAMESPACE_PREFIX {
@Override
public ByteString byteString() {
return SYSTEM_NAMESPACE_BYTESTRING;
}
};

public abstract ByteString byteString();

private static final ByteString USER_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/u");
private static final ByteString SYSTEM_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/s");
public enum WindmillTimerType {
USER_TIMER,
SYSTEM_TIMER
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.sdk.annotations.Internal;
Expand Down Expand Up @@ -58,22 +58,18 @@ public abstract class WindmillTagEncoding {
* @param timerTag tag of the timer that maps to the hold.
*/
public abstract ByteString timerHoldTag(
WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag);
WindmillTimerType windmillTimerType, TimerData timerData, ByteString timerTag);

/**
* Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and
* timestamp.
* Produce a tag that is guaranteed to be unique for the given timer type and TimerData
*
* <p>This is necessary because Windmill will deduplicate based only on this tag.
*/
public abstract ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData);
public abstract ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData);

/** Converts Windmill Timer to beam TimerData */
public abstract TimerData windmillTimerToTimerData(
WindmillNamespacePrefix prefix,
Timer timer,
Coder<? extends BoundedWindow> windowCoder,
boolean draining);
/** Converts Windmill Timer to TimerData */
public abstract WindmillTimerData windmillTimerToTimerData(
Timer timer, Coder<? extends BoundedWindow> windowCoder, boolean draining);

/**
* Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}.
Expand All @@ -82,11 +78,13 @@ public abstract TimerData windmillTimerToTimerData(
*/
public Timer.Builder buildWindmillTimerFromTimerData(
@Nullable String stateFamily,
WindmillNamespacePrefix prefix,
WindmillTimerType windmillTimerType,
TimerData timerData,
Timer.Builder builder) {

builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain()));
builder
.setTag(timerTag(windmillTimerType, timerData))
.setType(timerType(timerData.getDomain()));

if (stateFamily != null) {
builder.setStateFamily(stateFamily);
Expand Down
Loading
Loading