Skip to content

Commit

Permalink
[hotfix] [streaming] Correct copying StreamPartitioner in rescale case
Browse files Browse the repository at this point in the history
  • Loading branch information
ldadima authored and pnowojski committed May 31, 2024
1 parent 90a71a1 commit e095657
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ public Predicate<StreamRecord<T>> apply(InputChannelInfo channelInfo) {
channelInfo.getGateIdx(), this::createPartitioner);
// use a copy of partitioner to ensure that the filter of ambiguous virtual channels
// have the same state across several subtasks
return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex);
StreamPartitioner<T> partitionerCopy = partitioner.copy();
partitionerCopy.setup(numberOfChannels);
return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex);
}

private StreamPartitioner<T> createPartitioner(Integer index) {
Expand Down

0 comments on commit e095657

Please sign in to comment.