Skip to content

Commit

Permalink
Fix for SelectColumnLayer.prepareSourcesForParallelPopulation with fl…
Browse files Browse the repository at this point in the history
…attenedResult (#3806)
  • Loading branch information
rcaudy authored May 6, 2023
1 parent 958a4d9 commit b48e081
Showing 1 changed file with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableU
final int numTasks = splitUpdates.size();
final long[] destinationOffsets = new long[numTasks];
if (flattenedResult) {
Assert.assertion(upstream.removed().isEmpty(), "upstream.removed().isEmpty()");
Assert.assertion(upstream.modified().isEmpty(), "upstream.modified().isEmpty()");
Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()");
// Note that prepareSourcesForParallelPopulation asserts that upstream has no removes, modifies, or shifts
long destinationOffset = 0;
for (int ti = 0; ti < numTasks; ++ti) {
final TableUpdate splitUpdate = splitUpdates.get(ti);
Expand Down Expand Up @@ -552,13 +550,29 @@ private void doEnsureCapacity() {
}
}

void prepareSourcesForParallelPopulation(TableUpdate upstream) {
// we do not permit in-column parallelization with redirected results, so do not need to worry about how this
// interacts with the previous clearing of the redirection index that has occurred at the start of applyUpdate
try (final WritableRowSet changedRows = upstream.added().union(upstream.getModifiedPreShift())) {
changedRows.insert(upstream.removed());
((WritableSourceWithPrepareForParallelPopulation) (writableSource))
.prepareForParallelPopulation(changedRows);
void prepareSourcesForParallelPopulation(@NotNull final TableUpdate upstream) {
// We do not permit in-column parallelization with redirected results, so do not need to worry about how this
// interacts with the previous clearing of the RowRedirection that has occurred at the start of applyUpdate.
Assert.eqFalse(isRedirected, "isRedirected");

if (flattenedResult) {
// For flattened results the input table must be static; upstream is a "fake" update, and is not permitted
// to have removes, modifies, or shifts.
Assert.assertion(upstream.removed().isEmpty(), "upstream.removed().isEmpty()");
Assert.assertion(upstream.modified().isEmpty(), "upstream.modified().isEmpty()");
Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()");
try (final RowSequence flattenedChanges = RowSequenceFactory.forRange(0, upstream.added().size() - 1)) {
((WritableSourceWithPrepareForParallelPopulation) (writableSource))
.prepareForParallelPopulation(flattenedChanges);
}
} else {
// Upstream is not permitted to have shifts for parallel update processing.
Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()");
try (final WritableRowSet changedRows = upstream.added().union(upstream.modified())) {
changedRows.insert(upstream.removed());
((WritableSourceWithPrepareForParallelPopulation) (writableSource))
.prepareForParallelPopulation(changedRows);
}
}
}

Expand Down

0 comments on commit b48e081

Please sign in to comment.