From 5439699f843a151c223eaa0069873c958321fca4 Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Thu, 15 Apr 2021 14:31:43 +0800 Subject: [PATCH 1/3] [FLINK-22304][table] Refactor some interfaces for TVF based window to improve the extendability --- .../StreamExecLocalWindowAggregate.java | 20 +++-- .../window/LocalSlicingWindowAggOperator.java | 33 +-------- .../SlicingWindowAggOperatorBuilder.java | 31 ++++---- .../window/buffers/RecordsWindowBuffer.java | 73 +++++++++++++++++-- .../window/buffers/WindowBuffer.java | 46 +++++++++++- ...gRecordsCombiner.java => AggCombiner.java} | 29 ++++---- ...ccCombiner.java => GlobalAggCombiner.java} | 42 ++++++----- ...rdsCombiner.java => LocalAggCombiner.java} | 35 +++------ .../AbstractWindowAggProcessor.java | 17 ++--- .../SliceSharedWindowAggProcessor.java | 10 +-- .../SliceUnsharedWindowAggProcessor.java | 10 +-- .../window/WindowRankOperatorBuilder.java | 9 +-- .../window/combines/TopNRecordsCombiner.java | 12 +-- .../processors/WindowRankProcessor.java | 17 ++--- ...bineFunction.java => RecordsCombiner.java} | 25 +++---- 15 files changed, 221 insertions(+), 188 deletions(-) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/{AggRecordsCombiner.java => AggCombiner.java} (90%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/{GlobalAggAccCombiner.java => GlobalAggCombiner.java} (90%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/{LocalAggRecordsCombiner.java => LocalAggCombiner.java} (77%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/{WindowCombineFunction.java => RecordsCombiner.java} (76%) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java index 18f8a8dd21672..e58e7f13bbc84 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java @@ -38,7 +38,11 @@ import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator; +import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer; +import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; +import org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggCombiner; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; +import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -65,7 +69,6 @@ public class StreamExecLocalWindowAggregate extends StreamExecWindowAggregateBas private static final long WINDOW_AGG_MEMORY_RATIO = 100; public static final String FIELD_NAME_WINDOWING = "windowing"; - public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties"; @JsonProperty(FIELD_NAME_GROUPING) private final int[] grouping; @@ -139,14 +142,17 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { final RowDataKeySelector selector = KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType)); + PagedTypeSerializer keySer = + (PagedTypeSerializer) selector.getProducedType().toSerializer(); + AbstractRowDataSerializer valueSer = new RowDataSerializer(inputRowType); + + WindowBuffer.LocalFactory bufferFactory = + new RecordsWindowBuffer.LocalFactory( + keySer, valueSer, new LocalAggCombiner.Factory(generatedAggsHandler)); + final OneInputStreamOperator localAggOperator = new LocalSlicingWindowAggOperator( - selector, - sliceAssigner, - (PagedTypeSerializer) selector.getProducedType().toSerializer(), - new RowDataSerializer(inputRowType), - generatedAggsHandler, - shiftTimeZone); + selector, sliceAssigner, bufferFactory, shiftTimeZone); return ExecNodeUtil.createOneInputTransformation( inputTransform, diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java index 6634f7712b935..b044db7b430d6 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java @@ -26,16 +26,10 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; -import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; -import org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.ClockService; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; -import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; -import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import java.time.ZoneId; import java.util.TimeZone; @@ -55,8 +49,7 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator keySer, - AbstractRowDataSerializer inputSer, - GeneratedNamespaceAggsHandleFunction genAggsHandler, - ZoneId shiftTimezone) { - this( - keySelector, - sliceAssigner, - new RecordsWindowBuffer.Factory(keySer, inputSer), - new LocalAggRecordsCombiner.Factory(genAggsHandler, keySer), - shiftTimezone); - } - - public LocalSlicingWindowAggOperator( - RowDataKeySelector keySelector, - SliceAssigner sliceAssigner, - WindowBuffer.Factory windowBufferFactory, - WindowCombineFunction.LocalFactory combinerFactory, + WindowBuffer.LocalFactory windowBufferFactory, ZoneId shiftTimezone) { this.keySelector = keySelector; this.sliceAssigner = sliceAssigner; this.windowInterval = sliceAssigner.getSliceEndInterval(); this.windowBufferFactory = windowBufferFactory; - this.combinerFactory = combinerFactory; this.shiftTimezone = shiftTimezone; this.useDayLightSaving = TimeZone.getTimeZone(shiftTimezone).useDaylightTime(); } @@ -123,14 +99,13 @@ public void open() throws Exception { collector = new TimestampedCollector<>(output); collector.eraseTimestamp(); - final WindowCombineFunction localCombiner = - combinerFactory.create(getRuntimeContext(), collector); this.windowBuffer = windowBufferFactory.create( getContainingTask(), getContainingTask().getEnvironment().getMemoryManager(), computeMemorySize(), - localCombiner, + getRuntimeContext(), + collector, shiftTimezone); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java index 41b0c0ce576a7..3aaae65f50687 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java @@ -18,16 +18,15 @@ package org.apache.flink.table.runtime.operators.aggregate.window; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; -import org.apache.flink.table.runtime.operators.aggregate.window.combines.AggRecordsCombiner; -import org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggAccCombiner; +import org.apache.flink.table.runtime.operators.aggregate.window.combines.AggCombiner; +import org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggCombiner; import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor; import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceUnsharedWindowAggProcessor; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners.HoppingSliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner; @@ -64,7 +63,7 @@ public static SlicingWindowAggOperatorBuilder builder() { private SliceAssigner assigner; private AbstractRowDataSerializer inputSerializer; private PagedTypeSerializer keySerializer; - private TypeSerializer accSerializer; + private AbstractRowDataSerializer accSerializer; private GeneratedNamespaceAggsHandleFunction generatedAggregateFunction; private GeneratedNamespaceAggsHandleFunction localGeneratedAggregateFunction; private GeneratedNamespaceAggsHandleFunction globalGeneratedAggregateFunction; @@ -95,7 +94,7 @@ public SlicingWindowAggOperatorBuilder assigner(SliceAssigner assigner) { public SlicingWindowAggOperatorBuilder aggregate( GeneratedNamespaceAggsHandleFunction generatedAggregateFunction, - TypeSerializer accSerializer) { + AbstractRowDataSerializer accSerializer) { this.generatedAggregateFunction = generatedAggregateFunction; this.accSerializer = accSerializer; return this; @@ -105,7 +104,7 @@ public SlicingWindowAggOperatorBuilder globalAggregate( GeneratedNamespaceAggsHandleFunction localGeneratedAggregateFunction, GeneratedNamespaceAggsHandleFunction globalGeneratedAggregateFunction, GeneratedNamespaceAggsHandleFunction stateGeneratedAggregateFunction, - TypeSerializer accSerializer) { + AbstractRowDataSerializer accSerializer) { this.localGeneratedAggregateFunction = localGeneratedAggregateFunction; this.globalGeneratedAggregateFunction = globalGeneratedAggregateFunction; this.generatedAggregateFunction = stateGeneratedAggregateFunction; @@ -131,20 +130,24 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { checkNotNull(keySerializer); checkNotNull(accSerializer); checkNotNull(generatedAggregateFunction); - final WindowBuffer.Factory bufferFactory = - new RecordsWindowBuffer.Factory(keySerializer, inputSerializer); - final WindowCombineFunction.Factory combinerFactory; - if (localGeneratedAggregateFunction != null && globalGeneratedAggregateFunction != null) { + + boolean isGlobalAgg = + localGeneratedAggregateFunction != null && globalGeneratedAggregateFunction != null; + + RecordsCombiner.Factory combinerFactory; + if (isGlobalAgg) { combinerFactory = - new GlobalAggAccCombiner.Factory( + new GlobalAggCombiner.Factory( localGeneratedAggregateFunction, globalGeneratedAggregateFunction, keySerializer); } else { combinerFactory = - new AggRecordsCombiner.Factory( + new AggCombiner.Factory( generatedAggregateFunction, keySerializer, inputSerializer); } + final WindowBuffer.Factory bufferFactory = + new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory); final SlicingWindowProcessor windowProcessor; if (assigner instanceof SliceSharedAssigner) { @@ -152,7 +155,6 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { new SliceSharedWindowAggProcessor( generatedAggregateFunction, bufferFactory, - combinerFactory, (SliceSharedAssigner) assigner, accSerializer, indexOfCountStart, @@ -162,7 +164,6 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { new SliceUnsharedWindowAggProcessor( generatedAggregateFunction, bufferFactory, - combinerFactory, (SliceUnsharedAssigner) assigner, accSerializer, shiftTimeZone); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java index 7f6e53d30b2a6..111935d9cad8e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java @@ -18,9 +18,13 @@ package org.apache.flink.table.runtime.operators.aggregate.window.buffers; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; +import org.apache.flink.table.runtime.operators.window.state.WindowState; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.WindowKeySerializer; @@ -28,6 +32,7 @@ import org.apache.flink.table.runtime.util.WindowKey; import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo; import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap; +import org.apache.flink.util.Collector; import java.io.EOFException; import java.time.ZoneId; @@ -41,7 +46,7 @@ */ public final class RecordsWindowBuffer implements WindowBuffer { - private final WindowCombineFunction combineFunction; + private final RecordsCombiner combineFunction; private final WindowBytesMultiMap recordsBuffer; private final WindowKey reuseWindowKey; private final AbstractRowDataSerializer recordSerializer; @@ -53,7 +58,7 @@ public RecordsWindowBuffer( Object operatorOwner, MemoryManager memoryManager, long memorySize, - WindowCombineFunction combineFunction, + RecordsCombiner combineFunction, PagedTypeSerializer keySer, AbstractRowDataSerializer inputSer, ZoneId shiftTimeZone) { @@ -117,18 +122,22 @@ public void close() throws Exception { // Factory // ------------------------------------------------------------------------------------------ - /** Factory to create {@link RecordsWindowBuffer}. */ + /** Factory to create {@link RecordsWindowBuffer} with {@link RecordsCombiner.Factory}. */ public static final class Factory implements WindowBuffer.Factory { private static final long serialVersionUID = 1L; private final PagedTypeSerializer keySer; private final AbstractRowDataSerializer inputSer; + private final RecordsCombiner.Factory factory; public Factory( - PagedTypeSerializer keySer, AbstractRowDataSerializer inputSer) { + PagedTypeSerializer keySer, + AbstractRowDataSerializer inputSer, + RecordsCombiner.Factory combinerFactory) { this.keySer = keySer; this.inputSer = inputSer; + this.factory = combinerFactory; } @Override @@ -136,13 +145,61 @@ public WindowBuffer create( Object operatorOwner, MemoryManager memoryManager, long memorySize, - WindowCombineFunction combineFunction, - ZoneId shiftTimeZone) { + RuntimeContext runtimeContext, + WindowTimerService timerService, + KeyedStateBackend stateBackend, + WindowState windowState, + boolean isEventTime, + ZoneId shiftTimeZone) + throws Exception { + RecordsCombiner combiner = + factory.createRecordsCombiner( + runtimeContext, timerService, stateBackend, windowState, isEventTime); return new RecordsWindowBuffer( operatorOwner, memoryManager, memorySize, - combineFunction, + combiner, + keySer, + inputSer, + shiftTimeZone); + } + } + + /** Factory to create {@link RecordsWindowBuffer} with {@link RecordsCombiner.LocalFactory}. */ + public static final class LocalFactory implements WindowBuffer.LocalFactory { + + private static final long serialVersionUID = 1L; + + private final PagedTypeSerializer keySer; + private final AbstractRowDataSerializer inputSer; + private final RecordsCombiner.LocalFactory localFactory; + + public LocalFactory( + PagedTypeSerializer keySer, + AbstractRowDataSerializer inputSer, + RecordsCombiner.LocalFactory localFactory) { + this.keySer = keySer; + this.inputSer = inputSer; + this.localFactory = localFactory; + } + + @Override + public WindowBuffer create( + Object operatorOwner, + MemoryManager memoryManager, + long memorySize, + RuntimeContext runtimeContext, + Collector collector, + ZoneId shiftTimeZone) + throws Exception { + RecordsCombiner combiner = + localFactory.createRecordsCombiner(runtimeContext, collector); + return new RecordsWindowBuffer( + operatorOwner, + memoryManager, + memorySize, + combiner, keySer, inputSer, shiftTimeZone); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java index 7b954f83c20de..4f86659d7121d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java @@ -19,9 +19,13 @@ package org.apache.flink.table.runtime.operators.aggregate.window.buffers; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; +import org.apache.flink.table.runtime.operators.window.state.WindowState; +import org.apache.flink.util.Collector; import java.io.IOException; import java.io.Serializable; @@ -83,7 +87,12 @@ interface Factory extends Serializable { * @param operatorOwner the owner of the operator * @param memoryManager the manager that governs memory by Flink framework * @param memorySize the managed memory size can be used by this operator - * @param combineFunction the combine function used to combine buffered data into state + * @param runtimeContext the current {@link RuntimeContext} + * @param timerService the service to register event-time and processing-time timers + * @param stateBackend the state backend to accessing states + * @param windowState the window state to flush buffered data into. + * @param isEventTime indicates whether the operator works in event-time or processing-time + * mode, used for register corresponding timers. * @param shiftTimeZone the shit timezone of the window * @throws IOException thrown if the buffer can't be opened */ @@ -91,8 +100,37 @@ WindowBuffer create( Object operatorOwner, MemoryManager memoryManager, long memorySize, - WindowCombineFunction combineFunction, + RuntimeContext runtimeContext, + WindowTimerService timerService, + KeyedStateBackend stateBackend, + WindowState windowState, + boolean isEventTime, ZoneId shiftTimeZone) - throws IOException; + throws Exception; + } + + /** A factory that creates a {@link WindowBuffer}. */ + @FunctionalInterface + interface LocalFactory extends Serializable { + + /** + * Creates a {@link WindowBuffer} for local window that buffers elements in memory before + * flushing. + * + * @param operatorOwner the owner of the operator + * @param memoryManager the manager that governs memory by Flink framework + * @param memorySize the managed memory size can be used by this operator + * @param collector collector to emit records + * @param shiftTimeZone the shit timezone of the window + * @throws IOException thrown if the buffer can't be opened + */ + WindowBuffer create( + Object operatorOwner, + MemoryManager memoryManager, + long memorySize, + RuntimeContext runtimeContext, + Collector collector, + ZoneId shiftTimeZone) + throws Exception; } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java similarity index 90% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java index 53b9372aef10f..c3e2ee200c1b5 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java @@ -26,7 +26,7 @@ import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.StateKeyContext; import org.apache.flink.table.runtime.operators.window.state.WindowState; @@ -41,10 +41,10 @@ import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; /** - * An implementation of {@link WindowCombineFunction} that accumulates input records into the window + * An implementation of {@link RecordsCombiner} that accumulates input records into the window * accumulator state. */ -public final class AggRecordsCombiner implements WindowCombineFunction { +public class AggCombiner implements RecordsCombiner { /** The service to register event-time or processing-time timers. */ private final WindowTimerService timerService; @@ -70,14 +70,14 @@ public final class AggRecordsCombiner implements WindowCombineFunction { /** Whether the operator works in event-time mode, used to indicate registering which timer. */ private final boolean isEventTime; - public AggRecordsCombiner( + public AggCombiner( WindowTimerService timerService, StateKeyContext keyContext, WindowValueState accState, NamespaceAggsHandleFunction aggregator, boolean requiresCopy, TypeSerializer keySerializer, - TypeSerializer recordSerializer, + TypeSerializer valueSerializer, boolean isEventTime) { this.timerService = timerService; this.keyContext = keyContext; @@ -85,7 +85,7 @@ public AggRecordsCombiner( this.aggregator = aggregator; this.requiresCopy = requiresCopy; this.keySerializer = keySerializer; - this.recordSerializer = recordSerializer; + this.recordSerializer = valueSerializer; this.isEventTime = isEventTime; } @@ -151,26 +151,25 @@ public void close() throws Exception { // Factory // ---------------------------------------------------------------------------------------- - /** Factory to create {@link AggRecordsCombiner}. */ - public static final class Factory implements WindowCombineFunction.Factory { - + /** Factory to create {@link AggCombiner}. */ + public static final class Factory implements RecordsCombiner.Factory { private static final long serialVersionUID = 1L; private final GeneratedNamespaceAggsHandleFunction genAggsHandler; private final TypeSerializer keySerializer; - private final TypeSerializer recordSerializer; + private final TypeSerializer valueSerializer; public Factory( GeneratedNamespaceAggsHandleFunction genAggsHandler, TypeSerializer keySerializer, - TypeSerializer recordSerializer) { + TypeSerializer valueSerializer) { this.genAggsHandler = genAggsHandler; this.keySerializer = keySerializer; - this.recordSerializer = recordSerializer; + this.valueSerializer = valueSerializer; } @Override - public WindowCombineFunction create( + public RecordsCombiner createRecordsCombiner( RuntimeContext runtimeContext, WindowTimerService timerService, KeyedStateBackend stateBackend, @@ -184,14 +183,14 @@ public WindowCombineFunction create( stateBackend, LongSerializer.INSTANCE, runtimeContext)); boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend); WindowValueState windowValueState = (WindowValueState) windowState; - return new AggRecordsCombiner( + return new AggCombiner( timerService, stateBackend::setCurrentKey, windowValueState, aggregator, requiresCopy, keySerializer, - recordSerializer, + valueSerializer, isEventTime); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java similarity index 90% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java index e8fc66a2307be..3ef650ede875f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java @@ -26,7 +26,7 @@ import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.StateKeyContext; import org.apache.flink.table.runtime.operators.window.state.WindowState; @@ -40,12 +40,12 @@ import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; /** - * An implementation of {@link WindowCombineFunction} that accumulates local accumulators records - * into the window accumulator state. + * An implementation of {@link RecordsCombiner} that accumulates local accumulators records into the + * window accumulator state. * *

Note: this only supports event-time window. */ -public final class GlobalAggAccCombiner implements WindowCombineFunction { +public class GlobalAggCombiner implements RecordsCombiner { /** The service to register event-time or processing-time timers. */ private final WindowTimerService timerService; @@ -68,7 +68,7 @@ public final class GlobalAggAccCombiner implements WindowCombineFunction { /** Serializer to copy key if required. */ private final TypeSerializer keySerializer; - public GlobalAggAccCombiner( + public GlobalAggCombiner( WindowTimerService timerService, StateKeyContext keyContext, WindowValueState accState, @@ -87,7 +87,18 @@ public GlobalAggAccCombiner( @Override public void combine(WindowKey windowKey, Iterator localAccs) throws Exception { - // step 0: set current key for states and timers + Long window = windowKey.getWindow(); + RowData acc = localAggregator.createAccumulators(); + localAggregator.setAccumulators(window, acc); + while (localAccs.hasNext()) { + RowData localAcc = localAccs.next(); + localAggregator.merge(window, localAcc); + } + combineAccumulator(windowKey, localAggregator.getAccumulators()); + } + + private void combineAccumulator(WindowKey windowKey, RowData acc) throws Exception { + // step 1: set current key for states and timers final RowData key; if (requiresCopy) { // the incoming key is reused, we should copy it if state backend doesn't copy it @@ -98,22 +109,13 @@ public void combine(WindowKey windowKey, Iterator localAccs) throws Exc keyContext.setCurrentKey(key); Long window = windowKey.getWindow(); - // step 1: merge localAccs into one acc - RowData acc = localAggregator.createAccumulators(); - localAggregator.setAccumulators(window, acc); - while (localAccs.hasNext()) { - RowData localAcc = localAccs.next(); - localAggregator.merge(window, localAcc); - } - RowData mergedLocalAcc = localAggregator.getAccumulators(); - // step2: merge acc into state RowData stateAcc = accState.value(window); if (stateAcc == null) { stateAcc = globalAggregator.createAccumulators(); } globalAggregator.setAccumulators(window, stateAcc); - globalAggregator.merge(window, mergedLocalAcc); + globalAggregator.merge(window, acc); stateAcc = globalAggregator.getAccumulators(); accState.update(window, stateAcc); @@ -136,8 +138,8 @@ public void close() throws Exception { // Factory // ---------------------------------------------------------------------------------------- - /** Factory to create {@link GlobalAggAccCombiner}. */ - public static final class Factory implements WindowCombineFunction.Factory { + /** Factory to create {@link GlobalAggCombiner}. */ + public static final class Factory implements RecordsCombiner.Factory { private static final long serialVersionUID = 1L; @@ -155,7 +157,7 @@ public Factory( } @Override - public WindowCombineFunction create( + public RecordsCombiner createRecordsCombiner( RuntimeContext runtimeContext, WindowTimerService timerService, KeyedStateBackend stateBackend, @@ -174,7 +176,7 @@ public WindowCombineFunction create( stateBackend, LongSerializer.INSTANCE, runtimeContext)); boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend); WindowValueState windowValueState = (WindowValueState) windowState; - return new GlobalAggAccCombiner( + return new GlobalAggCombiner( timerService, stateBackend::setCurrentKey, windowValueState, diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java similarity index 77% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java index 71e92d8a7de6d..c2c8807cf7fc3 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java @@ -19,14 +19,13 @@ package org.apache.flink.table.runtime.operators.aggregate.window.combines; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.JoinedRowData; import org.apache.flink.table.runtime.dataview.UnsupportedStateDataViewStore; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner; import org.apache.flink.table.runtime.util.WindowKey; import org.apache.flink.util.Collector; @@ -35,19 +34,16 @@ import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; /** - * An implementation of {@link WindowCombineFunction} that accumulates input records into local + * An implementation of {@link RecordsCombiner} that accumulates input records into local * accumulators. * *

Note: this only supports event-time window. */ -public final class LocalAggRecordsCombiner implements WindowCombineFunction { +public class LocalAggCombiner implements RecordsCombiner { /** Function used to handle all aggregates. */ private final NamespaceAggsHandleFunction aggregator; - /** Serializer to copy key if required. */ - private final TypeSerializer keySerializer; - /** The output to emit combined accumulator result. */ private final Collector collector; @@ -63,19 +59,16 @@ public final class LocalAggRecordsCombiner implements WindowCombineFunction { */ private final GenericRowData windowRow = new GenericRowData(1); - public LocalAggRecordsCombiner( - NamespaceAggsHandleFunction aggregator, - TypeSerializer keySerializer, - Collector collector) { + public LocalAggCombiner( + NamespaceAggsHandleFunction aggregator, Collector collector) { this.aggregator = aggregator; - this.keySerializer = keySerializer; this.collector = collector; } @Override public void combine(WindowKey windowKey, Iterator records) throws Exception { - // always copy key because we will merge record into memory acc - final RowData key = keySerializer.copy(windowKey.getKey()); + // always not copy key/value because they are not cached. + final RowData key = windowKey.getKey(); final Long window = windowKey.getWindow(); // step 1: create an empty accumulator @@ -116,28 +109,24 @@ private void output(RowData key, Long window, RowData acc) { // Factory // ---------------------------------------------------------------------------------------- - /** Factory to create {@link LocalAggRecordsCombiner}. */ - public static final class Factory implements WindowCombineFunction.LocalFactory { + /** Factory to create {@link LocalAggCombiner}. */ + public static final class Factory implements RecordsCombiner.LocalFactory { private static final long serialVersionUID = 1L; private final GeneratedNamespaceAggsHandleFunction genAggsHandler; - private final TypeSerializer keySerializer; - public Factory( - GeneratedNamespaceAggsHandleFunction genAggsHandler, - TypeSerializer keySerializer) { + public Factory(GeneratedNamespaceAggsHandleFunction genAggsHandler) { this.genAggsHandler = genAggsHandler; - this.keySerializer = keySerializer; } @Override - public WindowCombineFunction create( + public RecordsCombiner createRecordsCombiner( RuntimeContext runtimeContext, Collector collector) throws Exception { final NamespaceAggsHandleFunction aggregator = genAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader()); aggregator.open(new UnsupportedStateDataViewStore(runtimeContext)); - return new LocalAggRecordsCombiner(aggregator, keySerializer, collector); + return new LocalAggCombiner(aggregator, collector); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java index 85f0ebb7e249a..0851949e04899 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java @@ -29,7 +29,6 @@ import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.ClockService; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner; @@ -50,7 +49,6 @@ public abstract class AbstractWindowAggProcessor implements SlicingWindowProcess protected final GeneratedNamespaceAggsHandleFunction genAggsHandler; protected final WindowBuffer.Factory windowBufferFactory; - protected final WindowCombineFunction.Factory combineFactory; protected final SliceAssigner sliceAssigner; protected final TypeSerializer accSerializer; protected final boolean isEventTime; @@ -85,13 +83,11 @@ public abstract class AbstractWindowAggProcessor implements SlicingWindowProcess public AbstractWindowAggProcessor( GeneratedNamespaceAggsHandleFunction genAggsHandler, WindowBuffer.Factory bufferFactory, - WindowCombineFunction.Factory combinerFactory, SliceAssigner sliceAssigner, TypeSerializer accSerializer, ZoneId shiftTimeZone) { this.genAggsHandler = genAggsHandler; this.windowBufferFactory = bufferFactory; - this.combineFactory = combinerFactory; this.sliceAssigner = sliceAssigner; this.accSerializer = accSerializer; this.isEventTime = sliceAssigner.isEventTime(); @@ -118,19 +114,16 @@ public void open(Context context) throws Exception { this.aggregator.open( new PerWindowStateDataViewStore( ctx.getKeyedStateBackend(), namespaceSerializer, ctx.getRuntimeContext())); - final WindowCombineFunction combineFunction = - combineFactory.create( - ctx.getRuntimeContext(), - windowTimerService, - ctx.getKeyedStateBackend(), - windowState, - isEventTime); this.windowBuffer = windowBufferFactory.create( ctx.getOperatorOwner(), ctx.getMemoryManager(), ctx.getMemorySize(), - combineFunction, + ctx.getRuntimeContext(), + windowTimerService, + ctx.getKeyedStateBackend(), + windowState, + isEventTime, shiftTimeZone); this.reuseOutput = new JoinedRowData(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java index 63fa925f79736..db0b39c6d0496 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java @@ -22,7 +22,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners; import org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner; @@ -51,18 +50,11 @@ public final class SliceSharedWindowAggProcessor extends AbstractWindowAggProces public SliceSharedWindowAggProcessor( GeneratedNamespaceAggsHandleFunction genAggsHandler, WindowBuffer.Factory bufferFactory, - WindowCombineFunction.Factory combinerFactory, SliceSharedAssigner sliceAssigner, TypeSerializer accSerializer, int indexOfCountStar, ZoneId shiftTimeZone) { - super( - genAggsHandler, - bufferFactory, - combinerFactory, - sliceAssigner, - accSerializer, - shiftTimeZone); + super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer, shiftTimeZone); this.sliceSharedAssigner = sliceAssigner; this.mergeTargetHelper = new SliceMergeTargetHelper(); this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java index 7c5d45ab0c279..53c395f180b9f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java @@ -22,7 +22,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.SliceUnsharedAssigner; import java.time.ZoneId; @@ -37,17 +36,10 @@ public final class SliceUnsharedWindowAggProcessor extends AbstractWindowAggProc public SliceUnsharedWindowAggProcessor( GeneratedNamespaceAggsHandleFunction genAggsHandler, WindowBuffer.Factory windowBufferFactory, - WindowCombineFunction.Factory combineFactory, SliceUnsharedAssigner sliceAssigner, TypeSerializer accSerializer, ZoneId shiftTimeZone) { - super( - genAggsHandler, - windowBufferFactory, - combineFactory, - sliceAssigner, - accSerializer, - shiftTimeZone); + super(genAggsHandler, windowBufferFactory, sliceAssigner, accSerializer, shiftTimeZone); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java index da89489eac50e..07703101a1579 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java @@ -25,7 +25,7 @@ import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; import org.apache.flink.table.runtime.operators.rank.window.combines.TopNRecordsCombiner; import org.apache.flink.table.runtime.operators.rank.window.processors.WindowRankProcessor; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner; import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator; import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; @@ -136,22 +136,21 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) { windowEndIndex >= 0, String.format( "Illegal window end index %s, it should not be negative!", windowEndIndex)); - final WindowBuffer.Factory bufferFactory = - new RecordsWindowBuffer.Factory(keySerializer, inputSerializer); - final WindowCombineFunction.Factory combinerFactory = + final RecordsCombiner.Factory combinerFactory = new TopNRecordsCombiner.Factory( generatedSortKeyComparator, sortKeySelector, keySerializer, inputSerializer, rankEnd); + final WindowBuffer.Factory bufferFactory = + new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory); final SlicingWindowProcessor windowProcessor = new WindowRankProcessor( inputSerializer, generatedSortKeyComparator, sortKeySelector.getProducedType().toSerializer(), bufferFactory, - combinerFactory, rankStart, rankEnd, outputRankNumber, diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java index f33757e554bf0..edc1113bc3a57 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java @@ -26,7 +26,7 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.rank.TopNBuffer; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; +import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.state.StateKeyContext; import org.apache.flink.table.runtime.operators.window.state.WindowMapState; @@ -44,10 +44,10 @@ import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; /** - * An implementation of {@link WindowCombineFunction} that save topN records of incremental input - * records into the window state. + * An implementation of {@link RecordsCombiner} that save topN records of incremental input records + * into the window state. */ -public final class TopNRecordsCombiner implements WindowCombineFunction { +public final class TopNRecordsCombiner implements RecordsCombiner { /** The service to register event-time or processing-time timers. */ private final WindowTimerService timerService; @@ -158,7 +158,7 @@ public void close() throws Exception {} // ---------------------------------------------------------------------------------------- /** Factory to create {@link TopNRecordsCombiner}. */ - public static final class Factory implements WindowCombineFunction.Factory { + public static final class Factory implements RecordsCombiner.Factory { private static final long serialVersionUID = 1L; @@ -183,7 +183,7 @@ public Factory( } @Override - public WindowCombineFunction create( + public RecordsCombiner createRecordsCombiner( RuntimeContext runtimeContext, WindowTimerService timerService, KeyedStateBackend stateBackend, diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java index c45b401e9b901..4df71ab88c788 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java @@ -30,7 +30,6 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; import org.apache.flink.table.runtime.operators.rank.TopNBuffer; -import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction; import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService; import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl; @@ -59,7 +58,6 @@ public final class WindowRankProcessor implements SlicingWindowProcessor { private final TypeSerializer sortKeySerializer; private final WindowBuffer.Factory bufferFactory; - private final WindowCombineFunction.Factory combineFactory; private final TypeSerializer inputSerializer; private final long rankStart; private final long rankEnd; @@ -88,7 +86,6 @@ public WindowRankProcessor( GeneratedRecordComparator genSortKeyComparator, TypeSerializer sortKeySerializer, WindowBuffer.Factory bufferFactory, - WindowCombineFunction.Factory combineFactory, long rankStart, long rankEnd, boolean outputRankNumber, @@ -98,7 +95,6 @@ public WindowRankProcessor( this.generatedSortKeyComparator = genSortKeyComparator; this.sortKeySerializer = sortKeySerializer; this.bufferFactory = bufferFactory; - this.combineFactory = combineFactory; this.rankStart = rankStart; this.rankEnd = rankEnd; this.outputRankNumber = outputRankNumber; @@ -127,19 +123,16 @@ public void open(Context context) throws Exception { this.windowState = new WindowMapState<>( (InternalMapState>) state); - final WindowCombineFunction combineFunction = - combineFactory.create( - ctx.getRuntimeContext(), - windowTimerService, - ctx.getKeyedStateBackend(), - windowState, - true); this.windowBuffer = bufferFactory.create( ctx.getOperatorOwner(), ctx.getMemoryManager(), ctx.getMemorySize(), - combineFunction, + ctx.getRuntimeContext(), + windowTimerService, + ctx.getKeyedStateBackend(), + windowState, + true, shiftTimeZone); this.reuseOutput = new JoinedRowData(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java similarity index 76% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java index c12a2476f9e0b..d235ed5c247e8 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java @@ -30,33 +30,29 @@ import java.io.Serializable; import java.util.Iterator; -/** The {@link WindowCombineFunction} is used to combine buffered data into state. */ +/** The {@link RecordsCombiner} is used to combine buffered records into state. */ @Internal -public interface WindowCombineFunction { - +public interface RecordsCombiner { /** * Combines the buffered data into state based on the given window-key pair. * * @param windowKey the window-key pair that the buffered data belong to, the window-key object * is reused. - * @param value the buffered data, the iterator and {@link RowData} objects are reused + * @param records the buffered data, the iterator and {@link RowData} objects are reused. */ - void combine(WindowKey windowKey, Iterator value) throws Exception; + void combine(WindowKey windowKey, Iterator records) throws Exception; /** Release resources allocated by this combine function. */ void close() throws Exception; // ------------------------------------------------------------------------ - /** - * A factory that creates a {@link WindowCombineFunction} for combining at global stage, i.e. - * keyed operator where combine into keyed state. - */ + /** A factory that creates a {@link RecordsCombiner}. */ @FunctionalInterface interface Factory extends Serializable { /** - * Creates a {@link WindowCombineFunction} that can combine buffered data into states. + * Creates a {@link RecordsCombiner} that can combine buffered data into states. * * @param runtimeContext the current {@link RuntimeContext} * @param timerService the service to register event-time and processing-time timers @@ -65,7 +61,7 @@ interface Factory extends Serializable { * @param isEventTime indicates whether the operator works in event-time or processing-time * mode, used for register corresponding timers. */ - WindowCombineFunction create( + RecordsCombiner createRecordsCombiner( RuntimeContext runtimeContext, WindowTimerService timerService, KeyedStateBackend stateBackend, @@ -74,9 +70,10 @@ WindowCombineFunction create( throws Exception; } - /** A factory that creates a {@link WindowCombineFunction} used for combining at local stage. */ + /** A factory that creates a {@link RecordsCombiner} used for combining at local stage. */ + @FunctionalInterface interface LocalFactory extends Serializable { - WindowCombineFunction create(RuntimeContext runtimeContext, Collector collector) - throws Exception; + RecordsCombiner createRecordsCombiner( + RuntimeContext runtimeContext, Collector collector) throws Exception; } } From f32562327c9c45c3b8e5f348966cd21b202800df Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Mon, 26 Apr 2021 15:01:39 +0800 Subject: [PATCH 2/3] make BytesMap#getEntryIterator copy-aware --- .../SlicingWindowAggOperatorBuilder.java | 8 +--- .../window/buffers/RecordsWindowBuffer.java | 11 ++++- .../window/combines/AggCombiner.java | 43 +------------------ .../window/combines/GlobalAggCombiner.java | 33 ++------------ .../window/WindowRankOperatorBuilder.java | 6 +-- .../window/combines/TopNRecordsCombiner.java | 26 +---------- .../binary/AbstractBytesHashMap.java | 16 ++++--- .../binary/AbstractBytesMultiMap.java | 25 +++++++---- .../aggregate/SumHashAggTestOperator.java | 3 +- .../binary/BytesHashMapTestBase.java | 2 +- .../binary/BytesMultiMapTestBase.java | 2 +- 11 files changed, 50 insertions(+), 125 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java index 3aaae65f50687..3aba16208d7a2 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java @@ -138,13 +138,9 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { if (isGlobalAgg) { combinerFactory = new GlobalAggCombiner.Factory( - localGeneratedAggregateFunction, - globalGeneratedAggregateFunction, - keySerializer); + localGeneratedAggregateFunction, globalGeneratedAggregateFunction); } else { - combinerFactory = - new AggCombiner.Factory( - generatedAggregateFunction, keySerializer, inputSerializer); + combinerFactory = new AggCombiner.Factory(generatedAggregateFunction); } final WindowBuffer.Factory bufferFactory = new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java index 111935d9cad8e..9ca57f4a75275 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java @@ -38,6 +38,7 @@ import java.time.ZoneId; import java.util.Iterator; +import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; /** @@ -51,6 +52,9 @@ public final class RecordsWindowBuffer implements WindowBuffer { private final WindowKey reuseWindowKey; private final AbstractRowDataSerializer recordSerializer; private final ZoneId shiftTimeZone; + // copy key and input record if necessary(e.g., heap state backend), + // because key and record are reused. + private final boolean requiresCopy; private long minSliceEnd = Long.MAX_VALUE; @@ -61,6 +65,7 @@ public RecordsWindowBuffer( RecordsCombiner combineFunction, PagedTypeSerializer keySer, AbstractRowDataSerializer inputSer, + boolean requiresCopy, ZoneId shiftTimeZone) { this.combineFunction = combineFunction; this.recordsBuffer = @@ -68,6 +73,7 @@ public RecordsWindowBuffer( operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity()); this.recordSerializer = inputSer; this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance(); + this.requiresCopy = requiresCopy; this.shiftTimeZone = shiftTimeZone; } @@ -102,7 +108,7 @@ public void advanceProgress(long progress) throws Exception { public void flush() throws Exception { if (recordsBuffer.getNumKeys() > 0) { KeyValueIterator> entryIterator = - recordsBuffer.getEntryIterator(); + recordsBuffer.getEntryIterator(requiresCopy); while (entryIterator.advanceNext()) { combineFunction.combine(entryIterator.getKey(), entryIterator.getValue()); } @@ -155,6 +161,7 @@ public WindowBuffer create( RecordsCombiner combiner = factory.createRecordsCombiner( runtimeContext, timerService, stateBackend, windowState, isEventTime); + boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend); return new RecordsWindowBuffer( operatorOwner, memoryManager, @@ -162,6 +169,7 @@ public WindowBuffer create( combiner, keySer, inputSer, + requiresCopy, shiftTimeZone); } } @@ -202,6 +210,7 @@ public WindowBuffer create( combiner, keySer, inputSer, + false, shiftTimeZone); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java index c3e2ee200c1b5..1ec2bc13a4a3e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.operators.aggregate.window.combines; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.data.RowData; @@ -37,7 +36,6 @@ import java.util.Iterator; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; -import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; /** @@ -58,15 +56,6 @@ public class AggCombiner implements RecordsCombiner { /** Function used to handle all aggregates. */ private final NamespaceAggsHandleFunction aggregator; - /** Whether to copy key and input record, because key and record are reused. */ - private final boolean requiresCopy; - - /** Serializer to copy key if required. */ - private final TypeSerializer keySerializer; - - /** Serializer to copy record if required. */ - private final TypeSerializer recordSerializer; - /** Whether the operator works in event-time mode, used to indicate registering which timer. */ private final boolean isEventTime; @@ -75,31 +64,18 @@ public AggCombiner( StateKeyContext keyContext, WindowValueState accState, NamespaceAggsHandleFunction aggregator, - boolean requiresCopy, - TypeSerializer keySerializer, - TypeSerializer valueSerializer, boolean isEventTime) { this.timerService = timerService; this.keyContext = keyContext; this.accState = accState; this.aggregator = aggregator; - this.requiresCopy = requiresCopy; - this.keySerializer = keySerializer; - this.recordSerializer = valueSerializer; this.isEventTime = isEventTime; } @Override public void combine(WindowKey windowKey, Iterator records) throws Exception { // step 0: set current key for states and timers - final RowData key; - if (requiresCopy) { - // the incoming key is reused, we should copy it if state backend doesn't copy it - key = keySerializer.copy(windowKey.getKey()); - } else { - key = windowKey.getKey(); - } - keyContext.setCurrentKey(key); + keyContext.setCurrentKey(windowKey.getKey()); // step 1: get the accumulator for the current key and window Long window = windowKey.getWindow(); @@ -114,10 +90,6 @@ public void combine(WindowKey windowKey, Iterator records) throws Excep // step 3: do accumulate while (records.hasNext()) { RowData record = records.next(); - if (requiresCopy) { - // the incoming record is reused, we should copy it if state backend doesn't copy it - record = recordSerializer.copy(record); - } if (isAccumulateMsg(record)) { aggregator.accumulate(record); } else { @@ -156,16 +128,9 @@ public static final class Factory implements RecordsCombiner.Factory { private static final long serialVersionUID = 1L; private final GeneratedNamespaceAggsHandleFunction genAggsHandler; - private final TypeSerializer keySerializer; - private final TypeSerializer valueSerializer; - public Factory( - GeneratedNamespaceAggsHandleFunction genAggsHandler, - TypeSerializer keySerializer, - TypeSerializer valueSerializer) { + public Factory(GeneratedNamespaceAggsHandleFunction genAggsHandler) { this.genAggsHandler = genAggsHandler; - this.keySerializer = keySerializer; - this.valueSerializer = valueSerializer; } @Override @@ -181,16 +146,12 @@ public RecordsCombiner createRecordsCombiner( aggregator.open( new PerWindowStateDataViewStore( stateBackend, LongSerializer.INSTANCE, runtimeContext)); - boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend); WindowValueState windowValueState = (WindowValueState) windowState; return new AggCombiner( timerService, stateBackend::setCurrentKey, windowValueState, aggregator, - requiresCopy, - keySerializer, - valueSerializer, isEventTime); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java index 3ef650ede875f..bf6e300154f8e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.operators.aggregate.window.combines; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.data.RowData; @@ -36,7 +35,6 @@ import java.time.ZoneId; import java.util.Iterator; -import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; /** @@ -62,27 +60,17 @@ public class GlobalAggCombiner implements RecordsCombiner { /** Global aggregate function to handle global accumulator rows. */ private final NamespaceAggsHandleFunction globalAggregator; - /** Whether to copy key and input record, because key and record are reused. */ - private final boolean requiresCopy; - - /** Serializer to copy key if required. */ - private final TypeSerializer keySerializer; - public GlobalAggCombiner( WindowTimerService timerService, StateKeyContext keyContext, WindowValueState accState, NamespaceAggsHandleFunction localAggregator, - NamespaceAggsHandleFunction globalAggregator, - boolean requiresCopy, - TypeSerializer keySerializer) { + NamespaceAggsHandleFunction globalAggregator) { this.timerService = timerService; this.keyContext = keyContext; this.accState = accState; this.localAggregator = localAggregator; this.globalAggregator = globalAggregator; - this.requiresCopy = requiresCopy; - this.keySerializer = keySerializer; } @Override @@ -99,14 +87,7 @@ public void combine(WindowKey windowKey, Iterator localAccs) throws Exc private void combineAccumulator(WindowKey windowKey, RowData acc) throws Exception { // step 1: set current key for states and timers - final RowData key; - if (requiresCopy) { - // the incoming key is reused, we should copy it if state backend doesn't copy it - key = keySerializer.copy(windowKey.getKey()); - } else { - key = windowKey.getKey(); - } - keyContext.setCurrentKey(key); + keyContext.setCurrentKey(windowKey.getKey()); Long window = windowKey.getWindow(); // step2: merge acc into state @@ -145,15 +126,12 @@ public static final class Factory implements RecordsCombiner.Factory { private final GeneratedNamespaceAggsHandleFunction genLocalAggsHandler; private final GeneratedNamespaceAggsHandleFunction genGlobalAggsHandler; - private final TypeSerializer keySerializer; public Factory( GeneratedNamespaceAggsHandleFunction genLocalAggsHandler, - GeneratedNamespaceAggsHandleFunction genGlobalAggsHandler, - TypeSerializer keySerializer) { + GeneratedNamespaceAggsHandleFunction genGlobalAggsHandler) { this.genLocalAggsHandler = genLocalAggsHandler; this.genGlobalAggsHandler = genGlobalAggsHandler; - this.keySerializer = keySerializer; } @Override @@ -174,16 +152,13 @@ public RecordsCombiner createRecordsCombiner( globalAggregator.open( new PerWindowStateDataViewStore( stateBackend, LongSerializer.INSTANCE, runtimeContext)); - boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend); WindowValueState windowValueState = (WindowValueState) windowState; return new GlobalAggCombiner( timerService, stateBackend::setCurrentKey, windowValueState, localAggregator, - globalAggregator, - requiresCopy, - keySerializer); + globalAggregator); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java index 07703101a1579..658988de5b2a5 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java @@ -138,11 +138,7 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) { "Illegal window end index %s, it should not be negative!", windowEndIndex)); final RecordsCombiner.Factory combinerFactory = new TopNRecordsCombiner.Factory( - generatedSortKeyComparator, - sortKeySelector, - keySerializer, - inputSerializer, - rankEnd); + generatedSortKeyComparator, sortKeySelector, inputSerializer, rankEnd); final WindowBuffer.Factory bufferFactory = new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory); final SlicingWindowProcessor windowProcessor = diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java index edc1113bc3a57..7d8929ea068cc 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java @@ -41,7 +41,6 @@ import java.util.Map; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; -import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend; /** * An implementation of {@link RecordsCombiner} that save topN records of incremental input records @@ -67,12 +66,6 @@ public final class TopNRecordsCombiner implements RecordsCombiner { /** TopN size. */ private final long topN; - /** Whether to copy input key, because key is reused. */ - private final boolean requiresCopyKey; - - /** Serializer to copy key if required. */ - private final TypeSerializer keySerializer; - /** Serializer to copy record if required. */ private final TypeSerializer recordSerializer; @@ -86,8 +79,6 @@ public TopNRecordsCombiner( Comparator sortKeyComparator, KeySelector sortKeySelector, long topN, - boolean requiresCopyKey, - TypeSerializer keySerializer, TypeSerializer recordSerializer, boolean isEventTime) { this.timerService = timerService; @@ -96,8 +87,6 @@ public TopNRecordsCombiner( this.sortKeyComparator = sortKeyComparator; this.sortKeySelector = sortKeySelector; this.topN = topN; - this.requiresCopyKey = requiresCopyKey; - this.keySerializer = keySerializer; this.recordSerializer = recordSerializer; this.isEventTime = isEventTime; } @@ -123,14 +112,7 @@ public void combine(WindowKey windowKey, Iterator records) throws Excep // step 2: flush data in TopNBuffer into state Iterator>> bufferItr = buffer.entrySet().iterator(); - final RowData key; - if (requiresCopyKey) { - // the incoming key is reused, we should copy it if state backend doesn't copy it - key = keySerializer.copy(windowKey.getKey()); - } else { - key = windowKey.getKey(); - } - keyContext.setCurrentKey(key); + keyContext.setCurrentKey(windowKey.getKey()); Long window = windowKey.getWindow(); while (bufferItr.hasNext()) { Map.Entry> entry = bufferItr.next(); @@ -165,19 +147,16 @@ public static final class Factory implements RecordsCombiner.Factory { // The util to compare two sortKey equals to each other. private final GeneratedRecordComparator generatedSortKeyComparator; private final KeySelector sortKeySelector; - private final TypeSerializer keySerializer; private final TypeSerializer recordSerializer; private final long topN; public Factory( GeneratedRecordComparator genSortKeyComparator, RowDataKeySelector sortKeySelector, - TypeSerializer keySerializer, TypeSerializer recordSerializer, long topN) { this.generatedSortKeyComparator = genSortKeyComparator; this.sortKeySelector = sortKeySelector; - this.keySerializer = keySerializer; this.recordSerializer = recordSerializer; this.topN = topN; } @@ -192,7 +171,6 @@ public RecordsCombiner createRecordsCombiner( throws Exception { final Comparator sortKeyComparator = generatedSortKeyComparator.newInstance(runtimeContext.getUserCodeClassLoader()); - boolean requiresCopyKey = !isStateImmutableInStateBackend(stateBackend); WindowMapState> windowMapState = (WindowMapState>) windowState; return new TopNRecordsCombiner( @@ -202,8 +180,6 @@ public RecordsCombiner createRecordsCombiner( sortKeyComparator, sortKeySelector, topN, - requiresCopyKey, - keySerializer, recordSerializer, isEventTime); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java index 7929a263d8fd0..e3cb25fdac2f0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java @@ -194,12 +194,12 @@ public long getNumElements() { /** Returns an iterator for iterating over the entries of this map. */ @SuppressWarnings("WeakerAccess") - public KeyValueIterator getEntryIterator() { + public KeyValueIterator getEntryIterator(boolean requiresCopy) { if (destructiveIterator != null) { throw new IllegalArgumentException( "DestructiveIterator is not null, so this method can't be invoke!"); } - return ((RecordArea) recordArea).entryIterator(); + return ((RecordArea) recordArea).entryIterator(requiresCopy); } /** @return the underlying memory segments of the hash map's record area */ @@ -329,8 +329,8 @@ public BinaryRowData readValue(BinaryRowData reuse) throws IOException { // ----------------------- Iterator ----------------------- - private KeyValueIterator entryIterator() { - return new EntryIterator(); + private KeyValueIterator entryIterator(boolean requiresCopy) { + return new EntryIterator(requiresCopy); } private final class EntryIterator extends AbstractPagedInputView @@ -338,10 +338,12 @@ private final class EntryIterator extends AbstractPagedInputView private int count = 0; private int currentSegmentIndex = 0; + private final boolean requiresCopy; - private EntryIterator() { + private EntryIterator(boolean requiresCopy) { super(segments.get(0), segmentSize, 0); destructiveIterator = this; + this.requiresCopy = requiresCopy; } @Override @@ -358,12 +360,12 @@ public boolean advanceNext() throws IOException { @Override public K getKey() { - return reusedKey; + return requiresCopy ? keySerializer.copy(reusedKey) : reusedKey; } @Override public BinaryRowData getValue() { - return reusedValue; + return requiresCopy ? reusedValue.copy() : reusedValue; } public boolean hasNext() { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java index cc64ef8df0c63..7ec8132c3baba 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java @@ -183,8 +183,8 @@ public void append(LookupInfo> lookupInfo, BinaryRowData va } } - public KeyValueIterator> getEntryIterator() { - return ((RecordArea) recordArea).entryIterator(); + public KeyValueIterator> getEntryIterator(boolean requiresCopy) { + return ((RecordArea) recordArea).entryIterator(requiresCopy); } /** release the map's record and bucket area's memory segments. */ @@ -329,14 +329,17 @@ private void updateValuePointer(RandomAccessInputView view, int newPointer, int view.getCurrentSegment().putInt(currPosInSeg, newPointer); } - KeyValueIterator> entryIterator() { - return new EntryIterator(); + KeyValueIterator> entryIterator(boolean requiresCopy) { + return new EntryIterator(requiresCopy); } final class EntryIterator implements KeyValueIterator> { private int count; + private final boolean requiresCopy; - public EntryIterator() { + public EntryIterator(boolean requiresCopy) { + this.requiresCopy = requiresCopy; + reusedValueIterator.setRequiresCopy(requiresCopy); count = 0; if (numKeys > 0) { recordArea.setReadPosition(0); @@ -361,7 +364,7 @@ public boolean advanceNext() throws IOException { @Override public K getKey() { - return reusedKey; + return requiresCopy ? keySerializer.copy(reusedKey) : reusedKey; } @Override @@ -382,10 +385,12 @@ Iterator valueIterator(int valueOffset) { final class ValueIterator implements Iterator { private int offset; private boolean isFirstRead; + private boolean requiresCopy; public ValueIterator(int offset) { this.offset = offset; this.isFirstRead = true; + this.requiresCopy = false; } public void setOffset(int offset) { @@ -393,6 +398,10 @@ public void setOffset(int offset) { this.isFirstRead = true; } + public void setRequiresCopy(boolean requiresCopy) { + this.requiresCopy = requiresCopy; + } + @Override public boolean hasNext() { return isFirstRead || offset != -1; @@ -402,7 +411,7 @@ public boolean hasNext() { public RowData next() { if (isFirstRead) { isFirstRead = false; - return reusedRecord; + return requiresCopy ? reusedRecord.copy() : reusedRecord; } if (hasNext()) { valInView.setReadPosition(offset); @@ -415,7 +424,7 @@ public RowData next() { "Exception happened while iterating" + " value list of a key in BytesMultiMap"); } - return reusedRecord; + return requiresCopy ? reusedRecord.copy() : reusedRecord; } return null; } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java index 24e9a2df17479..88be5ef2f0beb 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java @@ -158,7 +158,8 @@ public void endInput() throws Exception { if (sorter == null) { // no spilling, output by iterating aggregate map. - KeyValueIterator iter = aggregateMap.getEntryIterator(); + KeyValueIterator iter = + aggregateMap.getEntryIterator(false); while (iter.advanceNext()) { // set result and output diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java index 44a32334c9dc3..a599f29835be7 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java @@ -253,7 +253,7 @@ public void testResetAndOutput() throws Exception { expected.add(entry.copy()); } } - KeyValueIterator iter = table.getEntryIterator(); + KeyValueIterator iter = table.getEntryIterator(false); while (iter.advanceNext()) { actualKeys.add(keySerializer.copy(iter.getKey())); actualValues.add(iter.getValue().copy()); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java index 9a6620c92cc42..22dca2db18435 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java @@ -116,7 +116,7 @@ public void testBuildAndRetrieve() throws Exception { } } - KeyValueIterator> iter = table.getEntryIterator(); + KeyValueIterator> iter = table.getEntryIterator(false); while (iter.advanceNext()) { int i = 0; Iterator valueIter = iter.getValue(); From 9e7da4cae78b1e44abaf51805fa0f89f3c81fe2d Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Tue, 27 Apr 2021 17:11:36 +0800 Subject: [PATCH 3/3] fix tests --- .../table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala | 2 +- .../planner/codegen/agg/batch/HashWindowCodeGenerator.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala index 3c01f019e69c5..dc76352180d63 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala @@ -532,7 +532,7 @@ object HashAggCodeGenHelper { val rowDataType = classOf[RowData].getCanonicalName s""" |$iteratorType<$rowDataType, $rowDataType> $iteratorTerm = - | $aggregateMapTerm.getEntryIterator(); + | $aggregateMapTerm.getEntryIterator(false); // reuse key/value during iterating |while ($iteratorTerm.advanceNext()) { | // set result and output | $reuseGroupKeyTerm = ($rowDataType)$iteratorTerm.getKey(); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala index 83848616b701e..864e71bed62b6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala @@ -716,7 +716,7 @@ class HashWindowCodeGenerator( val iteratorTerm = CodeGenUtils.newName("iterator") s""" |$iteratorType<$rowDataType, $rowDataType> $iteratorTerm = - | $aggregateMapTerm.getEntryIterator(); + | $aggregateMapTerm.getEntryIterator(false); // reuse key/value during iterating |while ($iteratorTerm.advanceNext()) { | $reuseAggMapKeyTerm = ($rowDataType) $iteratorTerm.getKey(); | $reuseAggBufferTerm = ($rowDataType) $iteratorTerm.getValue();