diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java index 7b8002eee2a97..89089512cba89 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java @@ -232,7 +232,9 @@ public Predicate> apply(InputChannelInfo channelInfo) { channelInfo.getGateIdx(), this::createPartitioner); // use a copy of partitioner to ensure that the filter of ambiguous virtual channels // have the same state across several subtasks - return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex); + StreamPartitioner partitionerCopy = partitioner.copy(); + partitionerCopy.setup(numberOfChannels); + return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex); } private StreamPartitioner createPartitioner(Integer index) {