diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java new file mode 100644 index 0000000000000..d51064f79b5c8 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java @@ -0,0 +1,594 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.RowDataUtil; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl; +import org.apache.flink.table.runtime.operators.window.state.WindowListState; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.types.RowKind; + +import java.time.ZoneId; +import java.util.IdentityHashMap; +import java.util.List; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; + +/** + * Streaming window join operator. + * + *

Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus + * late elements (elements belong to emitted windows) will be simply dropped. + * + *

Note: currently, {@link WindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE input row. + */ +public abstract class WindowJoinOperator extends TableStreamOperator + implements TwoInputStreamOperator, + Triggerable, + KeyContext { + + private static final long serialVersionUID = 1L; + + private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "leftNumLateRecordsDropped"; + private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "leftLateRecordsDroppedRate"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "rightNumLateRecordsDropped"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "rightLateRecordsDroppedRate"; + private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; + private static final String LEFT_RECORDS_STATE_NAME = "left-records"; + private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; + + protected final RowDataSerializer leftSerializer; + protected final RowDataSerializer rightSerializer; + private final GeneratedJoinCondition generatedJoinCondition; + + private final int leftWindowEndIndex; + private final int rightWindowEndIndex; + + private final boolean[] filterNullKeys; + private final ZoneId shiftTimeZone; + + /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ + private transient boolean functionsClosed = false; + + private transient WindowTimerService windowTimerService; + + // ------------------------------------------------------------------------ + protected transient JoinConditionWithNullFilters joinCondition; + + /** This is used for emitting elements with a given timestamp. */ + protected transient TimestampedCollector collector; + + private transient WindowListState leftWindowState; + private transient WindowListState rightWindowState; + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + private transient Counter leftNumLateRecordsDropped; + private transient Meter leftLateRecordsDroppedRate; + private transient Counter rightNumLateRecordsDropped; + private transient Meter rightLateRecordsDroppedRate; + private transient Gauge watermarkLatency; + + WindowJoinOperator( + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + this.leftSerializer = (RowDataSerializer) leftSerializer; + this.rightSerializer = (RowDataSerializer) rightSerializer; + this.generatedJoinCondition = generatedJoinCondition; + this.leftWindowEndIndex = leftWindowEndIndex; + this.rightWindowEndIndex = rightWindowEndIndex; + this.filterNullKeys = filterNullKeys; + this.shiftTimeZone = shiftTimeZone; + } + + @Override + public void open() throws Exception { + super.open(); + functionsClosed = false; + + this.collector = new TimestampedCollector<>(output); + collector.eraseTimestamp(); + + final LongSerializer windowSerializer = LongSerializer.INSTANCE; + + InternalTimerService internalTimerService = + getInternalTimerService("window-timers", windowSerializer, this); + this.windowTimerService = new WindowTimerServiceImpl(internalTimerService, shiftTimeZone); + + // init join condition + JoinCondition condition = + generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); + this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this); + this.joinCondition.setRuntimeContext(getRuntimeContext()); + this.joinCondition.open(new Configuration()); + + // init state + ListStateDescriptor leftRecordStateDesc = + new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, leftSerializer); + ListState leftListState = + getOrCreateKeyedState(windowSerializer, leftRecordStateDesc); + this.leftWindowState = + new WindowListState<>((InternalListState) leftListState); + + ListStateDescriptor rightRecordStateDesc = + new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, rightSerializer); + ListState rightListState = + getOrCreateKeyedState(windowSerializer, rightRecordStateDesc); + this.rightWindowState = + new WindowListState<>((InternalListState) rightListState); + + // metrics + this.leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.leftLateRecordsDroppedRate = + metrics.meter( + LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(leftNumLateRecordsDropped)); + this.rightNumLateRecordsDropped = metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.rightLateRecordsDroppedRate = + metrics.meter( + RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(rightNumLateRecordsDropped)); + this.watermarkLatency = + metrics.gauge( + WATERMARK_LATENCY_METRIC_NAME, + () -> { + long watermark = windowTimerService.currentWatermark(); + if (watermark < 0) { + return 0L; + } else { + return windowTimerService.currentProcessingTime() - watermark; + } + }); + } + + @Override + public void close() throws Exception { + super.close(); + collector = null; + functionsClosed = true; + if (joinCondition != null) { + joinCondition.close(); + } + } + + @Override + public void dispose() throws Exception { + super.dispose(); + collector = null; + if (!functionsClosed) { + functionsClosed = true; + if (joinCondition != null) { + joinCondition.close(); + } + } + } + + @Override + public void processElement1(StreamRecord element) throws Exception { + processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState); + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + processElement(element, rightWindowEndIndex, rightLateRecordsDroppedRate, rightWindowState); + } + + private void processElement( + StreamRecord element, + int windowEndIndex, + Meter lateRecordsDroppedRate, + WindowListState recordState) + throws Exception { + RowData inputRow = element.getValue(); + long windowEnd = inputRow.getLong(windowEndIndex); + if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), shiftTimeZone)) { + // element is late and should be dropped + lateRecordsDroppedRate.markEvent(); + return; + } + if (RowDataUtil.isAccumulateMsg(inputRow)) { + recordState.add(windowEnd, inputRow); + } else { + // Window join could not handle retraction input stream + throw new UnsupportedOperationException( + "This is a bug and should not happen. Please file an issue."); + } + // always register time for every element + windowTimerService.registerEventTimeWindowTimer(windowEnd); + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + // Window join only support event-time now + throw new UnsupportedOperationException( + "This is a bug and should not happen. Please file an issue."); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + setCurrentKey(timer.getKey()); + Long window = timer.getNamespace(); + // join left records and right records + List leftData = leftWindowState.get(window); + List rightData = rightWindowState.get(window); + join(leftData, rightData); + // clear state + if (leftData != null) { + leftWindowState.clear(window); + } + if (rightData != null) { + rightWindowState.clear(window); + } + } + + public abstract void join(Iterable leftRecords, Iterable rightRecords); + + static class SemiAntiJoinOperator extends WindowJoinOperator { + + private final boolean isAntiJoin; + + SemiAntiJoinOperator( + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + boolean isAntiJoin, + ZoneId shiftTimeZone) { + super( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + this.isAntiJoin = isAntiJoin; + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null) { + return; + } + if (rightRecords == null) { + if (isAntiJoin) { + for (RowData leftRecord : leftRecords) { + collector.collect(leftRecord); + } + } + return; + } + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + if (joinCondition.apply(leftRecord, rightRecord)) { + matches = true; + break; + } + } + } + if (matches) { + if (!isAntiJoin) { + // emit left record if there are matched rows on the other side + collector.collect(leftRecord); + } + } else { + if (isAntiJoin) { + // emit left record if there is no matched row on the other side + collector.collect(leftRecord); + } + } + } + } + } + + static class InnerJoinOperator extends WindowJoinOperator { + private transient JoinedRowData outRow; + + InnerJoinOperator( + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void open() throws Exception { + super.open(); + outRow = new JoinedRowData(); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null || rightRecords == null) { + return; + } + for (RowData leftRecord : leftRecords) { + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + outRow.setRowKind(RowKind.INSERT); + outRow.replace(leftRecord, rightRecord); + collector.collect(outRow); + } + } + } + } + } + + private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator { + + private static final long serialVersionUID = 1L; + + private transient RowData leftNullRow; + private transient RowData rightNullRow; + private transient JoinedRowData outRow; + + AbstractOuterJoinOperator( + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void open() throws Exception { + super.open(); + leftNullRow = new GenericRowData(leftSerializer.getArity()); + rightNullRow = new GenericRowData(rightSerializer.getArity()); + outRow = new JoinedRowData(); + } + + protected void outputNullPadding(RowData row, boolean isLeft) { + if (isLeft) { + outRow.replace(row, rightNullRow); + } else { + outRow.replace(leftNullRow, row); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + + protected void outputNullPadding(Iterable rows, boolean isLeft) { + for (RowData row : rows) { + outputNullPadding(row, isLeft); + } + } + + protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) { + if (inputIsLeft) { + outRow.replace(inputRow, otherRow); + } else { + outRow.replace(otherRow, inputRow); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + } + + static class LeftOuterJoinOperator extends AbstractOuterJoinOperator { + + private static final long serialVersionUID = 1L; + + LeftOuterJoinOperator( + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else { + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + // padding null for left side + outputNullPadding(leftRecord, true); + } + } + } + } + } + + static class RightOuterJoinOperator extends AbstractOuterJoinOperator { + + private static final long serialVersionUID = 1L; + + RightOuterJoinOperator( + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (rightRecords == null) { + return; + } + if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + for (RowData rightRecord : rightRecords) { + boolean matches = false; + for (RowData leftRecord : leftRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + outputNullPadding(rightRecord, false); + } + } + } + } + } + + static class FullOuterJoinOperator extends AbstractOuterJoinOperator { + + private static final long serialVersionUID = 1L; + + FullOuterJoinOperator( + TypeSerializer leftSerializer, + TypeSerializer rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone) { + super( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + } + + @Override + public void join(Iterable leftRecords, Iterable rightRecords) { + if (leftRecords == null && rightRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + IdentityHashMap emittedRightRecords = new IdentityHashMap<>(); + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + emittedRightRecords.put(rightRecord, Boolean.TRUE); + } + } + // padding null for left side + if (!matches) { + outputNullPadding(leftRecord, true); + } + } + // padding null for never emitted right side + for (RowData rightRecord : rightRecords) { + if (!emittedRightRecords.containsKey(rightRecord)) { + outputNullPadding(rightRecord, false); + } + } + } + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java new file mode 100644 index 0000000000000..c0e566555e714 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; + +import java.time.ZoneId; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@link WindowJoinOperatorBuilder} is used to build a {@link WindowJoinOperator} for window + * join. + * + *

+ * WindowJoinOperatorBuilder.builder()
+ *   .leftType(leftType)
+ *   .rightType(rightType)
+ *   .generatedJoinCondition(generatedJoinCondition)
+ *   .leftWindowEndIndex(leftWindowEndIndex)
+ *   .rightWindowEndIndex(rightWindowEndIndex)
+ *   .filterNullKeys(filterNullKeys)
+ *   .joinType(joinType)
+ *   .build();
+ * 
+ */ +public class WindowJoinOperatorBuilder { + + public static WindowJoinOperatorBuilder builder() { + return new WindowJoinOperatorBuilder(); + } + + private TypeSerializer leftSerializer; + private TypeSerializer rightSerializer; + private GeneratedJoinCondition generatedJoinCondition; + private int leftWindowEndIndex = -1; + private int rightWindowEndIndex = -1; + private boolean[] filterNullKeys; + private FlinkJoinType joinType; + private ZoneId shiftTimeZone = ZoneId.of("UTC"); + + public WindowJoinOperatorBuilder leftSerializer(TypeSerializer leftSerializer) { + this.leftSerializer = leftSerializer; + return this; + } + + public WindowJoinOperatorBuilder rightSerializer(TypeSerializer rightSerializer) { + this.rightSerializer = rightSerializer; + return this; + } + + public WindowJoinOperatorBuilder generatedJoinCondition( + GeneratedJoinCondition generatedJoinCondition) { + this.generatedJoinCondition = generatedJoinCondition; + return this; + } + + public WindowJoinOperatorBuilder filterNullKeys(boolean[] filterNullKeys) { + this.filterNullKeys = filterNullKeys; + return this; + } + + public WindowJoinOperatorBuilder joinType(FlinkJoinType joinType) { + this.joinType = joinType; + return this; + } + + public WindowJoinOperatorBuilder leftWindowEndIndex(int leftWindowEndIndex) { + this.leftWindowEndIndex = leftWindowEndIndex; + return this; + } + + public WindowJoinOperatorBuilder rightWindowEndIndex(int rightWindowEndIndex) { + this.rightWindowEndIndex = rightWindowEndIndex; + return this; + } + + /** + * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift + * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC + * which means never shift when assigning windows. + */ + public WindowJoinOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) { + this.shiftTimeZone = shiftTimeZone; + return this; + } + + public WindowJoinOperator build() { + checkNotNull(leftSerializer); + checkNotNull(rightSerializer); + checkNotNull(generatedJoinCondition); + checkNotNull(filterNullKeys); + checkNotNull(joinType); + + checkArgument( + leftWindowEndIndex >= 0, + String.format( + "Illegal window end index %s, it should not be negative!", + leftWindowEndIndex)); + checkArgument( + rightWindowEndIndex >= 0, + String.format( + "Illegal window end index %s, it should not be negative!", + rightWindowEndIndex)); + + switch (joinType) { + case INNER: + return new WindowJoinOperator.InnerJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + case SEMI: + return new WindowJoinOperator.SemiAntiJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + false, + shiftTimeZone); + case ANTI: + return new WindowJoinOperator.SemiAntiJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + true, + shiftTimeZone); + case LEFT: + return new WindowJoinOperator.LeftOuterJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + case RIGHT: + return new WindowJoinOperator.RightOuterJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + case FULL: + return new WindowJoinOperator.FullOuterJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone); + default: + throw new IllegalArgumentException("Invalid join type: " + joinType); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java new file mode 100644 index 0000000000000..1de0ad0ef8c13 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.table.data.RowData; + +import java.util.List; + +/** A wrapper of {@link ListState} which is easier to update based on window namespace. */ +public final class WindowListState implements WindowState { + + private final InternalListState windowState; + + public WindowListState(InternalListState windowState) { + this.windowState = windowState; + } + + public void clear(W window) { + windowState.setCurrentNamespace(window); + windowState.clear(); + } + + public List get(W window) throws Exception { + windowState.setCurrentNamespace(window); + return windowState.getInternal(); + } + + /** + * Updates the operator state accessible by {@link #get(W)})} by adding the given value to the + * list of values. The next time {@link #get(W)} is called (for the same state partition) the + * returned state will represent the updated list. + * + *

If null is passed in, the state value will remain unchanged. + * + * @param window The namespace for the state. + * @param value The new value for the state. + * @throws Exception Thrown if the system cannot access the state. + */ + public void add(W window, RowData value) throws Exception { + windowState.setCurrentNamespace(window); + windowState.add(value); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java new file mode 100644 index 0000000000000..3b7d09384886a --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.junit.Assert.assertEquals; + +/** Tests for window join operators created by {@link WindowJoinOperatorBuilder}. */ +@RunWith(Parameterized.class) +public class WindowJoinOperatorTest { + + private static final InternalTypeInfo INPUT_ROW_TYPE = + InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)); + + private static final InternalTypeInfo OUTPUT_ROW_TYPE = + InternalTypeInfo.ofFields( + new BigIntType(), + new VarCharType(VarCharType.MAX_LENGTH), + new BigIntType(), + new VarCharType(VarCharType.MAX_LENGTH)); + + private static final RowDataHarnessAssertor ASSERTER = + new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes()); + + private static final RowDataHarnessAssertor SEMI_ANTI_JOIN_ASSERTER = + new RowDataHarnessAssertor(INPUT_ROW_TYPE.toRowFieldTypes()); + + private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); + + private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); + + private final ZoneId shiftTimeZone; + + public WindowJoinOperatorTest(ZoneId shiftTimeZone) { + this.shiftTimeZone = shiftTimeZone; + } + + @Parameterized.Parameters(name = "TimeZone = {0}") + public static Collection runMode() { + return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); + } + + @Test + public void testSemiJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.SEMI); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted( + "output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testAntiJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.ANTI); + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted( + "output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testInnerJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.INNER); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.LEFT); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testRightOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.RIGHT); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testOuterJoin() throws Exception { + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(FlinkJoinType.FULL); + + testHarness.open(); + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Test late data would be dropped + testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1")); + assertEquals(0, testHarness.numEventTimeTimers()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(4, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(3L, shiftTimeZone), + "k1", + toUtcTimestampMills(3L, shiftTimeZone), + "k1")); + expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1")); + expectedOutput.add(new Watermark(10)); + ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1")); + testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1")); + assertEquals(3, testHarness.numKeyedStateEntries()); + + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null)); + expectedOutput.add(new Watermark(13)); + assertEquals(2, testHarness.numKeyedStateEntries()); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(18)); + testHarness.processWatermark2(new Watermark(18)); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add( + insertRecord( + toUtcTimestampMills(15L, shiftTimeZone), + "k1", + toUtcTimestampMills(15L, shiftTimeZone), + "k1")); + expectedOutput.add(new Watermark(18)); + ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + private KeyedTwoInputStreamOperatorTestHarness + createTestHarness(FlinkJoinType joinType) throws Exception { + String funcCode = + "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + "\n" + + " public TestWindowJoinCondition(Object[] reference) {\n" + + " }\n" + + "\n" + + " @Override\n" + + " public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n" + + " return true;\n" + + " }\n" + + "}\n"; + GeneratedJoinCondition joinFunction = + new GeneratedJoinCondition("TestWindowJoinCondition", funcCode, new Object[0]); + int keyIdx = 1; + RowDataKeySelector keySelector = + HandwrittenSelectorUtil.getRowDataSelector( + new int[] {keyIdx}, INPUT_ROW_TYPE.toRowFieldTypes()); + TypeInformation keyType = InternalTypeInfo.ofFields(); + WindowJoinOperator operator = + WindowJoinOperatorBuilder.builder() + .leftSerializer(INPUT_ROW_TYPE.toRowSerializer()) + .rightSerializer(INPUT_ROW_TYPE.toRowSerializer()) + .generatedJoinCondition(joinFunction) + .leftWindowEndIndex(0) + .rightWindowEndIndex(0) + .filterNullKeys(new boolean[] {true}) + .joinType(joinType) + .withShiftTimezone(shiftTimeZone) + .build(); + KeyedTwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, keySelector, keySelector, keyType); + return testHarness; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java index c4d9df05410fa..087cc4c6710c6 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java @@ -41,15 +41,20 @@ import org.apache.flink.table.utils.HandwrittenSelectorUtil; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.time.ZoneId; import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; import static org.junit.Assert.assertEquals; /** Tests for window rank operators created by {@link WindowRankOperatorBuilder}. */ +@RunWith(Parameterized.class) public class WindowRankOperatorTest { private static final RowType INPUT_ROW_TYPE = @@ -60,7 +65,6 @@ public class WindowRankOperatorTest { new RowType.RowField("f2", new BigIntType()))); private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE); - private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); private static final RowDataKeySelector KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector( @@ -116,12 +120,31 @@ public RecordComparator newInstance(ClassLoader classLoader) { OUTPUT_TYPES_WITHOUT_RANK_NUMBER, new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); + private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); + private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); + private final ZoneId shiftTimeZone; + + public WindowRankOperatorTest(ZoneId shiftTimeZone) { + this.shiftTimeZone = shiftTimeZone; + } + + @Parameterized.Parameters(name = "TimeZone = {0}") + public static Collection runMode() { + return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); + } + + private static OneInputStreamOperatorTestHarness createTestHarness( + SlicingWindowOperator operator) throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType()); + } + @Test public void testTop2Windows() throws Exception { SlicingWindowOperator operator = WindowRankOperatorBuilder.builder() .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(UTC_ZONE_ID) + .shiftTimeZone(shiftTimeZone) .keySerializer(KEY_SER) .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR) .sortKeySelector(SORT_KEY_SELECTOR) @@ -141,36 +164,51 @@ public void testTop2Windows() throws Exception { ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 999L)); - testHarness.processElement(insertRecord("key2", 4, 999L)); - testHarness.processElement(insertRecord("key2", 5, 999L)); - testHarness.processElement(insertRecord("key2", 3, 999L)); - testHarness.processElement(insertRecord("key2", 2, 1999L)); - testHarness.processElement(insertRecord("key2", 7, 3999L)); - testHarness.processElement(insertRecord("key2", 8, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3999L)); - - testHarness.processElement(insertRecord("key1", 2, 999L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 4, 1999L)); - testHarness.processElement(insertRecord("key1", 6, 1999L)); - testHarness.processElement(insertRecord("key1", 7, 1999L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + + testHarness.processElement( + insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone))); testHarness.processWatermark(new Watermark(999)); - expectedOutput.add(insertRecord("key1", 1, 999L, 1L)); - expectedOutput.add(insertRecord("key1", 2, 999L, 2L)); - expectedOutput.add(insertRecord("key2", 1, 999L, 1L)); - expectedOutput.add(insertRecord("key2", 3, 999L, 2L)); + expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L)); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.processWatermark(new Watermark(1999)); - expectedOutput.add(insertRecord("key1", 4, 1999L, 1L)); - expectedOutput.add(insertRecord("key1", 6, 1999L, 2L)); - expectedOutput.add(insertRecord("key2", 2, 1999L, 1L)); + expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L)); + expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone), 1L)); expectedOutput.add(new Watermark(1999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -187,14 +225,15 @@ public void testTop2Windows() throws Exception { testHarness.open(); testHarness.processWatermark(new Watermark(3999)); - expectedOutput.add(insertRecord("key2", 1, 3999L, 1L)); - expectedOutput.add(insertRecord("key2", 7, 3999L, 2L)); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone), 1L)); + expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(3999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); // late element, should be dropped - testHarness.processElement(insertRecord("key2", 1, 3500L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3500L, shiftTimeZone))); testHarness.processWatermark(new Watermark(4999)); expectedOutput.add(new Watermark(4999)); @@ -211,7 +250,7 @@ public void testTop2WindowsWithOffset() throws Exception { SlicingWindowOperator operator = WindowRankOperatorBuilder.builder() .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(UTC_ZONE_ID) + .shiftTimeZone(shiftTimeZone) .keySerializer(KEY_SER) .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR) .sortKeySelector(SORT_KEY_SELECTOR) @@ -231,32 +270,47 @@ public void testTop2WindowsWithOffset() throws Exception { ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 999L)); - testHarness.processElement(insertRecord("key2", 4, 999L)); - testHarness.processElement(insertRecord("key2", 5, 999L)); - testHarness.processElement(insertRecord("key2", 3, 999L)); - testHarness.processElement(insertRecord("key2", 2, 1999L)); - testHarness.processElement(insertRecord("key2", 7, 3999L)); - testHarness.processElement(insertRecord("key2", 8, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3999L)); - - testHarness.processElement(insertRecord("key1", 2, 999L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 4, 1999L)); - testHarness.processElement(insertRecord("key1", 6, 1999L)); - testHarness.processElement(insertRecord("key1", 7, 1999L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + + testHarness.processElement( + insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone))); testHarness.processWatermark(new Watermark(999)); - expectedOutput.add(insertRecord("key1", 2, 999L, 2L)); - expectedOutput.add(insertRecord("key2", 3, 999L, 2L)); + expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L)); + expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.processWatermark(new Watermark(1999)); - expectedOutput.add(insertRecord("key1", 6, 1999L, 2L)); + expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(1999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -273,7 +327,7 @@ public void testTop2WindowsWithOffset() throws Exception { testHarness.open(); testHarness.processWatermark(new Watermark(3999)); - expectedOutput.add(insertRecord("key2", 7, 3999L, 2L)); + expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L)); expectedOutput.add(new Watermark(3999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -285,7 +339,7 @@ public void testTop2WindowsWithoutRankNumber() throws Exception { SlicingWindowOperator operator = WindowRankOperatorBuilder.builder() .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(UTC_ZONE_ID) + .shiftTimeZone(shiftTimeZone) .keySerializer(KEY_SER) .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR) .sortKeySelector(SORT_KEY_SELECTOR) @@ -305,36 +359,51 @@ public void testTop2WindowsWithoutRankNumber() throws Exception { ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 999L)); - testHarness.processElement(insertRecord("key2", 4, 999L)); - testHarness.processElement(insertRecord("key2", 5, 999L)); - testHarness.processElement(insertRecord("key2", 3, 999L)); - testHarness.processElement(insertRecord("key2", 2, 1999L)); - testHarness.processElement(insertRecord("key2", 7, 3999L)); - testHarness.processElement(insertRecord("key2", 8, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3999L)); - - testHarness.processElement(insertRecord("key1", 2, 999L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - testHarness.processElement(insertRecord("key1", 4, 1999L)); - testHarness.processElement(insertRecord("key1", 6, 1999L)); - testHarness.processElement(insertRecord("key1", 7, 1999L)); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + + testHarness.processElement( + insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + testHarness.processElement( + insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone))); testHarness.processWatermark(new Watermark(999)); - expectedOutput.add(insertRecord("key1", 1, 999L)); - expectedOutput.add(insertRecord("key1", 2, 999L)); - expectedOutput.add(insertRecord("key2", 1, 999L)); - expectedOutput.add(insertRecord("key2", 3, 999L)); + expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone))); expectedOutput.add(new Watermark(999)); ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.processWatermark(new Watermark(1999)); - expectedOutput.add(insertRecord("key1", 4, 1999L)); - expectedOutput.add(insertRecord("key1", 6, 1999L)); - expectedOutput.add(insertRecord("key2", 2, 1999L)); + expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone))); expectedOutput.add(new Watermark(1999)); ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -351,18 +420,12 @@ public void testTop2WindowsWithoutRankNumber() throws Exception { testHarness.open(); testHarness.processWatermark(new Watermark(3999)); - expectedOutput.add(insertRecord("key2", 1, 3999L)); - expectedOutput.add(insertRecord("key2", 7, 3999L)); + expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone))); + expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone))); expectedOutput.add(new Watermark(3999)); ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.close(); } - - private static OneInputStreamOperatorTestHarness createTestHarness( - SlicingWindowOperator operator) throws Exception { - return new KeyedOneInputStreamOperatorTestHarness<>( - operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType()); - } }