diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java index 5d60c251e64..cede68b8895 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java @@ -221,9 +221,7 @@ private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableU final int numTasks = splitUpdates.size(); final long[] destinationOffsets = new long[numTasks]; if (flattenedResult) { - Assert.assertion(upstream.removed().isEmpty(), "upstream.removed().isEmpty()"); - Assert.assertion(upstream.modified().isEmpty(), "upstream.modified().isEmpty()"); - Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()"); + // Note that prepareSourcesForParallelPopulation asserts that upstream has no removes, modifies, or shifts long destinationOffset = 0; for (int ti = 0; ti < numTasks; ++ti) { final TableUpdate splitUpdate = splitUpdates.get(ti); @@ -552,13 +550,29 @@ private void doEnsureCapacity() { } } - void prepareSourcesForParallelPopulation(TableUpdate upstream) { - // we do not permit in-column parallelization with redirected results, so do not need to worry about how this - // interacts with the previous clearing of the redirection index that has occurred at the start of applyUpdate - try (final WritableRowSet changedRows = upstream.added().union(upstream.getModifiedPreShift())) { - changedRows.insert(upstream.removed()); - ((WritableSourceWithPrepareForParallelPopulation) (writableSource)) - .prepareForParallelPopulation(changedRows); + void prepareSourcesForParallelPopulation(@NotNull final TableUpdate upstream) { + // We do not permit in-column parallelization with redirected results, so do not need to worry about how this + // interacts with the previous clearing of the RowRedirection that has occurred at the start of applyUpdate. + Assert.eqFalse(isRedirected, "isRedirected"); + + if (flattenedResult) { + // For flattened results the input table must be static; upstream is a "fake" update, and is not permitted + // to have removes, modifies, or shifts. + Assert.assertion(upstream.removed().isEmpty(), "upstream.removed().isEmpty()"); + Assert.assertion(upstream.modified().isEmpty(), "upstream.modified().isEmpty()"); + Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()"); + try (final RowSequence flattenedChanges = RowSequenceFactory.forRange(0, upstream.added().size() - 1)) { + ((WritableSourceWithPrepareForParallelPopulation) (writableSource)) + .prepareForParallelPopulation(flattenedChanges); + } + } else { + // Upstream is not permitted to have shifts for parallel update processing. + Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()"); + try (final WritableRowSet changedRows = upstream.added().union(upstream.modified())) { + changedRows.insert(upstream.removed()); + ((WritableSourceWithPrepareForParallelPopulation) (writableSource)) + .prepareForParallelPopulation(changedRows); + } } }