From d22ee11e9e65f8b7ff9295a089f57e50b3a5b18e Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Mon, 25 Oct 2021 00:15:02 -0700 Subject: [PATCH 1/2] Add FillGaps to timeseries module. --- .github/autolabeler.yml | 1 + build.gradle.kts | 1 + .../beam/sdk/coders/SortedMapCoder.java | 197 +++++++ .../beam/sdk/schemas/transforms/WithKeys.java | 79 +++ sdks/java/extensions/timeseries/build.gradle | 32 ++ .../sdk/extensions/timeseries/FillGaps.java | 535 ++++++++++++++++++ .../extensions/timeseries/package-info.java | 20 + .../extensions/timeseries/FillGapsTest.java | 355 ++++++++++++ settings.gradle.kts | 1 + 9 files changed, 1221 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java create mode 100644 sdks/java/extensions/timeseries/build.gradle create mode 100644 sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java create mode 100644 sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java create mode 100644 sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java diff --git a/.github/autolabeler.yml b/.github/autolabeler.yml index 715abeddecfee..888c5d4f3a406 100644 --- a/.github/autolabeler.yml +++ b/.github/autolabeler.yml @@ -42,6 +42,7 @@ extensions: ["sdks/java/extensions/**/*", "runners/extensions-java/**/*"] "sketching": ["sdks/java/extensions/sketching/**/*"] "sorter": ["sdks/java/extensions/sorter/**/*"] "sql": ["sdks/java/extensions/sql/**/*"] +"timeseries": ["sdks/java/extensions/timeseries/*"] "zetasketch": ["sdks/java/extensions/zetasketch/**/*"] # IO diff --git a/build.gradle.kts b/build.gradle.kts index aa5f8949fa518..7ea18895f77c0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -195,6 +195,7 @@ tasks.register("javaPreCommitPortabilityApi") { 
tasks.register("javaPostCommit") { dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit") + dependsOn(":sdks:java:extensions:timeseries:postCommit") dependsOn(":sdks:java:extensions:zetasketch:postCommit") dependsOn(":sdks:java:extensions:ml:postCommit") } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java new file mode 100644 index 0000000000000..8448c9156548d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.coders; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; + +/** + * A {@link Coder} for {@link SortedMap Maps} that encodes them according to provided coders for + * keys and values. + * + * @param the type of the keys of the KVs being transcoded + * @param the type of the values of the KVs being transcoded + */ +public class SortedMapCoder, V> + extends StructuredCoder> { + /** Produces a MapCoder with the given keyCoder and valueCoder. 
*/ + public static , V> SortedMapCoder of( + Coder keyCoder, Coder valueCoder) { + return new SortedMapCoder<>(keyCoder, valueCoder); + } + + public Coder getKeyCoder() { + return keyCoder; + } + + public Coder getValueCoder() { + return valueCoder; + } + + ///////////////////////////////////////////////////////////////////////////// + + private Coder keyCoder; + private Coder valueCoder; + + private SortedMapCoder(Coder keyCoder, Coder valueCoder) { + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Override + public void encode(SortedMap map, OutputStream outStream) + throws IOException, CoderException { + encode(map, outStream, Context.NESTED); + } + + @Override + public void encode(SortedMap map, OutputStream outStream, Context context) + throws IOException, CoderException { + if (map == null) { + throw new CoderException("cannot encode a null SortedMap"); + } + DataOutputStream dataOutStream = new DataOutputStream(outStream); + + int size = map.size(); + dataOutStream.writeInt(size); + if (size == 0) { + return; + } + + // Since we handled size == 0 above, entry is guaranteed to exist before and after loop + Iterator> iterator = map.entrySet().iterator(); + Entry entry = iterator.next(); + while (iterator.hasNext()) { + keyCoder.encode(entry.getKey(), outStream); + valueCoder.encode(entry.getValue(), outStream); + entry = iterator.next(); + } + + keyCoder.encode(entry.getKey(), outStream); + valueCoder.encode(entry.getValue(), outStream, context); + // no flush needed as DataOutputStream does not buffer + } + + @Override + public SortedMap decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override + public SortedMap decode(InputStream inStream, Context context) + throws IOException, CoderException { + DataInputStream dataInStream = new DataInputStream(inStream); + int size = dataInStream.readInt(); + if (size == 0) { + return Collections.emptySortedMap(); + } + + SortedMap 
retval = Maps.newTreeMap(); + for (int i = 0; i < size - 1; ++i) { + K key = keyCoder.decode(inStream); + V value = valueCoder.decode(inStream); + retval.put(key, value); + } + + K key = keyCoder.decode(inStream); + V value = valueCoder.decode(inStream, context); + retval.put(key, value); + return retval; + } + + /** + * {@inheritDoc} + * + * @return a {@link List} containing the key coder at index 0 at the and value coder at index 1. + */ + @Override + public List> getCoderArguments() { + return Arrays.asList(keyCoder, valueCoder); + } + + /** + * {@inheritDoc} + * + * @throws NonDeterministicException always. Not all maps have a deterministic encoding. For + * example, {@code HashMap} comparison does not depend on element order, so two {@code + * HashMap} instances may be equal but produce different encodings. + */ + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException( + this, "Ordering of entries in a Map may be non-deterministic."); + } + + @Override + public boolean consistentWithEquals() { + return keyCoder.consistentWithEquals() && valueCoder.consistentWithEquals(); + } + + @Override + public Object structuralValue(SortedMap value) { + if (consistentWithEquals()) { + return value; + } else { + Map ret = Maps.newHashMapWithExpectedSize(value.size()); + for (Map.Entry entry : value.entrySet()) { + ret.put( + keyCoder.structuralValue(entry.getKey()), valueCoder.structuralValue(entry.getValue())); + } + return ret; + } + } + + @Override + public void registerByteSizeObserver(SortedMap map, ElementByteSizeObserver observer) + throws Exception { + observer.update(4L); + if (map.isEmpty()) { + return; + } + Iterator> entries = map.entrySet().iterator(); + Entry entry = entries.next(); + while (entries.hasNext()) { + keyCoder.registerByteSizeObserver(entry.getKey(), observer); + valueCoder.registerByteSizeObserver(entry.getValue(), observer); + entry = entries.next(); + } + 
keyCoder.registerByteSizeObserver(entry.getKey(), observer); + valueCoder.registerByteSizeObserver(entry.getValue(), observer); + } + + @Override + public TypeDescriptor> getEncodedTypeDescriptor() { + return new TypeDescriptor>() {}.where( + new TypeParameter() {}, keyCoder.getEncodedTypeDescriptor()) + .where(new TypeParameter() {}, valueCoder.getEncodedTypeDescriptor()); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java new file mode 100644 index 0000000000000..d057b75c6779f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/WithKeys.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.utils.RowSelector; +import org.apache.beam.sdk.schemas.utils.SelectHelpers; +import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; + +public class WithKeys extends PTransform, PCollection>> { + private final FieldAccessDescriptor fieldAccessDescriptor; + + public static WithKeys of(FieldAccessDescriptor fieldAccessDescriptor) { + return new WithKeys<>(fieldAccessDescriptor); + } + + private WithKeys(FieldAccessDescriptor fieldAccessDescriptor) { + this.fieldAccessDescriptor = fieldAccessDescriptor; + } + + @Override + public PCollection> expand(PCollection input) { + Schema schema = input.getSchema(); + TypeDescriptor typeDescriptor = input.getTypeDescriptor(); + if (typeDescriptor == null) { + throw new RuntimeException("Null type descriptor on input."); + } + SerializableFunction toRowFunction = input.getToRowFunction(); + SerializableFunction fromRowFunction = input.getFromRowFunction(); + + FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema); + RowSelector rowSelector = new RowSelectorContainer(schema, resolved, true); + Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved); + + return input + .apply( + "selectKeys", + ParDo.of( + new DoFn>() { + @ProcessElement + public void process( + @Element Row row, // Beam will convert the element to a row. 
+ @Element T element, // Beam will return the original element. + OutputReceiver> o) { + o.output(KV.of(rowSelector.select(row), element)); + } + })) + .setCoder( + KvCoder.of( + SchemaCoder.of(keySchema), + SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction))); + } +} diff --git a/sdks/java/extensions/timeseries/build.gradle b/sdks/java/extensions/timeseries/build.gradle new file mode 100644 index 0000000000000..fd8b961b0add0 --- /dev/null +++ b/sdks/java/extensions/timeseries/build.gradle @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.extensions.timeseries' +) + +description = "Apache Beam :: SDKs :: Java :: Extensions :: Timeseries" + +dependencies { + implementation library.java.vendored_guava_26_0_jre + implementation library.java.joda_time + implementation project(path: ":sdks:java:core", configuration: "shadow") + testImplementation library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} diff --git a/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java new file mode 100644 index 0000000000000..1e33f292b8390 --- /dev/null +++ b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import java.util.SortedMap; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.SortedMapCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.transforms.WithKeys; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.TimerMap; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Fill gaps in timeseries. Values are expected to have Beam schemas registered. + * + *

This transform views the original PCollection as a collection of timeseries, each with a different key. The + * key to be used and the timeseries bucket size are both specified in the {@link #of} creation method. Multiple + * fields can be specified for the key - the key extracted will be a composite of all of them. Any elements in the + * original {@link PCollection} will appear unchanged in the output PCollection, with timestamp and window unchanged. + * Any gaps in timeseries (i.e. buckets with no elements) will be filled in the output PCollection with a single element + * (by default the latest element seen or propagated into the previous bucket). The timestamp of the filled element is + * the end of the bucket, and the original PCollection's window function is used to assign it to a window. + * + * + *

Example usage: the following code views each user,country pair in the input {@link PCollection} as a timeseries + * with bucket size one second. If any of these timeseries has a bucket with no elements, then the latest element from + * the previous bucket (i.e. the one with the largest timestamp) will be propagated into the missing bucket. If there + * are multiple missing buckets, then they all will be filled up to 1 hour - the maximum gap size specified in + * {@link #withMaxGapFillBuckets}. + * + *

{@code PCollection input = readInput();
+ * PCollection gapFilled =
+ *   input.apply("fillGaps",
+ *      FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ *        .withMaxGapFillBuckets(3600L));
+ *  gapFilled.apply(MySink.create());
+ *     }
+ * + *

By default, the latest element from the previous bucket is propagated into missing buckets. The user can override + * this using the {@link #withMergeFunction} method. Several built-in merge functions are provided - + * {@link #keepLatest()} (the default), {@link #keepEarliest()}, and {@link #keepNull()}. + * + *

Sometimes elements need to be modified before being propagated into a missing bucket. For example, consider the + * following element type containing a timestamp: + * + *

{@code @DefaultSchema(JavaFieldSchema.class)
+ * class MyType {
+ *   MyData data;
+ *   Instant timestamp;
+ *   @SchemaCreate
+ *   MyType(MyData data, Instant timestamp) {
+ *       this.data = data;
+ *       this.timestamp = timestamp;
+ *   }
+ * })
+ * + * The element's timestamp should always be contained in its current timeseries bucket, so the element needs to be + * modified when propagated to a new bucket. This can be done using the {@link #withInterpolateFunction} method, as + * follows: + * + *
{@code PCollection input = readInput();
+ * PCollection gapFilled =
+ *   input.apply("fillGaps",
+ *      FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ *        .withInterpolateFunction(p -> new MyType(p.getValue().getValue().data, p.getNextWindow().maxTimestamp()))
+ *        .withMaxGapFillBuckets(360L));
+ *  gapFilled.apply(MySink.create());
+ *  }
+ */ +@AutoValue +public abstract class FillGaps + extends PTransform, PCollection> { + // We garbage collect every 60 windows by default. + private static final int GC_EVERY_N_BUCKETS = 60; + + /** + * Argument to {@link #withMergeFunction}. Always propagates the element with the latest + * timestamp. + */ + public static + SerializableBiFunction< + TimestampedValue, TimestampedValue, TimestampedValue> + keepLatest() { + return (v1, v2) -> v1.getTimestamp().isAfter(v2.getTimestamp()) ? v1 : v2; + } + + /** + * Argument to {@link #withMergeFunction}. Always propagates the element with the earliest + * timestamp. + */ + public static + SerializableBiFunction< + TimestampedValue, TimestampedValue, TimestampedValue> + keepEarliest() { + return (v1, v2) -> v1.getTimestamp().isAfter(v2.getTimestamp()) ? v2 : v1; + } + + /** Argument to withInterpolateFunction function. */ + @AutoValue + public abstract static class InterpolateData { + public abstract TimestampedValue getValue(); + + public abstract BoundedWindow getPreviousWindow(); + + public abstract BoundedWindow getNextWindow(); + } + + abstract Duration getTimeseriesBucketDuration(); + + abstract Long getMaxGapFillBuckets(); + + abstract Instant getStopTime(); + + abstract FieldAccessDescriptor getKeyDescriptor(); + + abstract SerializableBiFunction< + TimestampedValue, TimestampedValue, TimestampedValue> + getMergeValues(); + + abstract int getGcEveryNBuckets(); + + @Nullable + abstract SerializableFunction, ValueT> getInterpolateFunction(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setTimeseriesBucketDuration(Duration value); + + abstract Builder setMaxGapFillBuckets(Long value); + + abstract Builder setStopTime(Instant value); + + abstract Builder setKeyDescriptor(FieldAccessDescriptor keyDescriptor); + + abstract Builder setMergeValues( + SerializableBiFunction< + TimestampedValue, TimestampedValue, TimestampedValue> + mergeValues); + + 
abstract Builder setInterpolateFunction( + @Nullable SerializableFunction, ValueT> interpolateFunction); + + abstract Builder setGcEveryNBuckets(int gcEveryNBuckets); + + abstract FillGaps build(); + } + + /** Construct the transform for the given duration and key fields. */ + public static FillGaps of(Duration windowDuration, String... keys) { + return of(windowDuration, FieldAccessDescriptor.withFieldNames(keys)); + } + + /** Construct the transform for the given duration and key fields. */ + public static FillGaps of( + Duration windowDuration, FieldAccessDescriptor keyDescriptor) { + return new AutoValue_FillGaps.Builder() + .setTimeseriesBucketDuration(windowDuration) + .setMaxGapFillBuckets(Long.MAX_VALUE) + .setStopTime(BoundedWindow.TIMESTAMP_MAX_VALUE) + .setKeyDescriptor(keyDescriptor) + .setMergeValues(keepLatest()) + .setGcEveryNBuckets(GC_EVERY_N_BUCKETS) + .build(); + } + + /* The max gap duration that will be filled. The transform will stop filling timeseries buckets after this duration. */ + FillGaps withMaxGapFillBuckets(Long value) { + return toBuilder().setMaxGapFillBuckets(value).build(); + } + + /* A hard (event-time) stop time for the transform. */ + FillGaps withStopTime(Instant stopTime) { + return toBuilder().setStopTime(stopTime).build(); + } + + /** + * If there are multiple values in a single timeseries bucket, this function is used to specify + * what to propagate to the next bucket. If not specified, then the value with the latest + * timestamp will be propagated. + */ + FillGaps withMergeFunction( + SerializableBiFunction< + TimestampedValue, TimestampedValue, TimestampedValue> + mergeFunction) { + return toBuilder().setMergeValues(mergeFunction).build(); + } + + /** + * This function can be used to modify elements before propagating to the next bucket. A common + * use case is to modify a contained timestamp to match that of the new bucket. 
+ */ + FillGaps withInterpolateFunction( + SerializableFunction, ValueT> interpolateFunction) { + return toBuilder().setInterpolateFunction(interpolateFunction).build(); + } + + @Override + public PCollection expand(PCollection input) { + if (!input.hasSchema()) { + throw new RuntimeException("The input to FillGaps must have a schema."); + } + + FixedWindows bucketWindows = FixedWindows.of(getTimeseriesBucketDuration()); + // TODO(reuvenlax, BEAM-12795): We need to create KVs to use state/timers. Once BEAM-12795 is + // fixed we can dispense with the KVs here. + PCollection> keyedValues = + input + .apply("FixedWindow", Window.into(bucketWindows)) + .apply("withKeys", WithKeys.of(getKeyDescriptor())); + + WindowFn originalWindowFn = + (WindowFn) input.getWindowingStrategy().getWindowFn(); + return keyedValues + .apply("globalWindow", Window.into(new GlobalWindows())) + .apply( + "fillGaps", + ParDo.of( + new FillGapsDoFn<>( + bucketWindows, + input.getCoder(), + getStopTime(), + getMaxGapFillBuckets(), + getMergeValues(), + getInterpolateFunction(), + getGcEveryNBuckets()))) + .apply("applyOriginalWindow", Window.into(originalWindowFn)) + .setCoder(input.getCoder()); + } + + public static class FillGapsDoFn extends DoFn, ValueT> { + // The window size used. + private final FixedWindows bucketWindows; + // The garbage-collection window (GC_EVERY_N_BUCKETS * fixedWindows.getSize()). + private final FixedWindows gcWindows; + // The stop time. + private final Instant stopTime; + // The max gap-duration to fill. Once the gap fill exceeds this, we will stop filling the gap. + private final long maxGapFillBuckets; + + private final SerializableBiFunction< + TimestampedValue, TimestampedValue, TimestampedValue> + mergeValues; + + @Nullable + private final SerializableFunction, ValueT> interpolateFunction; + + // A timer map used to fill potential gaps. Each logical "window" will have a separate timer + // which will be cleared if an element arrives in that window. 
This way the timer will only fire + // if there is a gap, at which point it will fill the gap. + @TimerFamily("gapTimers") + @SuppressWarnings({"UnusedVariable"}) + private final TimerSpec gapFillingTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME); + + // Timers used to garbage collect state. + @TimerFamily("gcTimers") + @SuppressWarnings({"UnusedVariable"}) + private final TimerSpec gcTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME); + + // Keep track of windows already seen. In the future we can replace this with OrderedListState. + // Keyed by window end timestamp (which is 1ms greater than the window max timestamp). + @StateId("seenBuckets") + @SuppressWarnings({"UnusedVariable"}) + private final StateSpec>>> + seenBucketsSpec; + + // For every window, keep track of how long the filled gap is in buckets. If a window was + // populated by a received element - i.e. + // it's not + // a gap fill - then there is no value in this map for that window. + // Keyed by window end timestamp (which is 1ms greater than the window max timestamp). 
+ @StateId("gapDurationMap") + @SuppressWarnings({"UnusedVariable"}) + private final StateSpec>> gapDurationSpec; + + FillGapsDoFn( + FixedWindows bucketWindows, + Coder valueCoder, + Instant stopTime, + long maxGapFillBuckets, + SerializableBiFunction< + TimestampedValue, TimestampedValue, TimestampedValue> + mergeValues, + @Nullable SerializableFunction, ValueT> interpolateFunction, + int gcEveryNBuckets) { + this.bucketWindows = bucketWindows; + this.gcWindows = FixedWindows.of(bucketWindows.getSize().multipliedBy(gcEveryNBuckets)); + this.stopTime = stopTime; + this.maxGapFillBuckets = maxGapFillBuckets; + this.seenBucketsSpec = + StateSpecs.value( + SortedMapCoder.of(InstantCoder.of(), TimestampedValueCoder.of(valueCoder))); + this.gapDurationSpec = + StateSpecs.value(SortedMapCoder.of(InstantCoder.of(), VarLongCoder.of())); + this.mergeValues = mergeValues; + this.interpolateFunction = interpolateFunction; + } + + @ProcessElement + public void process( + @Element KV element, + @Timestamp Instant ts, + @TimerFamily("gapTimers") TimerMap gapTimers, + @TimerFamily("gcTimers") TimerMap gcTimers, + @AlwaysFetched @StateId("seenBuckets") + ValueState>> seenBuckets, + OutputReceiver output) { + if (ts.isAfter(stopTime)) { + return; + } + + Instant windowEndTs = bucketWindows.assignWindow(ts).end(); + if (processEvent( + () -> TimestampedValue.of(element.getValue(), ts), + windowEndTs, + gapTimers, + gcTimers, + seenBuckets, + -1, + output)) { + // We've seen data for this window, so clear any gap-filling timer. 
+ gapTimers.get(windowToTimerTag(windowEndTs)).clear(); + } + } + + private String windowToTimerTag(Instant endTs) { + return Long.toString(endTs.getMillis()); + } + + private Instant windowFromTimerTag(String key) { + return Instant.ofEpochMilli(Long.parseLong(key)); + } + + @OnTimerFamily("gapTimers") + public void onTimer( + @TimerId String timerId, + @Timestamp Instant timestamp, + @TimerFamily("gapTimers") TimerMap gapTimers, + @TimerFamily("gcTimers") TimerMap gcTimers, + @AlwaysFetched @StateId("seenBuckets") + ValueState>> seenBuckets, + @AlwaysFetched @StateId("gapDurationMap") ValueState> gapDurations, + OutputReceiver output) { + Instant bucketEndTs = windowFromTimerTag(timerId); + Instant bucketMaxTs = bucketEndTs.minus(Duration.millis(1)); + Instant previousBucketEndTs = bucketEndTs.minus(bucketWindows.getSize()); + Instant previousBucketMaxTs = previousBucketEndTs.minus(Duration.millis(1)); + + Map> seenBucketMap = seenBuckets.read(); + if (seenBucketMap == null) { + throw new RuntimeException("Unexpected timer fired with no seenBucketMap."); + } + + @Nullable SortedMap gapDurationsMap = gapDurations.read(); + long gapSize = 0; + if (gapDurationsMap != null) { + gapSize = gapDurationsMap.getOrDefault(previousBucketEndTs, 0L); + } + // If the timer fires and we've never seen an element for this window then we reach into the + // previous + // window and copy its value into this window. This relies on the fact that timers fire in + // order + // for a given key, so if there are multiple gap windows then the previous window will be + // filled + // by the time we get here. + // processEvent will also set a timer for the next window if we haven't already seen an + // element + // for that window. 
+ processEvent( + () -> { + TimestampedValue previous = seenBucketMap.get(previousBucketEndTs); + if (previous == null) { + throw new RuntimeException( + "Processing bucket for " + + bucketEndTs + + " before processing bucket " + + "for " + + previousBucketEndTs); + } + ValueT value = previous.getValue(); + if (interpolateFunction != null) { + BoundedWindow previousBucket = bucketWindows.assignWindow(previousBucketMaxTs); + BoundedWindow currentBucket = bucketWindows.assignWindow(bucketMaxTs); + Preconditions.checkState(!currentBucket.equals(previousBucket)); + value = + interpolateFunction.apply( + new AutoValue_FillGaps_InterpolateData<>( + previous, previousBucket, currentBucket)); + } + return TimestampedValue.of(value, bucketMaxTs); + }, + bucketEndTs, + gapTimers, + gcTimers, + seenBuckets, + gapSize, + output); + if (!seenBucketMap.containsKey(bucketEndTs.plus(bucketWindows.getSize()))) { + // The next bucket is still empty, so update gapDurations + if (gapDurationsMap == null) { + gapDurationsMap = Maps.newTreeMap(); + } + gapDurationsMap.put(bucketEndTs, gapSize + 1); + gapDurations.write(gapDurationsMap); + } + } + + @OnTimerFamily("gcTimers") + public void onGcTimer( + @Timestamp Instant timerTs, + @AlwaysFetched @StateId("seenBuckets") + ValueState>> seenBuckets, + @AlwaysFetched @StateId("gapDurationMap") + ValueState> gapDurations) { + gcMap(seenBuckets, timerTs.minus(gcWindows.getSize())); + gcMap(gapDurations, timerTs.minus(gcWindows.getSize())); + } + + // returns true if this is the first event for the bucket. 
+    /**
+     * Emits the value for one bucket, records it in the {@code seenBuckets} state and, the first
+     * time the bucket is seen, arms the follow-up timers.
+     *
+     * <p>The value obtained from {@code getValue} is always output at its own timestamp. If the
+     * bucket already has an entry, the stored entry is replaced by {@code
+     * mergeValues.apply(existing, value)} and no timer is touched. Otherwise the value is stored
+     * as-is, a gap-filling timer is set for the next bucket (only while that bucket's maximum
+     * timestamp is before {@code stopTime}, {@code gapSize + 1 < maxGapFillBuckets}, and the next
+     * bucket has not been seen), and a garbage-collection timer is set at the end of the {@code
+     * gcWindows} window that contains the next bucket's end.
+     *
+     * @param getValue supplies the timestamped value to output for this bucket
+     * @param bucketEndTs end timestamp of the bucket; the key used in the seen-buckets map
+     * @param gapTimers timer family used to schedule gap filling for the next bucket
+     * @param gcTimers timer family used to schedule state garbage collection
+     * @param seenBuckets state holding the value recorded for each bucket, keyed by bucket end
+     * @param gapSize gap count supplied by the caller; {@code gapSize + 1} is compared against
+     *     {@code maxGapFillBuckets} before another gap-filling timer is set
+     * @param output receiver for the emitted value
+     * @return {@code true} if this is the first event for the bucket
+     */
+    private boolean processEvent(
+        Supplier<TimestampedValue<ValueT>> getValue,
+        Instant bucketEndTs,
+        TimerMap gapTimers,
+        TimerMap gcTimers,
+        ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> seenBuckets,
+        long gapSize,
+        OutputReceiver<ValueT> output) {
+      TimestampedValue<ValueT> value = getValue.get();
+      output.outputWithTimestamp(value.getValue(), value.getTimestamp());
+
+      boolean firstElementInBucket = true;
+      TimestampedValue<ValueT> valueToWrite = value;
+      SortedMap<Instant, TimestampedValue<ValueT>> seenBucketsMap = seenBuckets.read();
+      if (seenBucketsMap == null) {
+        seenBucketsMap = Maps.newTreeMap();
+      } else {
+        @Nullable TimestampedValue<ValueT> existing = seenBucketsMap.get(bucketEndTs);
+        if (existing != null) {
+          valueToWrite = mergeValues.apply(existing, value);
+          // No need to set a timer as we've already seen an element for this window before.
+          firstElementInBucket = false;
+        }
+      }
+
+      // Update the seenBuckets state variable.
+      seenBucketsMap.put(bucketEndTs, valueToWrite);
+      seenBuckets.write(seenBucketsMap);
+
+      if (firstElementInBucket) {
+        // Potentially set a timer for the next window.
+
+        // Here we calculate how long the gap-extension duration (the total size of all gaps
+        // filled since the last element seen) would be for the next window. If this would exceed
+        // the max-gap duration set by the user, then stop.
+        Instant nextBucketEndTs = bucketEndTs.plus(bucketWindows.getSize());
+        Instant nextBucketMaxTs = nextBucketEndTs.minus(Duration.millis(1));
+        // Set a gap-filling timer for the next window if we haven't yet seen that window.
+        if (nextBucketMaxTs.isBefore(stopTime)
+            && gapSize + 1 < maxGapFillBuckets
+            && !seenBucketsMap.containsKey(nextBucketEndTs)) {
+          gapTimers
+              .get(windowToTimerTag(nextBucketEndTs))
+              .withOutputTimestamp(bucketEndTs)
+              .set(nextBucketEndTs);
+        }
+
+        // Set a gcTimer
+        Instant gcTs = gcWindows.assignWindow(nextBucketEndTs).end();
+        gcTimers.get(windowToTimerTag(gcTs)).set(gcTs);
+      }
+      return firstElementInBucket;
+    }
+
+    /**
+     * Removes from the map held in {@code mapState} every entry whose key is strictly before
+     * {@code ts}, then writes the map back, or clears the state cell if nothing is left. Does
+     * nothing when the state holds no map.
+     *
+     * @param mapState state cell holding a map keyed by bucket end timestamp
+     * @param ts cut-off instant; entries with keys at or after it are kept
+     */
+    private static <V> void gcMap(ValueState<SortedMap<Instant, V>> mapState, Instant ts) {
+      SortedMap<Instant, V> map = mapState.read();
+      if (map != null) {
+        // Clear all map elements that are for windows strictly less than ts.
+        map.headMap(ts).clear();
+        if (map.isEmpty()) {
+          mapState.clear();
+        } else {
+          mapState.write(map);
+        }
+      }
+    }
+  }
+}
diff --git a/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java
new file mode 100644
index 0000000000000..305e0db0d0fa7
--- /dev/null
+++ b/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +/** Utilities for operating on timeseries data. */ +package org.apache.beam.sdk.extensions.timeseries; diff --git a/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java b/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java new file mode 100644 index 0000000000000..86b14aac0ae75 --- /dev/null +++ b/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.extensions.timeseries;
+
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesLoopingTimer;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Reify;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FillGaps}, using one-second buckets keyed by the {@code key} field. */
+@RunWith(JUnit4.class)
+public class FillGapsTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Test element: a key, a payload string and a timestamp carried inside the element itself. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class Message {
+    abstract String getKey();
+
+    abstract String getValue();
+
+    abstract Instant getTimestamp();
+
+    /**
+     * Returns a copy of the propagated message whose timestamp field is set to the maximum
+     * timestamp of the window being filled.
+     */
+    static Message update(FillGaps.PropagateData<Message> propagateData) {
+      Message value = propagateData.getValue().getValue();
+      Instant nextWindowMax = propagateData.getNextWindow().maxTimestamp();
+      return value.toBuilder().setTimestamp(nextWindowMax).build();
+    }
+
+    /** Builds a message from its three fields. */
+    static Message of(String key, String value, Instant timestamp) {
+      return new AutoValue_FillGapsTest_Message.Builder()
+          .setKey(key)
+          .setValue(value)
+          .setTimestamp(timestamp)
+          .build();
+    }
+
+    /** Builds a message and pairs it with the same instant as its element timestamp. */
+    static TimestampedValue<Message> ofTimestamped(String key, String value, Instant timestamp) {
+      return TimestampedValue.of(of(key, value, timestamp), timestamp);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setKey(String key);
+
+      abstract Builder setValue(String value);
+
+      abstract Builder setTimestamp(Instant timestamp);
+
+      abstract Message build();
+    }
+
+    abstract Builder toBuilder();
+  }
+
+  /**
+   * Two keys have elements in buckets 0, 1 and 3 and a stop time of 5 seconds. Expects the input
+   * plus one filled element per key in bucket 2 (carrying the later of the two bucket-1 elements)
+   * and in bucket 4, each stamped with the bucket's maximum timestamp.
+   */
+  @Test
+  public void testFillGaps() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.of(Duration.standardSeconds(1), "key")
+                    .withStopTime(Instant.ofEpochSecond(5)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    TimestampedValue.of(
+                        Message.of(
+                            "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of(
+                            "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()))));
+    pipeline.run();
+  }
+
+  /**
+   * Same input as {@link #testFillGaps()} but with {@code FillGaps.keepEarliest()} as the merge
+   * function: the element filled into bucket 2 is expected to carry the earlier of the two
+   * bucket-1 elements ({@code "value1<"}).
+   */
+  @Test
+  public void testFillGapsKeepEarliest() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.of(Duration.standardSeconds(1), "key")
+                    .withMergeFunction(FillGaps.keepEarliest())
+                    .withStopTime(Instant.ofEpochSecond(5)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    TimestampedValue.of(
+                        Message.of("key1", "value1<", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1<", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value3", Instant.ofEpochSecond(3)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()))));
+    pipeline.run();
+  }
+
+  /**
+   * Elements in buckets 0, 1 and 10 with {@code withMaxGapFillBuckets(4L)}: only buckets 2 through
+   * 5 are expected to be filled for each key, leaving buckets 6 through 9 empty.
+   */
+  @Test
+  public void testFillGapsMaxDuration() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped("key2", "value1", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(10)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(10)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.of(Duration.standardSeconds(1), "key")
+                    .withMaxGapFillBuckets(4L)
+                    .withStopTime(Instant.ofEpochSecond(11)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(3)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key1", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(5)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(3)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp()),
+                    TimestampedValue.of(
+                        Message.of("key2", "value1", Instant.ofEpochSecond(1)),
+                        fixedWindows.assignWindow(Instant.ofEpochSecond(5)).maxTimestamp()))));
+    pipeline.run();
+  }
+
+  /**
+   * Same input as {@link #testFillGaps()} but with {@link Message#update} as the propagate
+   * function: each filled element is expected to have its own timestamp field rewritten to the
+   * maximum timestamp of the bucket it fills.
+   */
+  @Test
+  public void testFillGapsPropagateFunction() {
+    List<TimestampedValue<Message>> values =
+        ImmutableList.of(
+            Message.ofTimestamped("key1", "value0<", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key2", "value0", Instant.ofEpochSecond(0)),
+            Message.ofTimestamped("key1", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key1", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key2", "value1<", Instant.ofEpochSecond(1)),
+            Message.ofTimestamped(
+                "key2", "value1", Instant.ofEpochSecond(1).plus(Duration.millis(1))),
+            Message.ofTimestamped("key1", "value3", Instant.ofEpochSecond(3)),
+            Message.ofTimestamped("key2", "value3", Instant.ofEpochSecond(3)));
+
+    PCollection<Message> input = pipeline.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.of(Duration.standardSeconds(1), "key")
+                    .withPropagateFunction(Message::update)
+                    .withStopTime(Instant.ofEpochSecond(5)))
+            .apply(Reify.timestamps());
+
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    Instant bucketTwoMax = fixedWindows.assignWindow(Instant.ofEpochSecond(2)).maxTimestamp();
+    Instant bucketFourMax = fixedWindows.assignWindow(Instant.ofEpochSecond(4)).maxTimestamp();
+
+    PAssert.that(gapFilled)
+        .containsInAnyOrder(
+            Iterables.concat(
+                values,
+                ImmutableList.of(
+                    Message.ofTimestamped("key1", "value1", bucketTwoMax),
+                    Message.ofTimestamped("key2", "value1", bucketTwoMax),
+                    Message.ofTimestamped("key1", "value3", bucketFourMax),
+                    Message.ofTimestamped("key2", "value3", bucketFourMax))));
+    pipeline.run();
+  }
+
+  // NOTE(review): the comment here used to read "This test fails due to DirectRunner bugs.
+  // Uncomment once those bugs are fixed.", yet the test is enabled and only tagged with the
+  // categories below. Confirm whether it is expected to pass on the DirectRunner.
+  @Test
+  @Category({
+    UsesTimersInParDo.class,
+    UsesLoopingTimer.class,
+    UsesStatefulParDo.class,
+    UsesStrictTimerOrdering.class
+  })
+  public void testFillGapsFuzz() {
+    for (int i = 0; i < 6; ++i) {
+      fuzzTest(10, 500, 25, 20);
+    }
+  }
+
+  /**
+   * Generates a random timeseries per key, runs it through {@code FillGaps} in a fresh pipeline
+   * and asserts that the output is exactly the input plus the expected gap fills.
+   *
+   * @param numKeys number of keys ({@code "key0"}, {@code "key1"}, ...) to generate
+   * @param numBuckets number of one-second buckets per key; also used as the stop time in seconds
+   * @param maxGapSizeToGenerate upper bound on the length of each generated gap
+   * @param maxGapSize value passed to {@code withMaxGapFillBuckets}
+   */
+  public void fuzzTest(int numKeys, int numBuckets, int maxGapSizeToGenerate, long maxGapSize) {
+    Pipeline p = Pipeline.create();
+    List<TimestampedValue<Message>> values = Lists.newArrayList();
+    List<TimestampedValue<Message>> expectedGaps = Lists.newArrayList();
+    for (int i = 0; i < numKeys; ++i) {
+      String key = "key" + i;
+      generateFuzzTimerseries(
+          key, numBuckets, maxGapSizeToGenerate, maxGapSize, values, expectedGaps);
+    }
+
+    PCollection<Message> input = p.apply(Create.timestamped(values));
+    PCollection<TimestampedValue<Message>> gapFilled =
+        input
+            .apply(
+                FillGaps.of(Duration.standardSeconds(1), "key")
+                    .withPropagateFunction(Message::update)
+                    .withMaxGapFillBuckets(maxGapSize)
+                    .withStopTime(Instant.ofEpochSecond(numBuckets)))
+            .apply(Reify.timestamps());
+
+    PAssert.that(gapFilled).containsInAnyOrder(Iterables.concat(values, expectedGaps));
+    p.run();
+  }
+
+  /**
+   * Appends one key's random series to {@code values} and the fills it should trigger to {@code
+   * expectedGaps}.
+   *
+   * <p>Buckets are visited in order. Once at least one real element exists, each step starts a gap
+   * with probability 1/10; the gap is 1 to {@code maxGapSizeToGenerate} buckets long, cut off at
+   * {@code numBuckets}. Within a run of gap buckets only the first {@code maxGapSize} are recorded
+   * as expected fills, each carrying the last real value. Every other step adds a real element
+   * named {@code "bucket" + bucket} and resets the gap counter.
+   */
+  void generateFuzzTimerseries(
+      String key,
+      int numBuckets,
+      int maxGapSizeToGenerate,
+      long maxGapSize,
+      List<TimestampedValue<Message>> values,
+      List<TimestampedValue<Message>> expectedGaps) {
+    // Unseeded: every run generates a different series, so a failure cannot be replayed as-is.
+    Random random = new Random();
+    String lastValue = null;
+    int currentGapSize = 0;
+    for (int bucket = 0; bucket < numBuckets; ) {
+      if (lastValue != null && currentGapSize < maxGapSizeToGenerate && random.nextInt(10) == 0) {
+        // 10% chance of creating a gap.
+        int gapSize = random.nextInt(maxGapSizeToGenerate) + 1;
+        int lastGapBucket = Math.min(bucket + gapSize, numBuckets);
+
+        for (; bucket < lastGapBucket; ++bucket) {
+          if (currentGapSize < maxGapSize) {
+            addBucketToTimeseries(key, lastValue, bucket, expectedGaps);
+          }
+          ++currentGapSize;
+        }
+      } else {
+        lastValue = "bucket" + bucket;
+        currentGapSize = 0;
+        addBucketToTimeseries(key, lastValue, bucket, values);
+        ++bucket;
+      }
+    }
+  }
+
+  /**
+   * Appends to {@code list} a message for the given bucket, using the bucket's maximum timestamp
+   * both as the message's own timestamp field and as its element timestamp.
+   */
+  void addBucketToTimeseries(
+      String key, String value, int bucket, List<TimestampedValue<Message>> list) {
+    FixedWindows fixedWindows = FixedWindows.of(Duration.standardSeconds(1));
+    BoundedWindow currentBucket = fixedWindows.assignWindow(Instant.ofEpochSecond(bucket));
+    TimestampedValue<Message> message =
+        Message.ofTimestamped(key, value, currentBucket.maxTimestamp());
+    list.add(message);
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7cc83b9698bc6..357a99c5c652f 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -145,6 +145,7 @@ include(":sdks:java:extensions:sql:zetasql")
 include(":sdks:java:extensions:sql:expansion-service")
 include(":sdks:java:extensions:sql:udf")
 include(":sdks:java:extensions:sql:udf-test-provider")
+include(":sdks:java:extensions:timeseries")
 include(":sdks:java:extensions:zetasketch")
 include(":sdks:java:fn-execution")
 include(":sdks:java:harness")
From 3b22bd378eae0d077b04061afb4192a3194fb1ff Mon Sep 17 00:00:00 2001
From: Reuven Lax
Date: Wed, 13 Jul 2022 10:17:20 -0700
Subject: [PATCH 2/2] fix name

---
 .../beam/sdk/extensions/timeseries/FillGapsTest.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java
b/sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/FillGapsTest.java @@ -59,9 +59,9 @@ abstract static class Message { abstract Instant getTimestamp(); - static Message update(FillGaps.PropagateData propagateData) { - Message value = propagateData.getValue().getValue(); - Instant nextWindowMax = propagateData.getNextWindow().maxTimestamp(); + static Message update(FillGaps.InterpolateData interpolateData) { + Message value = interpolateData.getValue().getValue(); + Instant nextWindowMax = interpolateData.getNextWindow().maxTimestamp(); return value.toBuilder().setTimestamp(nextWindowMax).build(); } @@ -255,7 +255,7 @@ public void testFillGapsPropagateFunction() { input .apply( FillGaps.of(Duration.standardSeconds(1), "key") - .withPropagateFunction(Message::update) + .withInterpolateFunction(Message::update) .withStopTime(Instant.ofEpochSecond(5))) .apply(Reify.timestamps()); @@ -304,7 +304,7 @@ public void fuzzTest(int numKeys, int numBuckets, int maxGapSizeToGenerate, long input .apply( FillGaps.of(Duration.standardSeconds(1), "key") - .withPropagateFunction(Message::update) + .withInterpolateFunction(Message::update) .withMaxGapFillBuckets(maxGapSize) .withStopTime(Instant.ofEpochSecond(numBuckets))) .apply(Reify.timestamps());