diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 2c936f88e28c..b45688771437 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -66,6 +66,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; @@ -798,7 +799,7 @@ public void start( this.systemTimerInternals = new WindmillTimerInternals( stateFamily, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, processingTime, watermarks, windmillTagEncoding, @@ -807,7 +808,7 @@ public void start( this.userTimerInternals = new WindmillTimerInternals( stateFamily, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, processingTime, watermarks, windmillTagEncoding, @@ -832,17 +833,15 @@ public TimerData getNextFiredTimer(Coder windowCode if (cachedFiredSystemTimers == null) { cachedFiredSystemTimers = FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) - .filter( - timer -> - WindmillTimerInternals.isSystemTimer(timer) - && timer.getStateFamily().equals(stateFamily)) + .filter(timer -> timer.getStateFamily().equals(stateFamily)) .transform( timer -> windmillTagEncoding.windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + timer, windowCoder, getDrainMode())) + .filter( + windmillTimerData -> + windmillTimerData.getWindmillTimerType() == WindmillTimerType.SYSTEM_TIMER) + .transform(WindmillTimerData::getTimerData) .iterator(); } @@ -895,17 +894,16 @@ public TimerData getNextFiredUserTimer(Coder window cachedFiredUserTimers = Iterators.peekingIterator( FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) - .filter( - timer -> - WindmillTimerInternals.isUserTimer(timer) - && timer.getStateFamily().equals(stateFamily)) + .filter(timer -> timer.getStateFamily().equals(stateFamily)) .transform( timer -> windmillTagEncoding.windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + timer, windowCoder, getDrainMode())) + .filter( + windmillTimerData -> + windmillTimerData.getWindmillTimerType() + == WindmillTimerType.USER_TIMER) + .transform(WindmillTimerData::getTimerData) .iterator()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index da69f1a23718..59489babf0bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -31,6 +33,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -101,12 +104,13 @@ public Iterable timersIterable() { return eventTimers .append(nonEventTimers) .transform( - timer -> - windmillTagEncoding.windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - drainMode)); + timer -> { + WindmillTimerData windmillTimerData = + windmillTagEncoding.windmillTimerToTimerData(timer, windowCoder, drainMode); + checkState( + windmillTimerData.getWindmillTimerType() == WindmillTimerType.SYSTEM_TIMER); + return windmillTimerData.getTimerData(); + }); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index c62839333c7d..07ce62d59339 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -60,13 +60,13 @@ class WindmillTimerInternals implements TimerInternals { private final Watermarks watermarks; private final Instant processingTime; private final String stateFamily; - private final WindmillNamespacePrefix prefix; + private final WindmillTimerType type; private final Consumer onTimerModified; private final WindmillTagEncoding windmillTagEncoding; public WindmillTimerInternals( String stateFamily, // unique identifies a step - WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s" + WindmillTimerType type, Instant processingTime, Watermarks watermarks, WindmillTagEncoding windmillTagEncoding, @@ -74,14 +74,14 @@ public WindmillTimerInternals( this.watermarks = watermarks; this.processingTime = checkNotNull(processingTime); this.stateFamily = stateFamily; - this.prefix = prefix; + this.type = type; this.windmillTagEncoding = windmillTagEncoding; this.onTimerModified = onTimerModified; } - public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) { + public WindmillTimerInternals withType(WindmillTimerType type) { return new WindmillTimerInternals( - stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified); + stateFamily, type, processingTime, watermarks, windmillTagEncoding, onTimerModified); } @Override @@ -197,7 +197,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { Timer.Builder timer = windmillTagEncoding.buildWindmillTimerFromTimerData( - stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder()); + stateFamily, type, timerData, outputBuilder.addOutputTimersBuilder()); if (value.getValue()) { // Setting the timer. If it is a user timer, set a hold. @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Setting a timer, clear any prior hold and set to the new value outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) + .setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true) .addTimestamps( @@ -219,7 +219,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Clear the hold in case a previous iteration of this timer set one. outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) + .setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } @@ -234,7 +234,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // We are deleting timer; clear the hold outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) + .setTag(windmillTagEncoding.timerHoldTag(type, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } @@ -247,15 +247,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { private boolean needsWatermarkHold(TimerData timerData) { // If it is a user timer or a system timer with outputTimestamp different than timestamp - return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix) + return WindmillTimerType.USER_TIMER.equals(type) || !timerData.getTimestamp().isEqual(timerData.getOutputTimestamp()); } - - public static boolean isSystemTimer(Windmill.Timer timer) { - return timer.getTag().startsWith(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.byteString()); - } - - public static boolean isUserTimer(Windmill.Timer timer) { - return timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString()); - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java similarity index 55% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java index 4dc95aa1a0c2..a421d94a5827 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java @@ -18,30 +18,10 @@ package org.apache.beam.runners.dataflow.worker; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; -/** - * A prefix for a Windmill state or timer tag to separate user state and timers from system state - * and timers. - */ +/** A type for a Windmill timer to separate user state and timers from system state and timers. */ @Internal -public enum WindmillNamespacePrefix { - USER_NAMESPACE_PREFIX { - @Override - public ByteString byteString() { - return USER_NAMESPACE_BYTESTRING; - } - }, - - SYSTEM_NAMESPACE_PREFIX { - @Override - public ByteString byteString() { - return SYSTEM_NAMESPACE_BYTESTRING; - } - }; - - public abstract ByteString byteString(); - - private static final ByteString USER_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/u"); - private static final ByteString SYSTEM_NAMESPACE_BYTESTRING = ByteString.copyFromUtf8("/s"); +public enum WindmillTimerType { + USER_TIMER, + SYSTEM_TIMER } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java index a979a1d982c4..f9e1e7f8cab0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -21,8 +21,8 @@ import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.annotations.Internal; @@ -58,22 +58,18 @@ public abstract class WindmillTagEncoding { * @param timerTag tag of the timer that maps to the hold. */ public abstract ByteString timerHoldTag( - WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag); + WindmillTimerType windmillTimerType, TimerData timerData, ByteString timerTag); /** - * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and - * timestamp. + * Produce a tag that is guaranteed to be unique for the given timer type and TimerData * *

This is necessary because Windmill will deduplicate based only on this tag. */ - public abstract ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData); + public abstract ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData); - /** Converts Windmill Timer to beam TimerData */ - public abstract TimerData windmillTimerToTimerData( - WindmillNamespacePrefix prefix, - Timer timer, - Coder windowCoder, - boolean draining); + /** Converts Windmill Timer to TimerData */ + public abstract WindmillTimerData windmillTimerToTimerData( + Timer timer, Coder windowCoder, boolean draining); /** * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. @@ -82,11 +78,13 @@ public abstract TimerData windmillTimerToTimerData( */ public Timer.Builder buildWindmillTimerFromTimerData( @Nullable String stateFamily, - WindmillNamespacePrefix prefix, + WindmillTimerType windmillTimerType, TimerData timerData, Timer.Builder builder) { - builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); + builder + .setTag(timerTag(windmillTimerType, timerData)) + .setType(timerType(timerData.getDomain())); if (stateFamily != null) { builder.setStateFamily(stateFamily); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java index 14c3f8c01794..9efdf5e925c1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java @@ -17,16 +17,14 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.state; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; - import java.io.IOException; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; @@ -71,11 +69,11 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag address /** {@inheritDoc} */ @Override public ByteString timerHoldTag( - WindmillNamespacePrefix prefix, TimerData timerData, ByteString unusedTimerTag) { + WindmillTimerType windmillTimerType, TimerData timerData, ByteString unusedTimerTag) { String tagString; if ("".equals(timerData.getTimerFamilyId())) { tagString = - prefix.byteString().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash TIMER_HOLD_PREFIX + // this never ends with a slash @@ -86,7 +84,7 @@ public ByteString timerHoldTag( ; } else { tagString = - prefix.byteString().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash TIMER_HOLD_PREFIX + // this never ends with a slash @@ -105,11 +103,11 @@ public ByteString timerHoldTag( /** {@inheritDoc} */ @Override - public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { + public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData) { String tagString; if (useNewTimerTagEncoding(timerData)) { tagString = - prefix.byteString().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash timerData.getNamespace().stringKey() + // this must begin and end with a slash @@ -121,7 +119,7 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) } else { // Timers without timerFamily would have timerFamily would be an empty string tagString = - prefix.byteString().toStringUtf8() + namespacePrefixString(windmillTimerType) + // this never ends with a slash timerData.getNamespace().stringKey() + // this must begin and end with a slash @@ -134,11 +132,8 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) /** {@inheritDoc} */ @Override - public TimerData windmillTimerToTimerData( - WindmillNamespacePrefix prefix, - Timer timer, - Coder windowCoder, - boolean draining) { + public WindmillTimerData windmillTimerToTimerData( + Timer timer, Coder windowCoder, boolean draining) { // The tag is a path-structure string but cheaper to parse than a proper URI. It follows // this pattern, where no component but the ID can contain a slash @@ -160,16 +155,20 @@ public TimerData windmillTimerToTimerData( // - the id is totally arbitrary; currently unescaped though that could change ByteString tag = timer.getTag(); - checkArgument( - tag.startsWith(prefix.byteString()), - "Expected timer tag %s to start with prefix %s", - tag, - prefix.byteString()); + WindmillTimerType timerType; + if (tag.startsWith(namespacePrefixByteString(WindmillTimerType.SYSTEM_TIMER))) { + timerType = WindmillTimerType.SYSTEM_TIMER; + } else if (tag.startsWith(namespacePrefixByteString(WindmillTimerType.USER_TIMER))) { + timerType = WindmillTimerType.USER_TIMER; + } else { + throw new IllegalArgumentException("Unknown timer tag prefix: " + tag.toStringUtf8()); + } Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); // Parse the namespace. - int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash + int namespaceStart = + namespacePrefixByteString(timerType).size(); // drop the prefix, leave the begin slash int namespaceEnd = namespaceStart; while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') { namespaceEnd++; @@ -225,21 +224,51 @@ public TimerData windmillTimerToTimerData( } StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder); - return TimerData.of( - timerId, - timerFamily, - namespace, - timestamp, - outputTimestamp, - timerTypeToTimeDomain(timer.getType())); + return WindmillTimerData.create( + timerType, + TimerData.of( + timerId, + timerFamily, + namespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType()))); // todo add draining (https://github.com/apache/beam/issues/36884) - } private static boolean useNewTimerTagEncoding(TimerData timerData) { return !timerData.getTimerFamilyId().isEmpty(); } + private static final String USER_NAMESPACE_STRING = "/u"; + private static final ByteString USER_NAMESPACE_BYTESTRING = + ByteString.copyFromUtf8(USER_NAMESPACE_STRING); + private static final String SYSTEM_NAMESPACE_STRING = "/s"; + private static final ByteString SYSTEM_NAMESPACE_BYTESTRING = + ByteString.copyFromUtf8(SYSTEM_NAMESPACE_STRING); + + private static String namespacePrefixString(WindmillTimerType windmillTimerType) { + switch (windmillTimerType) { + case USER_TIMER: + return USER_NAMESPACE_STRING; + case SYSTEM_TIMER: + return SYSTEM_NAMESPACE_STRING; + default: + throw new IllegalStateException("unexpected windmill timer type"); + } + } + + private static ByteString namespacePrefixByteString(WindmillTimerType windmillTimerType) { + switch (windmillTimerType) { + case USER_TIMER: + return USER_NAMESPACE_BYTESTRING; + case SYSTEM_TIMER: + return SYSTEM_NAMESPACE_BYTESTRING; + default: + throw new IllegalStateException("unexpected windmill timer type"); + } + } + /** @return the singleton WindmillStateTagUtil */ public static WindmillTagEncodingV1 instance() { return INSTANCE; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java index 0702c3752820..c607698ad9f9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.state; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - import java.io.IOException; import java.io.InputStream; import javax.annotation.concurrent.ThreadSafe; @@ -30,8 +28,8 @@ import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; @@ -219,7 +217,7 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag address /** {@inheritDoc} */ @Override public ByteString timerHoldTag( - WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) { + WindmillTimerType windmillTimerType, TimerData timerData, ByteString timerTag) { // Same encoding for timer tag and timer hold tag. // They are put in different places and won't collide. return timerTag; @@ -227,16 +225,16 @@ public ByteString timerHoldTag( /** {@inheritDoc} */ @Override - public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { + public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData timerData) { try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { ByteStringOutputStream stream = streamHandle.stream(); encodeNamespace(timerData.getNamespace(), stream); - if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) { + if (WindmillTimerType.SYSTEM_TIMER.equals(windmillTimerType)) { stream.write(SYSTEM_TIMER_BYTE); - } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) { + } else if (WindmillTimerType.USER_TIMER.equals(windmillTimerType)) { stream.write(USER_TIMER_BYTE); } else { - throw new IllegalStateException("Unexpected WindmillNamespacePrefix" + prefix); + throw new IllegalStateException("Unexpected WindmillTimerType" + windmillTimerType); } StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream); StringUtf8Coder.of().encode(timerData.getTimerId(), stream); @@ -248,21 +246,19 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) /** {@inheritDoc} */ @Override - public TimerData windmillTimerToTimerData( - WindmillNamespacePrefix prefix, - Timer timer, - Coder windowCoder, - boolean draining) { + public WindmillTimerData windmillTimerToTimerData( + Timer timer, Coder windowCoder, boolean draining) { InputStream stream = timer.getTag().newInput(); try { StateNamespace stateNamespace = decodeNamespace(stream, windowCoder); int nextByte = stream.read(); + WindmillTimerType timerType; if (nextByte == SYSTEM_TIMER_BYTE) { - checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)); + timerType = WindmillTimerType.SYSTEM_TIMER; } else if (nextByte == USER_TIMER_BYTE) { - checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)); + timerType = WindmillTimerType.USER_TIMER; } else { throw new IllegalStateException("Unexpected timer tag byte: " + nextByte); } @@ -282,13 +278,15 @@ public TimerData windmillTimerToTimerData( } } - return TimerData.of( - timerId, - timerFamilyId, - stateNamespace, - timestamp, - outputTimestamp, - timerTypeToTimeDomain(timer.getType())); + return WindmillTimerData.create( + timerType, + TimerData.of( + timerId, + timerFamilyId, + stateNamespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType()))); } catch (IOException e) { throw new RuntimeException(e); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java new file mode 100644 index 000000000000..f3708f7bcfd0 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; + +@AutoValue +public abstract class WindmillTimerData { + + public abstract WindmillTimerType getWindmillTimerType(); + + public abstract TimerData getTimerData(); + + public static WindmillTimerData create(WindmillTimerType windmillTimerType, TimerData timerData) { + return new AutoValue_WindmillTimerData(windmillTimerType, timerData); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 35dc19dd7816..20dec94de088 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -152,7 +152,7 @@ private void addTimer( .setTag( WindmillTagEncodingV1.instance() .timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, TimerData.of( namespace, timestamp, diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index 9062c881096f..e5de780ac4eb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -198,7 +198,7 @@ private Windmill.Timer makeSerializedTimer( return Windmill.Timer.newBuilder() .setTag( windmillTagEncoding.timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, TimerData.of( ns, new Instant(timestamp), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 48751c57754c..fcac966ff963 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -29,7 +29,7 @@ import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.coders.Coder; @@ -140,7 +140,7 @@ public void testTimerDataToFromTimer() { StateNamespace namespace = coderAndNamespace.getValue(); for (TimeDomain timeDomain : TimeDomain.values()) { - for (WindmillNamespacePrefix prefix : WindmillNamespacePrefix.values()) { + for (WindmillTimerType windmillTimerType : WindmillTimerType.values()) { for (Instant timestamp : TEST_TIMESTAMPS) { List anonymousTimers = ImmutableList.of( @@ -157,16 +157,17 @@ public void testTimerDataToFromTimer() { timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) ? BoundedWindow.TIMESTAMP_MIN_VALUE : timer.getOutputTimestamp(); - TimerData computed = + WindmillTimerData windmillTimerData = WindmillTagEncodingV1.instance() .windmillTimerToTimerData( - prefix, WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( - stateFamily, prefix, timer, Timer.newBuilder()) + stateFamily, windmillTimerType, timer, Timer.newBuilder()) .build(), coder, false); + TimerData timerData = windmillTimerData.getTimerData(); + assertEquals(windmillTimerType, windmillTimerData.getWindmillTimerType()); // The function itself bounds output, so we dont expect the original input as the // output, we expect it to be bounded TimerData expected = @@ -177,7 +178,7 @@ public void testTimerDataToFromTimer() { timer.getDomain(), CausedByDrain.NORMAL); - assertThat(computed, equalTo(expected)); + assertThat(timerData, equalTo(expected)); } for (String timerId : TEST_TIMER_IDS) { @@ -221,17 +222,17 @@ public void testTimerDataToFromTimer() { timer.getTimestamp(), expectedTimestamp, timer.getDomain()); - assertThat( + WindmillTimerData windmillTimerData = WindmillTagEncodingV1.instance() .windmillTimerToTimerData( - prefix, WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( - stateFamily, prefix, timer, Timer.newBuilder()) + stateFamily, windmillTimerType, timer, Timer.newBuilder()) .build(), coder, - false), - equalTo(expected)); + false); + assertEquals(windmillTimerType, windmillTimerData.getWindmillTimerType()); + assertThat(windmillTimerData.getTimerData(), equalTo(expected)); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index 1284c46a99ab..fbbcfed7cf0d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -35,8 +35,8 @@ import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WindmillTimerType; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.InstantCoder; @@ -222,7 +222,7 @@ public static class TimerTagTest { @Parameters( name = - "{index}: namespace={0} prefix={1} expectedBytes={2} includeTimerId={3}" + "{index}: namespace={0} windmillTimerType={1} expectedBytes={2} includeTimerId={3}" + " includeTimerFamilyId={4} timeDomain={4}") public static Collection data() { List data = new ArrayList<>(); @@ -235,52 +235,52 @@ public static Collection data() { ImmutableList.of( new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) }); @@ -298,7 +298,7 @@ public static Collection data() { public StateNamespace namespace; @Parameter(1) - public WindmillNamespacePrefix prefix; + public WindmillTimerType windmillTimerType; @Parameter(2) public ByteString expectedBytes; @@ -327,74 +327,75 @@ public void testTimerTag() { new Instant(456), timeDomain, CausedByDrain.NORMAL); - assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData)); + assertEquals( + expectedBytes, WindmillTagEncodingV2.instance().timerTag(windmillTimerType, timerData)); } } @RunWith(Parameterized.class) public static class TimerDataFromTimerTest { - @Parameters(name = "{index}: namespace={0} prefix={1} draining={4} timeDomain={5}") + @Parameters(name = "{index}: namespace={0} windmillTimerType={1} draining={4} timeDomain={5}") public static Collection data() { List tests = ImmutableList.of( new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), GlobalWindow.Coder.INSTANCE }, new Object[] { GLOBAL_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), GlobalWindow.Coder.INSTANCE }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { INTERVAL_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }, new Object[] { OTHER_WINDOW_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), IntervalWindow.getCoder() }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + WindmillTimerType.USER_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }, new Object[] { OTHER_WINDOW_AND_TRIGGER_NAMESPACE, - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + WindmillTimerType.SYSTEM_TIMER, OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), new CustomWindow.CustomWindowCoder() }); @@ -415,7 +416,7 @@ public static Collection data() { public StateNamespace namespace; @Parameter(1) - public WindmillNamespacePrefix prefix; + public WindmillTimerType windmillTimerType; @Parameter(2) public ByteString timerTag; @@ -444,8 +445,10 @@ public void testTimerDataFromTimer() { .setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)) .setType(timerType(timeDomain)) .build(); - assertEquals( - timerData, encoding.windmillTimerToTimerData(prefix, timer, windowCoder, draining)); + WindmillTimerData windmillTimerData = + encoding.windmillTimerToTimerData(timer, windowCoder, draining); + assertEquals(windmillTimerType, windmillTimerData.getWindmillTimerType()); + assertEquals(timerData, windmillTimerData.getTimerData()); } } @@ -467,7 +470,7 @@ public void testTimerHoldTagUsesTimerTag() { ByteString timerTag = ByteString.copyFrom(bytes); assertEquals( WindmillTagEncodingV2.instance() - .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timerData, timerTag), + .timerHoldTag(WindmillTimerType.SYSTEM_TIMER, timerData, timerTag), timerTag); } }