From b48e081ab01f1c3deaf733fd1b720d6dd98ae4bb Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Sat, 6 May 2023 13:19:35 -0400 Subject: [PATCH] Fix for SelectColumnLayer.prepareSourcesForParallelPopulation with flattenedResult (#3806) --- .../select/analyzers/SelectColumnLayer.java | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java index 5d60c251e64..cede68b8895 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java @@ -221,9 +221,7 @@ private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableU final int numTasks = splitUpdates.size(); final long[] destinationOffsets = new long[numTasks]; if (flattenedResult) { - Assert.assertion(upstream.removed().isEmpty(), "upstream.removed().isEmpty()"); - Assert.assertion(upstream.modified().isEmpty(), "upstream.modified().isEmpty()"); - Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()"); + // Note that prepareSourcesForParallelPopulation asserts that upstream has no removes, modifies, or shifts long destinationOffset = 0; for (int ti = 0; ti < numTasks; ++ti) { final TableUpdate splitUpdate = splitUpdates.get(ti); @@ -552,13 +550,29 @@ private void doEnsureCapacity() { } } - void prepareSourcesForParallelPopulation(TableUpdate upstream) { - // we do not permit in-column parallelization with redirected results, so do not need to worry about how this - // interacts with the previous clearing of the redirection index that has occurred at the start of applyUpdate - try (final WritableRowSet changedRows = upstream.added().union(upstream.getModifiedPreShift())) { - changedRows.insert(upstream.removed()); - ((WritableSourceWithPrepareForParallelPopulation) (writableSource)) - .prepareForParallelPopulation(changedRows); + void prepareSourcesForParallelPopulation(@NotNull final TableUpdate upstream) { + // We do not permit in-column parallelization with redirected results, so do not need to worry about how this + // interacts with the previous clearing of the RowRedirection that has occurred at the start of applyUpdate. + Assert.eqFalse(isRedirected, "isRedirected"); + + if (flattenedResult) { + // For flattened results the input table must be static; upstream is a "fake" update, and is not permitted + // to have removes, modifies, or shifts. + Assert.assertion(upstream.removed().isEmpty(), "upstream.removed().isEmpty()"); + Assert.assertion(upstream.modified().isEmpty(), "upstream.modified().isEmpty()"); + Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()"); + try (final RowSequence flattenedChanges = RowSequenceFactory.forRange(0, upstream.added().size() - 1)) { + ((WritableSourceWithPrepareForParallelPopulation) (writableSource)) + .prepareForParallelPopulation(flattenedChanges); + } + } else { + // Upstream is not permitted to have shifts for parallel update processing. + Assert.assertion(upstream.shifted().empty(), "upstream.shifted().empty()"); + try (final WritableRowSet changedRows = upstream.added().union(upstream.modified())) { + changedRows.insert(upstream.removed()); + ((WritableSourceWithPrepareForParallelPopulation) (writableSource)) + .prepareForParallelPopulation(changedRows); + } } }