From e7a99f5f45e9acb529b903d7823735f3158756fc Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Wed, 28 Apr 2021 11:55:25 +0800 Subject: [PATCH 1/3] [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder. --- .../join/window/WindowJoinOperator.java | 575 ++++++++++++++++++ .../window/WindowJoinOperatorBuilder.java | 187 ++++++ .../window/state/WindowListState.java | 83 +++ .../join/window/WindowJoinOperatorTest.java | 534 ++++++++++++++++ .../rank/window/WindowRankOperatorTest.java | 225 ++++--- 5 files changed, 1523 insertions(+), 81 deletions(-) create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java create mode 100644 flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java new file mode 100644 index 0000000000000..eb929e5591f02 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.RowDataUtil; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.window.state.WindowListState; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; + +import java.time.ZoneId; +import java.util.IdentityHashMap; +import java.util.List; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; + +/** + * Streaming window join operator. + * + *

Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus + * late elements (elements belong to emitted windows) will be simply dropped. + */ +public abstract class WindowJoinOperator extends TableStreamOperator + implements TwoInputStreamOperator, + Triggerable, + KeyContext { + + private static final long serialVersionUID = 1L; + + private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "leftNumLateRecordsDropped"; + private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "leftLateRecordsDroppedRate"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "rightNumLateRecordsDropped"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "rightLateRecordsDroppedRate"; + private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; + private static final String LEFT_RECORDS_STATE_NAME = "left-records"; + private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; + + protected final InternalTypeInfo leftType; + protected final InternalTypeInfo rightType; + private final GeneratedJoinCondition generatedJoinCondition; + + private final int leftWindowEndIndex; + private final int rightWindowEndIndex; + + private final boolean[] filterNullKeys; + private final ZoneId shiftTimeZone; + + /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ + private transient boolean functionsClosed = false; + + private transient InternalTimerService internalTimerService; + + // ------------------------------------------------------------------------ + protected transient JoinConditionWithNullFilters joinCondition; + + /** This is used for emitting elements with a given timestamp. */ + protected transient TimestampedCollector collector; + + private transient WindowListState leftWindowState; + private transient WindowListState rightWindowState; + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + private transient Counter leftNumLateRecordsDropped; + private transient Meter leftLateRecordsDroppedRate; + private transient Counter rightNumLateRecordsDropped; + private transient Meter rightLateRecordsDroppedRate; + private transient Gauge watermarkLatency; + + WindowJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + this.leftType = leftType; + this.rightType = rightType; + this.generatedJoinCondition = generatedJoinCondition; + this.leftWindowEndIndex = leftWindowEndIndex; + this.rightWindowEndIndex = rightWindowEndIndex; + this.filterNullKeys = filterNullKeys; + this.shiftTimeZone = shiftTimeZone; + } + + @Override + public void open() throws Exception { + super.open(); + functionsClosed = false; + + this.collector = new TimestampedCollector<>(output); + collector.eraseTimestamp(); + + final LongSerializer windowSerializer = LongSerializer.INSTANCE; + + internalTimerService = getInternalTimerService("window-timers", windowSerializer, this); + + // init join condition + JoinCondition condition = + generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); + this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this); + this.joinCondition.setRuntimeContext(getRuntimeContext()); + this.joinCondition.open(new Configuration()); + + // init state + ListStateDescriptor leftRecordStateDesc = + new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, leftType); + ListState leftListState = + getOrCreateKeyedState(windowSerializer, leftRecordStateDesc); + this.leftWindowState = + new WindowListState<>((InternalListState) leftListState); + + ListStateDescriptor rightRecordStateDesc = + new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, rightType); + ListState rightListState = + getOrCreateKeyedState(windowSerializer, rightRecordStateDesc); + this.rightWindowState = + new WindowListState<>((InternalListState) rightListState); + + // metrics + this.leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.leftLateRecordsDroppedRate = + metrics.meter( + LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(leftNumLateRecordsDropped)); + this.rightNumLateRecordsDropped = metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.rightLateRecordsDroppedRate = + metrics.meter( + RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(rightNumLateRecordsDropped)); + this.watermarkLatency = + metrics.gauge( + WATERMARK_LATENCY_METRIC_NAME, + () -> { + long watermark = internalTimerService.currentWatermark(); + if (watermark < 0) { + return 0L; + } else { + return internalTimerService.currentProcessingTime() - watermark; + } + }); + } + + @Override + public void close() throws Exception { + super.close(); + collector = null; + functionsClosed = true; + if (joinCondition != null) { + joinCondition.close(); + } + } + + @Override + public void dispose() throws Exception { + super.dispose(); + collector = null; + if (!functionsClosed) { + functionsClosed = true; + if (joinCondition != null) { + joinCondition.close(); + } + } + } + + @Override + public void processElement1(StreamRecord element) throws Exception { + processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState); + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + processElement(element, rightWindowEndIndex, rightLateRecordsDroppedRate, rightWindowState); + } + + private void processElement( + StreamRecord element, + int windowEndIndex, + Meter lateRecordsDroppedRate, + WindowListState recordState) + throws Exception { + RowData inputRow = element.getValue(); + long windowEnd = inputRow.getLong(windowEndIndex); + if (isWindowFired(windowEnd, internalTimerService.currentWatermark(), shiftTimeZone)) { + // element is late and should be dropped + lateRecordsDroppedRate.markEvent(); + return; + } + if (RowDataUtil.isAccumulateMsg(inputRow)) { + recordState.add(windowEnd, inputRow); + } else { + recordState.delete(windowEnd, inputRow); + } + // always register time for every element + internalTimerService.registerEventTimeTimer( + windowEnd, toEpochMillsForTimer(windowEnd - 1, shiftTimeZone)); + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + // Window join only support event-time now + throw new UnsupportedOperationException( + "This is a bug and should not happen. Please file an issue."); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + setCurrentKey(timer.getKey()); + Long window = timer.getNamespace(); + // join left records and right records + List leftData = leftWindowState.get(window); + List rightData = rightWindowState.get(window); + join(leftData, rightData); + // clear state + leftWindowState.clear(window); + rightWindowState.clear(window); + } + + public abstract void join(Iterable leftRecords, Iterable rightRecords); + + static class SemiAntiJoinOperator extends WindowJoinOperator { + + private final boolean isAntiJoin; + + SemiAntiJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + boolean isAntiJoin, + ZoneId shiftTimeZone) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + this.isAntiJoin = isAntiJoin; + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null) { + return; + } + if (rightRecords == null) { + if (isAntiJoin) { + for (RowData leftRecord : leftRecords) { + collector.collect(leftRecord); + } + } + return; + } + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + if (joinCondition.apply(leftRecord, rightRecord)) { + matches = true; + break; + } + } + } + if (matches) { + if (!isAntiJoin) { + // emit left record if there are matched rows on the other side + collector.collect(leftRecord); + } + } else { + if (isAntiJoin) { + // emit left record if there is no matched row on the other side + collector.collect(leftRecord); + } + } + } + } + } + + static class InnerJoinOperator extends WindowJoinOperator { + private transient JoinedRowData outRow; + + InnerJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void open() throws Exception { + super.open(); + outRow = new JoinedRowData(); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null || rightRecords == null) { + return; + } + for (RowData leftRecord : leftRecords) { + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + outRow.setRowKind(RowKind.INSERT); + outRow.replace(leftRecord, rightRecord); + collector.collect(outRow); + } + } + } + } + } + + private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator { + + private transient RowData leftNullRow; + private transient RowData rightNullRow; + private transient JoinedRowData outRow; + + AbstractOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void open() throws Exception { + super.open(); + leftNullRow = new GenericRowData(leftType.toRowSize()); + rightNullRow = new GenericRowData(rightType.toRowSize()); + outRow = new JoinedRowData(); + } + + protected void outputNullPadding(RowData row, boolean isLeft) { + if (isLeft) { + outRow.replace(row, rightNullRow); + } else { + outRow.replace(leftNullRow, row); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + + protected void outputNullPadding(Iterable rows, boolean isLeft) { + for (RowData row : rows) { + outputNullPadding(row, isLeft); + } + } + + protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) { + if (inputIsLeft) { + outRow.replace(inputRow, otherRow); + } else { + outRow.replace(otherRow, inputRow); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + } + + static class LeftOuterJoinOperator extends AbstractOuterJoinOperator { + + LeftOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else { + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + // padding null for left side + outputNullPadding(leftRecord, true); + } + } + } + } + } + + static class RightOuterJoinOperator extends AbstractOuterJoinOperator { + + RightOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (rightRecords == null) { + return; + } + if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + for (RowData rightRecord : rightRecords) { + boolean matches = false; + for (RowData leftRecord : leftRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + outputNullPadding(rightRecord, false); + } + } + } + } + } + + static class FullOuterJoinOperator extends AbstractOuterJoinOperator { + + FullOuterJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null && rightRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + IdentityHashMap emittedRightRecords = new IdentityHashMap<>(); + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + emittedRightRecords.put(rightRecord, Boolean.TRUE); + } + } + // padding null for left side + if (!matches) { + outputNullPadding(leftRecord, true); + } + } + // padding null for never emitted right side + for (RowData rightRecord : rightRecords) { + if (!emittedRightRecords.containsKey(rightRecord)) { + outputNullPadding(rightRecord, false); + } + } + } + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java new file mode 100644 index 0000000000000..03909487c750c --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import java.time.ZoneId; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link WindowJoinOperatorBuilder} is used to build a {@link WindowJoinOperator} for window + * join. + * + *

+ * WindowJoinOperatorBuilder.builder()
+ *   .leftType(leftType)
+ *   .rightType(rightType)
+ *   .generatedJoinCondition(generatedJoinCondition)
+ *   .leftWindowEndIndex(leftWindowEndIndex)
+ *   .rightWindowEndIndex(rightWindowEndIndex)
+ *   .filterNullKeys(filterNullKeys)
+ *   .joinType(joinType)
+ *   .build();
+ * 
+ */ +public class WindowJoinOperatorBuilder { + + public static WindowJoinOperatorBuilder builder() { + return new WindowJoinOperatorBuilder(); + } + + private InternalTypeInfo leftType; + private InternalTypeInfo rightType; + private GeneratedJoinCondition generatedJoinCondition; + private int leftWindowEndIndex = -1; + private int rightWindowEndIndex = -1; + private boolean[] filterNullKeys; + private FlinkJoinType joinType; + private ZoneId shiftTimeZone = ZoneId.of("UTC"); + + public WindowJoinOperatorBuilder leftType(InternalTypeInfo leftType) { + this.leftType = leftType; + return this; + } + + public WindowJoinOperatorBuilder rightType(InternalTypeInfo rightType) { + this.rightType = rightType; + return this; + } + + public WindowJoinOperatorBuilder generatedJoinCondition( + GeneratedJoinCondition generatedJoinCondition) { + this.generatedJoinCondition = generatedJoinCondition; + return this; + } + + public WindowJoinOperatorBuilder filterNullKeys(boolean[] filterNullKeys) { + this.filterNullKeys = filterNullKeys; + return this; + } + + public WindowJoinOperatorBuilder joinType(FlinkJoinType joinType) { + this.joinType = joinType; + return this; + } + + public WindowJoinOperatorBuilder leftWindowEndIndex(int leftWindowEndIndex) { + this.leftWindowEndIndex = leftWindowEndIndex; + return this; + } + + public WindowJoinOperatorBuilder rightWindowEndIndex(int rightWindowEndIndex) { + this.rightWindowEndIndex = rightWindowEndIndex; + return this; + } + + /** + * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift + * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC + * which means never shift when assigning windows. + */ + public WindowJoinOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) { + this.shiftTimeZone = shiftTimeZone; + return this; + } + + public WindowJoinOperator build() { + checkNotNull(leftType); + checkNotNull(rightType); + checkNotNull(generatedJoinCondition); + checkNotNull(filterNullKeys); + checkNotNull(joinType); + + checkArgument( + leftWindowEndIndex >= 0, + String.format( + "Illegal window end index %s, it should not be negative!", + leftWindowEndIndex)); + checkArgument( + rightWindowEndIndex >= 0, + String.format( + "Illegal window end index %s, it should not be negative!", + rightWindowEndIndex)); + + switch (joinType) { + case INNER: + return new WindowJoinOperator.InnerJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + case SEMI: + return new WindowJoinOperator.SemiAntiJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + false, + shiftTimeZone); + case ANTI: + return new WindowJoinOperator.SemiAntiJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + true, + shiftTimeZone); + case LEFT: + return new WindowJoinOperator.LeftOuterJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + case RIGHT: + return new WindowJoinOperator.RightOuterJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + case FULL: + return new WindowJoinOperator.FullOuterJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + default: + throw new IllegalArgumentException("Invalid join type: " + joinType); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java new file mode 100644 index 0000000000000..afeb2911ab5c3 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.table.data.RowData; + +import java.util.List; + +/** A wrapper of {@link ListState} which is easier to update based on window namespace. */ +public final class WindowListState implements WindowState { + + private final InternalListState windowState; + + public WindowListState(InternalListState windowState) { + this.windowState = windowState; + } + + public void clear(W window) { + windowState.setCurrentNamespace(window); + windowState.clear(); + } + + public List get(W window) throws Exception { + windowState.setCurrentNamespace(window); + return windowState.getInternal(); + } + + /** + * Updates the operator state accessible by {@link #get(W)})} by adding the given value to the + * list of values. The next time {@link #get(W)} is called (for the same state partition) the + * returned state will represent the updated list. + * + *

If null is passed in, the state value will remain unchanged. + * + * @param window The namespace for the state. + * @param value The new value for the state. + * @throws Exception Thrown if the system cannot access the state. + */ + public void add(W window, RowData value) throws Exception { + windowState.setCurrentNamespace(window); + windowState.add(value); + } + + /** + * Updates the operator state accessible by {@link #get(W)} )} by delete the given value to the + * list of values. The next time {@link #get(W)} is called (for the same state partition) the + * returned state will represent the updated list. + * + *

If null is passed in, the state value will remain unchanged. + * + *

The performance is not well, first get complete list by calling {@link + * InternalListState#getInternal()})}, then remove the value from list, finally update state by + * calling {@link InternalListState#update(List)}. + * + * @param window The namespace for the state. + * @param value The new value for the state. + * @throws Exception Thrown if the system cannot access the state. + */ + public boolean delete(W window, RowData value) throws Exception { + List completeData = get(window); + boolean flag = completeData.remove(value); + windowState.updateInternal(completeData); + return flag; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java new file mode 100644 index 0000000000000..0075100b9ae61 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.junit.Assert.assertEquals; + +/** Tests for window join operators created by {@link WindowJoinOperatorBuilder}. */ +@RunWith(Parameterized.class) +public class WindowJoinOperatorTest { + + private static final InternalTypeInfo INPUT_ROW_TYPE = + InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)); + + private static final InternalTypeInfo OUTPUT_ROW_TYPE = + InternalTypeInfo.ofFields( + new BigIntType(), + new VarCharType(VarCharType.MAX_LENGTH), + new BigIntType(), + new VarCharType(VarCharType.MAX_LENGTH)); + + private static final RowDataHarnessAssertor ASSERTER = + new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes()); + + private static final RowDataHarnessAssertor SEMI_ANTI_JOIN_ASSERTER = + new RowDataHarnessAssertor(INPUT_ROW_TYPE.toRowFieldTypes()); + + private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); + + private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); + + private final ZoneId shiftTimeZone; + + public WindowJoinOperatorTest(ZoneId shiftTimeZone) { + this.shiftTimeZone = shiftTimeZone; + } + + @Parameterized.Parameters(name = "TimeZone = {0}") + public static Collection runMode() { + return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); + } + + @Test + public void testSemiJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.SEMI); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted( + "output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testAntiJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.ANTI); + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted( + "output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testInnerJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.INNER); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.LEFT); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testRightOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.RIGHT); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.FULL); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + private KeyedTwoInputStreamOperatorTestHarness + createTestHarness(FlinkJoinType joinType) throws Exception { + String funcCode = + "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + "\n" + + " public TestWindowJoinCondition(Object[] reference) {\n" + + " }\n" + + "\n" + + " @Override\n" + + " public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n" + + " return true;\n" + + " }\n" + + "}\n"; + GeneratedJoinCondition joinFunction = + new GeneratedJoinCondition("TestWindowJoinCondition", funcCode, new Object[0]); + int keyIdx = 1; + RowDataKeySelector keySelector = + HandwrittenSelectorUtil.getRowDataSelector( + new int[] {keyIdx}, INPUT_ROW_TYPE.toRowFieldTypes()); + TypeInformation keyType = InternalTypeInfo.ofFields(); + WindowJoinOperator operator = + WindowJoinOperatorBuilder.builder() + .leftType(INPUT_ROW_TYPE) + .rightType(INPUT_ROW_TYPE) + .generatedJoinCondition(joinFunction) + .leftWindowEndIndex(0) + .rightWindowEndIndex(0) + .filterNullKeys(new boolean[] {true}) + .joinType(joinType) + .withShiftTimezone(shiftTimeZone) + .build(); + KeyedTwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, keySelector, keySelector, keyType); + return testHarness; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java index c4d9df05410fa..087cc4c6710c6 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java @@ -41,15 +41,20 @@ import org.apache.flink.table.utils.HandwrittenSelectorUtil; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.time.ZoneId; import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; import static org.junit.Assert.assertEquals; /** Tests for window rank operators created by {@link WindowRankOperatorBuilder}. */ +@RunWith(Parameterized.class) public class WindowRankOperatorTest { private static final RowType INPUT_ROW_TYPE = @@ -60,7 +65,6 @@ public class WindowRankOperatorTest { new RowType.RowField("f2", new BigIntType()))); private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE); - private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); private static final RowDataKeySelector KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector( @@ -116,12 +120,31 @@ public RecordComparator newInstance(ClassLoader classLoader) { OUTPUT_TYPES_WITHOUT_RANK_NUMBER, new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); + private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); + private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); + private final ZoneId shiftTimeZone; + + public WindowRankOperatorTest(ZoneId shiftTimeZone) { + this.shiftTimeZone = shiftTimeZone; + } + + @Parameterized.Parameters(name = "TimeZone = {0}") + public static Collection runMode() { + return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); + } + + private static OneInputStreamOperatorTestHarness createTestHarness( + SlicingWindowOperator operator) throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType()); + } + @Test public void testTop2Windows() throws Exception { SlicingWindowOperator operator = WindowRankOperatorBuilder.builder() .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(UTC_ZONE_ID) + .shiftTimeZone(shiftTimeZone) .keySerializer(KEY_SER) .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR) .sortKeySelector(SORT_KEY_SELECTOR) @@ -141,36 +164,51 @@ public void testTop2Windows() throws Exception { ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 999L)); - testHarness.processElement(insertRecord("key2", 4, 999L)); - testHarness.processElement(insertRecord("key2", 5, 999L)); - testHarness.processElement(insertRecord("key2", 3, 999L)); - testHarness.processElement(insertRecord("key2", 2, 1999L)); - testHarness.processElement(insertRecord("key2", 7, 3999L)); - testHarness.processElement(insertRecord("key2", 8, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3999L)); - - testHarness.processElement(insertRecord("key1", 2, 999L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 4, 1999L)); - testHarness.processElement(insertRecord("key1", 6, 1999L)); - testHarness.processElement(insertRecord("key1", 7, 1999L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + + testHarness.processElement( + insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone))); testHarness.processWatermark(new Watermark(999)); - expectedOutput.add(insertRecord("key1", 1, 999L, 1L)); - expectedOutput.add(insertRecord("key1", 2, 999L, 2L)); - expectedOutput.add(insertRecord("key2", 1, 999L, 1L)); - expectedOutput.add(insertRecord("key2", 3, 999L, 2L)); + expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L)); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.processWatermark(new Watermark(1999)); - expectedOutput.add(insertRecord("key1", 4, 1999L, 1L)); - expectedOutput.add(insertRecord("key1", 6, 1999L, 2L)); - expectedOutput.add(insertRecord("key2", 2, 1999L, 1L)); + expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L)); + expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone), 1L)); expectedOutput.add(new Watermark(1999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -187,14 +225,15 @@ public void testTop2Windows() throws Exception { testHarness.open(); testHarness.processWatermark(new Watermark(3999)); - expectedOutput.add(insertRecord("key2", 1, 3999L, 1L)); - expectedOutput.add(insertRecord("key2", 7, 3999L, 2L)); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(3999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); // late element, should be dropped - testHarness.processElement(insertRecord("key2", 1, 3500L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3500L, shiftTimeZone))); testHarness.processWatermark(new Watermark(4999)); expectedOutput.add(new Watermark(4999)); @@ -211,7 +250,7 @@ public void testTop2WindowsWithOffset() throws Exception { SlicingWindowOperator operator = WindowRankOperatorBuilder.builder() .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(UTC_ZONE_ID) + .shiftTimeZone(shiftTimeZone) .keySerializer(KEY_SER) .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR) .sortKeySelector(SORT_KEY_SELECTOR) @@ -231,32 +270,47 @@ public void testTop2WindowsWithOffset() throws Exception { ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 999L)); - testHarness.processElement(insertRecord("key2", 4, 999L)); - testHarness.processElement(insertRecord("key2", 5, 999L)); - testHarness.processElement(insertRecord("key2", 3, 999L)); - testHarness.processElement(insertRecord("key2", 2, 1999L)); - testHarness.processElement(insertRecord("key2", 7, 3999L)); - testHarness.processElement(insertRecord("key2", 8, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3999L)); - - testHarness.processElement(insertRecord("key1", 2, 999L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 4, 1999L)); - testHarness.processElement(insertRecord("key1", 6, 1999L)); - testHarness.processElement(insertRecord("key1", 7, 1999L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + + testHarness.processElement( + insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone))); testHarness.processWatermark(new Watermark(999)); - expectedOutput.add(insertRecord("key1", 2, 999L, 2L)); - expectedOutput.add(insertRecord("key2", 3, 999L, 2L)); + expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L)); + expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.processWatermark(new Watermark(1999)); - expectedOutput.add(insertRecord("key1", 6, 1999L, 2L)); + expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(1999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -273,7 +327,7 @@ public void testTop2WindowsWithOffset() throws Exception { testHarness.open(); testHarness.processWatermark(new Watermark(3999)); - expectedOutput.add(insertRecord("key2", 7, 3999L, 2L)); + expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(3999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -285,7 +339,7 @@ public void testTop2WindowsWithoutRankNumber() throws Exception { SlicingWindowOperator operator = WindowRankOperatorBuilder.builder() .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(UTC_ZONE_ID) + .shiftTimeZone(shiftTimeZone) .keySerializer(KEY_SER) .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR) .sortKeySelector(SORT_KEY_SELECTOR) @@ -305,36 +359,51 @@ public void testTop2WindowsWithoutRankNumber() throws Exception { ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 999L)); - testHarness.processElement(insertRecord("key2", 4, 999L)); - testHarness.processElement(insertRecord("key2", 5, 999L)); - testHarness.processElement(insertRecord("key2", 3, 999L)); - testHarness.processElement(insertRecord("key2", 2, 1999L)); - testHarness.processElement(insertRecord("key2", 7, 3999L)); - testHarness.processElement(insertRecord("key2", 8, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3999L)); - - testHarness.processElement(insertRecord("key1", 2, 999L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 4, 1999L)); - testHarness.processElement(insertRecord("key1", 6, 1999L)); - testHarness.processElement(insertRecord("key1", 7, 1999L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + + testHarness.processElement( + insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone))); testHarness.processWatermark(new Watermark(999)); - expectedOutput.add(insertRecord("key1", 1, 999L)); - expectedOutput.add(insertRecord("key1", 2, 999L)); - expectedOutput.add(insertRecord("key2", 1, 999L)); - expectedOutput.add(insertRecord("key2", 3, 999L)); + expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); expectedOutput.add(new Watermark(999)); ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.processWatermark(new Watermark(1999)); - expectedOutput.add(insertRecord("key1", 4, 1999L)); - expectedOutput.add(insertRecord("key1", 6, 1999L)); - expectedOutput.add(insertRecord("key2", 2, 1999L)); + expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); expectedOutput.add(new Watermark(1999)); ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -351,18 +420,12 @@ public void testTop2WindowsWithoutRankNumber() throws Exception { testHarness.open(); testHarness.processWatermark(new Watermark(3999)); - expectedOutput.add(insertRecord("key2", 1, 3999L)); - expectedOutput.add(insertRecord("key2", 7, 3999L)); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); expectedOutput.add(new Watermark(3999)); ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.close(); } - - private static OneInputStreamOperatorTestHarness createTestHarness( - SlicingWindowOperator operator) throws Exception { - return new KeyedOneInputStreamOperatorTestHarness<>( - operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType()); - } } From 7fadefe78bb0f8b0d7b61ee2a105e1cb3b5d17b5 Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Fri, 30 Apr 2021 18:35:00 +0800 Subject: [PATCH 2/3] Update based on JingsongLi's comments --- .../join/window/WindowJoinOperator.java | 32 +++++++++++++------ .../window/state/WindowListState.java | 22 ------------- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java index eb929e5591f02..2a12037aaf18f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java @@ -42,6 +42,8 @@ import org.apache.flink.table.runtime.generated.JoinCondition; import org.apache.flink.table.runtime.operators.TableStreamOperator; import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl; import org.apache.flink.table.runtime.operators.window.state.WindowListState; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.types.RowKind; @@ -51,13 +53,14 @@ import java.util.List; import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; /** * Streaming window join operator. * *

Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus * late elements (elements belong to emitted windows) will be simply dropped. + * + *

Note: currently, {@link WindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE input row. */ public abstract class WindowJoinOperator extends TableStreamOperator implements TwoInputStreamOperator, @@ -91,7 +94,7 @@ public abstract class WindowJoinOperator extends TableStreamOperator /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ private transient boolean functionsClosed = false; - private transient InternalTimerService internalTimerService; + private transient WindowTimerService windowTimerService; // ------------------------------------------------------------------------ protected transient JoinConditionWithNullFilters joinCondition; @@ -139,7 +142,9 @@ public void open() throws Exception { final LongSerializer windowSerializer = LongSerializer.INSTANCE; - internalTimerService = getInternalTimerService("window-timers", windowSerializer, this); + InternalTimerService internalTimerService = + getInternalTimerService("window-timers", windowSerializer, this); + this.windowTimerService = new WindowTimerServiceImpl(internalTimerService, shiftTimeZone); // init join condition JoinCondition condition = @@ -178,11 +183,11 @@ public void open() throws Exception { metrics.gauge( WATERMARK_LATENCY_METRIC_NAME, () -> { - long watermark = internalTimerService.currentWatermark(); + long watermark = windowTimerService.currentWatermark(); if (watermark < 0) { return 0L; } else { - return internalTimerService.currentProcessingTime() - watermark; + return windowTimerService.currentProcessingTime() - watermark; } }); } @@ -227,7 +232,7 @@ private void processElement( throws Exception { RowData inputRow = element.getValue(); long windowEnd = inputRow.getLong(windowEndIndex); - if (isWindowFired(windowEnd, internalTimerService.currentWatermark(), shiftTimeZone)) { + if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), shiftTimeZone)) { // element is late and should be dropped lateRecordsDroppedRate.markEvent(); return; @@ -235,11 +240,12 @@ private void processElement( if (RowDataUtil.isAccumulateMsg(inputRow)) { recordState.add(windowEnd, inputRow); } else { - recordState.delete(windowEnd, inputRow); + // Window join could not handle retraction input stream + throw new UnsupportedOperationException( + "This is a bug and should not happen. Please file an issue."); } // always register time for every element - internalTimerService.registerEventTimeTimer( - windowEnd, toEpochMillsForTimer(windowEnd - 1, shiftTimeZone)); + windowTimerService.registerEventTimeWindowTimer(windowEnd); } @Override @@ -372,6 +378,8 @@ public void join(Iterable leftRecords, Iterable rightRecords) private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator { + private static final long serialVersionUID = 1L; + private transient RowData leftNullRow; private transient RowData rightNullRow; private transient JoinedRowData outRow; @@ -431,6 +439,8 @@ protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) { static class LeftOuterJoinOperator extends AbstractOuterJoinOperator { + private static final long serialVersionUID = 1L; + LeftOuterJoinOperator( InternalTypeInfo leftType, InternalTypeInfo rightType, @@ -476,6 +486,8 @@ public void join(Iterable leftRecords, Iterable rightRecords) static class RightOuterJoinOperator extends AbstractOuterJoinOperator { + private static final long serialVersionUID = 1L; + RightOuterJoinOperator( InternalTypeInfo leftType, InternalTypeInfo rightType, @@ -520,6 +532,8 @@ public void join(Iterable leftRecords, Iterable rightRecords) static class FullOuterJoinOperator extends AbstractOuterJoinOperator { + private static final long serialVersionUID = 1L; + FullOuterJoinOperator( InternalTypeInfo leftType, InternalTypeInfo rightType, diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java index afeb2911ab5c3..1de0ad0ef8c13 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java @@ -58,26 +58,4 @@ public void add(W window, RowData value) throws Exception { windowState.setCurrentNamespace(window); windowState.add(value); } - - /** - * Updates the operator state accessible by {@link #get(W)} )} by delete the given value to the - * list of values. The next time {@link #get(W)} is called (for the same state partition) the - * returned state will represent the updated list. - * - *

If null is passed in, the state value will remain unchanged. - * - *

The performance is not well, first get complete list by calling {@link - * InternalListState#getInternal()})}, then remove the value from list, finally update state by - * calling {@link InternalListState#update(List)}. - * - * @param window The namespace for the state. - * @param value The new value for the state. - * @throws Exception Thrown if the system cannot access the state. - */ - public boolean delete(W window, RowData value) throws Exception { - List completeData = get(window); - boolean flag = completeData.remove(value); - windowState.updateInternal(completeData); - return flag; - } } From a7045f2abb260e6ffb2707142309d943387be02e Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Fri, 7 May 2021 14:31:47 +0800 Subject: [PATCH 3/3] Address jingsongli's comment --- .../join/window/WindowJoinOperator.java | 79 ++++++++++--------- .../window/WindowJoinOperatorBuilder.java | 42 +++++----- .../join/window/WindowJoinOperatorTest.java | 4 +- 3 files changed, 65 insertions(+), 60 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java index 2a12037aaf18f..d51064f79b5c8 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; @@ -45,7 +46,7 @@ import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl; import org.apache.flink.table.runtime.operators.window.state.WindowListState; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.types.RowKind; import java.time.ZoneId; @@ -81,8 +82,8 @@ public abstract class WindowJoinOperator extends TableStreamOperator private static final String LEFT_RECORDS_STATE_NAME = "left-records"; private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; - protected final InternalTypeInfo leftType; - protected final InternalTypeInfo rightType; + protected final RowDataSerializer leftSerializer; + protected final RowDataSerializer rightSerializer; private final GeneratedJoinCondition generatedJoinCondition; private final int leftWindowEndIndex; @@ -116,15 +117,15 @@ public abstract class WindowJoinOperator extends TableStreamOperator private transient Gauge watermarkLatency; WindowJoinOperator( - InternalTypeInfo leftType, - InternalTypeInfo rightType, + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, GeneratedJoinCondition generatedJoinCondition, int leftWindowEndIndex, int rightWindowEndIndex, boolean[] filterNullKeys, ZoneId shiftTimeZone) { - this.leftType = leftType; - this.rightType = rightType; + this.leftSerializer = (RowDataSerializer) leftSerializer; + this.rightSerializer = (RowDataSerializer) rightSerializer; this.generatedJoinCondition = generatedJoinCondition; this.leftWindowEndIndex = leftWindowEndIndex; this.rightWindowEndIndex = rightWindowEndIndex; @@ -155,14 +156,14 @@ public void open() throws Exception { // init state ListStateDescriptor leftRecordStateDesc = - new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, leftType); + new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, leftSerializer); ListState leftListState = getOrCreateKeyedState(windowSerializer, leftRecordStateDesc); this.leftWindowState = new WindowListState<>((InternalListState) leftListState); ListStateDescriptor rightRecordStateDesc = - new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, rightType); + new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, rightSerializer); ListState rightListState = getOrCreateKeyedState(windowSerializer, rightRecordStateDesc); this.rightWindowState = @@ -264,8 +265,12 @@ public void onEventTime(InternalTimer timer) throws Exception { List rightData = rightWindowState.get(window); join(leftData, rightData); // clear state - leftWindowState.clear(window); - rightWindowState.clear(window); + if (leftData != null) { + leftWindowState.clear(window); + } + if (rightData != null) { + rightWindowState.clear(window); + } } public abstract void join(Iterable leftRecords, Iterable rightRecords); @@ -275,8 +280,8 @@ static class SemiAntiJoinOperator extends WindowJoinOperator { private final boolean isAntiJoin; SemiAntiJoinOperator( - InternalTypeInfo leftType, - InternalTypeInfo rightType, + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, GeneratedJoinCondition generatedJoinCondition, int leftWindowEndIndex, int rightWindowEndIndex, @@ -284,8 +289,8 @@ static class SemiAntiJoinOperator extends WindowJoinOperator { boolean isAntiJoin, ZoneId shiftTimeZone) { super( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -336,16 +341,16 @@ static class InnerJoinOperator extends WindowJoinOperator { private transient JoinedRowData outRow; InnerJoinOperator( - InternalTypeInfo leftType, - InternalTypeInfo rightType, + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, GeneratedJoinCondition generatedJoinCondition, int leftWindowEndIndex, int rightWindowEndIndex, boolean[] filterNullKeys, ZoneId shiftTimeZone) { super( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -385,16 +390,16 @@ private abstract static class AbstractOuterJoinOperator extends WindowJoinOperat private transient JoinedRowData outRow; AbstractOuterJoinOperator( - InternalTypeInfo leftType, - InternalTypeInfo rightType, + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, GeneratedJoinCondition generatedJoinCondition, int leftWindowEndIndex, int rightWindowEndIndex, boolean[] filterNullKeys, ZoneId shiftTimeZone) { super( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -405,8 +410,8 @@ private abstract static class AbstractOuterJoinOperator extends WindowJoinOperat @Override public void open() throws Exception { super.open(); - leftNullRow = new GenericRowData(leftType.toRowSize()); - rightNullRow = new GenericRowData(rightType.toRowSize()); + leftNullRow = new GenericRowData(leftSerializer.getArity()); + rightNullRow = new GenericRowData(rightSerializer.getArity()); outRow = new JoinedRowData(); } @@ -442,16 +447,16 @@ static class LeftOuterJoinOperator extends AbstractOuterJoinOperator { private static final long serialVersionUID = 1L; LeftOuterJoinOperator( - InternalTypeInfo leftType, - InternalTypeInfo rightType, + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, GeneratedJoinCondition generatedJoinCondition, int leftWindowEndIndex, int rightWindowEndIndex, boolean[] filterNullKeys, ZoneId shiftTimeZone) { super( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -489,16 +494,16 @@ static class RightOuterJoinOperator extends AbstractOuterJoinOperator { private static final long serialVersionUID = 1L; RightOuterJoinOperator( - InternalTypeInfo leftType, - InternalTypeInfo rightType, + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, GeneratedJoinCondition generatedJoinCondition, int leftWindowEndIndex, int rightWindowEndIndex, boolean[] filterNullKeys, ZoneId shiftTimeZone) { super( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -535,16 +540,16 @@ static class FullOuterJoinOperator extends AbstractOuterJoinOperator { private static final long serialVersionUID = 1L; FullOuterJoinOperator( - InternalTypeInfo leftType, - InternalTypeInfo rightType, + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, GeneratedJoinCondition generatedJoinCondition, int leftWindowEndIndex, int rightWindowEndIndex, boolean[] filterNullKeys, ZoneId shiftTimeZone) { super( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java index 03909487c750c..c0e566555e714 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java @@ -18,10 +18,10 @@ package org.apache.flink.table.runtime.operators.join.window; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import java.time.ZoneId; @@ -50,8 +50,8 @@ public static WindowJoinOperatorBuilder builder() { return new WindowJoinOperatorBuilder(); } - private InternalTypeInfo leftType; - private InternalTypeInfo rightType; + private TypeSerializer leftSerializer; + private TypeSerializer rightSerializer; private GeneratedJoinCondition generatedJoinCondition; private int leftWindowEndIndex = -1; private int rightWindowEndIndex = -1; @@ -59,13 +59,13 @@ public static WindowJoinOperatorBuilder builder() { private FlinkJoinType joinType; private ZoneId shiftTimeZone = ZoneId.of("UTC"); - public WindowJoinOperatorBuilder leftType(InternalTypeInfo leftType) { - this.leftType = leftType; + public WindowJoinOperatorBuilder leftSerializer(TypeSerializer leftSerializer) { + this.leftSerializer = leftSerializer; return this; } - public WindowJoinOperatorBuilder rightType(InternalTypeInfo rightType) { - this.rightType = rightType; + public WindowJoinOperatorBuilder rightSerializer(TypeSerializer rightSerializer) { + this.rightSerializer = rightSerializer; return this; } @@ -106,8 +106,8 @@ public WindowJoinOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) { } public WindowJoinOperator build() { - checkNotNull(leftType); - checkNotNull(rightType); + checkNotNull(leftSerializer); + checkNotNull(rightSerializer); checkNotNull(generatedJoinCondition); checkNotNull(filterNullKeys); checkNotNull(joinType); @@ -126,8 +126,8 @@ public WindowJoinOperator build() { switch (joinType) { case INNER: return new WindowJoinOperator.InnerJoinOperator( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -135,8 +135,8 @@ public WindowJoinOperator build() { shiftTimeZone); case SEMI: return new WindowJoinOperator.SemiAntiJoinOperator( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -145,8 +145,8 @@ public WindowJoinOperator build() { shiftTimeZone); case ANTI: return new WindowJoinOperator.SemiAntiJoinOperator( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -155,8 +155,8 @@ public WindowJoinOperator build() { shiftTimeZone); case LEFT: return new WindowJoinOperator.LeftOuterJoinOperator( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -164,8 +164,8 @@ public WindowJoinOperator build() { shiftTimeZone); case RIGHT: return new WindowJoinOperator.RightOuterJoinOperator( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, @@ -173,8 +173,8 @@ public WindowJoinOperator build() { shiftTimeZone); case FULL: return new WindowJoinOperator.FullOuterJoinOperator( - leftType, - rightType, + leftSerializer, + rightSerializer, generatedJoinCondition, leftWindowEndIndex, rightWindowEndIndex, diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java index 0075100b9ae61..3b7d09384886a 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java @@ -517,8 +517,8 @@ public void testOuterJoin() throws Exception { TypeInformation keyType = InternalTypeInfo.ofFields(); WindowJoinOperator operator = WindowJoinOperatorBuilder.builder() - .leftType(INPUT_ROW_TYPE) - .rightType(INPUT_ROW_TYPE) + .leftSerializer(INPUT_ROW_TYPE.toRowSerializer()) + .rightSerializer(INPUT_ROW_TYPE.toRowSerializer()) .generatedJoinCondition(joinFunction) .leftWindowEndIndex(0) .rightWindowEndIndex(0)