Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for global sequence processing to the "ordered" extension in Java SDK #32540

Merged
merged 36 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
bc30ecc
Initial changes to support processing global sequences.
slilichenko Aug 22, 2024
54bbd07
Refactor the DoFns out of the transform and into a class hierarchy.
slilichenko Aug 22, 2024
66e9e7b
Next round of implementation of Global Sequence handling.
slilichenko Aug 23, 2024
66490da
Added ticker timers in global sequence processing.
slilichenko Aug 26, 2024
a4fc7f1
Corrected the emission batch logic.
slilichenko Aug 27, 2024
59e4426
Reworked some tests and fixed the batch output logic.
slilichenko Aug 27, 2024
ea6209d
Pluggable combiner for the global sequence.
slilichenko Aug 30, 2024
ed1b018
First iteration of the efficient merging accumulator
slilichenko Sep 3, 2024
3094ec9
Mostly complete implementation of the accumulator and corresponding t…
slilichenko Sep 9, 2024
10a9dea
Additional round of test refinements.
slilichenko Sep 9, 2024
03cfb3c
Added logic to DQL the records below the global sequence range.
slilichenko Sep 10, 2024
29f36bb
Added providing a global sequence combiner through a handler.
slilichenko Sep 11, 2024
4649795
Added SequenceRangeAccumulatorCoder and tests. Improved logic of crea…
slilichenko Sep 11, 2024
962bfdd
Fixed logging levels (moved them to "trace") on several transforms.
slilichenko Sep 11, 2024
d0a7a14
Round of code improvements and cleanups.
slilichenko Sep 12, 2024
ca12f0c
Tests to verify that the the global sequence is correctly produced by…
slilichenko Sep 19, 2024
7ba0d3c
Merge branch 'apache:master' into master
slilichenko Sep 19, 2024
a7ff45d
Added batch processing verification to the global sequence processing.
slilichenko Sep 23, 2024
4649d91
Merge branch 'apache:master' into master
slilichenko Sep 23, 2024
fa16547
A round of documentation update and minor clean up.
slilichenko Sep 23, 2024
1e39b98
Fixed the description in CHANGES.md
slilichenko Sep 23, 2024
13c4d90
Polish by "spotless"
slilichenko Sep 23, 2024
b73a4d6
Polish by "spotless"
slilichenko Sep 23, 2024
3035e5a
Removed unneeded logging configuration file.
slilichenko Sep 23, 2024
a1874b1
Made ContiguousSequenceRange open ended.
slilichenko Oct 8, 2024
3a8e3a1
Removed details from 2.60.0 section in CHANGES.md.
slilichenko Oct 8, 2024
69a1eae
Update sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk…
slilichenko Oct 8, 2024
76d769c
Fixed spotless related errors.
slilichenko Oct 8, 2024
058c875
Merge branch 'apache:master' into master
slilichenko Oct 8, 2024
7d80980
Added a note about the new functionality to CHANGES.md
slilichenko Oct 8, 2024
7a3b7fa
Added clarification around the data structure used in the sequence co…
slilichenko Oct 8, 2024
81acfcb
Added clarification around the data structure used in the sequence co…
slilichenko Oct 8, 2024
f54bdda
Fixed the problem with allowed lateness being set to 0 in the global …
slilichenko Oct 8, 2024
087dee6
Parameterized the GlobalSequenceTracker with the max number of events…
slilichenko Oct 8, 2024
479e60a
Made the event timer used to wait for the event arrival respect the l…
slilichenko Oct 8, 2024
f11dabf
Created new failure reason code - "before initial sequence"
slilichenko Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added support for processing events which use a global sequence to "ordered" extension (Java) [#32540](https://github.com/apache/beam/pull/32540)

## Breaking Changes

Expand Down
6 changes: 6 additions & 0 deletions sdks/java/extensions/ordered/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
testImplementation library.java.junit
testImplementation library.java.hamcrest
testImplementation library.java.slf4j_jdk14
testImplementation project(path: ':sdks:java:core')
testImplementation 'junit:junit:4.13.1'
testImplementation project(path: ':runners:google-cloud-dataflow-java')
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(path: ":runners:google-cloud-dataflow-java")
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core")
testImplementation project(path: ":sdks:java:io:google-cloud-platform")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

/** A range of contiguous event sequences and the latest timestamp of the events in the range. */
@AutoValue
public abstract class ContiguousSequenceRange {
public static final ContiguousSequenceRange EMPTY =
ContiguousSequenceRange.of(
Long.MIN_VALUE, Long.MIN_VALUE, Instant.ofEpochMilli(Long.MIN_VALUE));
damccorm marked this conversation as resolved.
Show resolved Hide resolved

/** @return inclusive starting sequence */
public abstract long getStart();
damccorm marked this conversation as resolved.
Show resolved Hide resolved

/** @return exclusive end sequence */
public abstract long getEnd();

/** @return latest timestamp of all events in the range */
public abstract Instant getTimestamp();

public static ContiguousSequenceRange of(long start, long end, Instant timestamp) {
return new AutoValue_ContiguousSequenceRange(start, end, timestamp);
}

static class CompletedSequenceRangeCoder extends CustomCoder<ContiguousSequenceRange> {

private static final CompletedSequenceRangeCoder INSTANCE = new CompletedSequenceRangeCoder();

static CompletedSequenceRangeCoder of() {
return INSTANCE;
}

private CompletedSequenceRangeCoder() {}

@Override
public void encode(
ContiguousSequenceRange value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
VarLongCoder.of().encode(value.getStart(), outStream);
VarLongCoder.of().encode(value.getEnd(), outStream);
InstantCoder.of().encode(value.getTimestamp(), outStream);
}

@Override
public ContiguousSequenceRange decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
long start = VarLongCoder.of().decode(inStream);
long end = VarLongCoder.of().decode(inStream);
Instant timestamp = InstantCoder.of().decode(inStream);
return ContiguousSequenceRange.of(start, end, timestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
extends Serializable {

/**
* Is this event the first expected event for the given key and window?
* Is this event the first expected event for the given key and window if the per key sequence is
* used? In case of global sequence it determines the first global sequence event.
*
* @param sequenceNumber the sequence number of the event as defined by the key of the input
* PCollection to {@link OrderedEventProcessor}
Expand All @@ -41,8 +42,8 @@ public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
boolean isInitialEvent(long sequenceNumber, EventT event);

/**
* If the event was the first event in the sequence, create the state to hold the required data
* needed for processing. This data will be persisted.
* If the event was the first event for a given key, create the state to hold the required data
* needed for processing. This data will be persisted in a Beam state.
*
* @param event the first event in the sequence.
* @return the state to persist.
Expand All @@ -53,6 +54,8 @@ public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
/**
* Is this event the last expected event for a given key and window?
*
* <p>Note, this method is not used yet with global sequences.
*
* @param sequenceNumber of the event
* @param event being processed
* @return true if the last event. There are cases where it's impossible to know whether it's the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange.CompletedSequenceRangeCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* PTransform to produce the side input of the maximum contiguous range of sequence numbers.
*
* @param <EventKeyT> type of event key
* @param <EventT> type of event
* @param <ResultT> type of processing result
* @param <StateT> type of state
*/
class GlobalSequenceTracker<
EventKeyT, EventT, ResultT, StateT extends MutableState<EventT, ResultT>>
extends PTransform<
PCollection<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>>,
PCollectionView<ContiguousSequenceRange>> {

private final Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer;
private final @Nullable Duration frequencyOfGeneration;
private final int maxElementsBeforeReevaluatingGlobalSequence;

/**
* Constructor used in batch pipelines.
*
* @param sideInputProducer
*/
public GlobalSequenceTracker(
Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer) {
this.sideInputProducer = sideInputProducer;
this.frequencyOfGeneration = null;
this.maxElementsBeforeReevaluatingGlobalSequence = 0;
}

public GlobalSequenceTracker(
Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer,
Duration globalSequenceGenerationFrequency,
int maxElementsBeforeReevaluatingGlobalSequence) {
this.sideInputProducer = sideInputProducer;
this.frequencyOfGeneration = globalSequenceGenerationFrequency;
this.maxElementsBeforeReevaluatingGlobalSequence = maxElementsBeforeReevaluatingGlobalSequence;
}

@Override
public PCollectionView<ContiguousSequenceRange> expand(
PCollection<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>> input) {
input
.getPipeline()
.getCoderRegistry()
.registerCoderForClass(ContiguousSequenceRange.class, CompletedSequenceRangeCoder.of());

if (frequencyOfGeneration != null) {
// This branch will only be executed in case of streaming pipelines.
// For batch pipelines the side input should only be computed once.
input =
input.apply(
"Triggering Setup",
// Reproduce the windowing of the input PCollection, but change the triggering
// in order to create a slowing changing side input
Window.<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>>into(
(WindowFn<? super TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ?>)
input.getWindowingStrategy().getWindowFn())
.accumulatingFiredPanes()
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterPane.elementCountAtLeast(
maxElementsBeforeReevaluatingGlobalSequence),
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(frequencyOfGeneration)))));
}
return input.apply("Create Side Input", sideInputProducer);
}
}
Loading
Loading