From 18c9f81f80f2dcac7ca4148103316480406dfffe Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Fri, 22 Dec 2023 18:45:55 -0500 Subject: [PATCH] OperationInitializer usage bug fix and associated cleanups (#4976) * Get rid of inappropriate OperationInitializer usage from WhereListener * Clean up direct OperationInitializationThreadPool usage * Rename OperationInitializationPoolJobScheduler to OperationInitializerJobScheduler * Rename ExecutionContext.getInitializer to ExecutionContext.getOperationInitializer * Move PeriodicUpdateGraph.refreshThread construction to start() --- .../engine/context/ExecutionContext.java | 4 +- .../table/impl/AbstractFilterExecution.java | 3 +- .../table/impl/InitialFilterExecution.java | 11 ++-- .../engine/table/impl/QueryTable.java | 7 +-- .../engine/table/impl/WhereListener.java | 7 ++- .../partitioned/PartitionedTableImpl.java | 4 +- .../impl/rangejoin/RangeJoinOperation.java | 4 +- .../engine/table/impl/updateby/UpdateBy.java | 6 +- ... => OperationInitializerJobScheduler.java} | 19 +++--- .../impl/util/UpdateGraphJobScheduler.java | 7 ++- .../updategraph/impl/PeriodicUpdateGraph.java | 60 ++++++++++--------- .../table/impl/QueryTableWhereTest.java | 3 +- .../server/runner/DeephavenApiServer.java | 1 - 13 files changed, 76 insertions(+), 60 deletions(-) rename engine/table/src/main/java/io/deephaven/engine/table/impl/util/{OperationInitializationPoolJobScheduler.java => OperationInitializerJobScheduler.java} (75%) diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index a698a85b841..386eb7879c3 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -33,7 +33,7 @@ public static Builder newBuilder() { ExecutionContext existing = getContext(); return new Builder() .setUpdateGraph(existing.getUpdateGraph()) - .setOperationInitializer(existing.getInitializer()); + .setOperationInitializer(existing.getOperationInitializer()); } public static ExecutionContext makeExecutionContext(boolean isSystemic) { @@ -229,7 +229,7 @@ public UpdateGraph getUpdateGraph() { return updateGraph; } - public OperationInitializer getInitializer() { + public OperationInitializer getOperationInitializer() { return operationInitializer; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index a136c87701c..6879471032a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -328,8 +328,7 @@ abstract void enqueueSubFilters( boolean doParallelizationBase(long numberOfRows) { return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 - && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT) - && ExecutionContext.getContext().getInitializer().canParallelize(); + && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index dc00456fdd2..721cd64943c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -16,7 +16,7 @@ /** * A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in - * the {@link OperationInitializationThreadPool}. + * the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}. */ class InitialFilterExecution extends AbstractFilterExecution { private final QueryTable sourceTable; @@ -60,7 +60,8 @@ class InitialFilterExecution extends AbstractFilterExecution { if (parent == null) { pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>( IntrusiveDoublyLinkedNode.Adapter.getInstance()); - segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 ? OperationInitializationThreadPool.NUM_THREADS + segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 + ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor() : QueryTable.PARALLEL_WHERE_SEGMENTS; runningChildren = Collections.synchronizedMap(new IdentityHashMap<>()); cancelled = new AtomicBoolean(false); @@ -86,7 +87,7 @@ void enqueueSubFilters( private void enqueueJobs(Iterable subFilters) { for (NotificationQueue.Notification notification : subFilters) { - ExecutionContext.getContext().getInitializer().submit(() -> { + ExecutionContext.getContext().getOperationInitializer().submit(() -> { root.runningChildren.put(Thread.currentThread(), Thread.currentThread()); try { if (!root.cancelled.get()) { @@ -113,7 +114,9 @@ int getTargetSegments() { @Override boolean doParallelization(long numberOfRows) { - return permitParallelization && doParallelizationBase(numberOfRows); + return permitParallelization + && ExecutionContext.getContext().getOperationInitializer().canParallelize() + && doParallelizationBase(numberOfRows); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index afeff5a30c8..dd015c81d1c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -50,7 +50,7 @@ import io.deephaven.engine.table.impl.updateby.UpdateBy; import io.deephaven.engine.table.impl.util.ImmediateJobScheduler; import io.deephaven.engine.table.impl.util.JobScheduler; -import io.deephaven.engine.table.impl.util.OperationInitializationPoolJobScheduler; +import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler; import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper; import io.deephaven.engine.table.impl.util.FieldUtils; import io.deephaven.engine.table.impl.sources.ring.RingTableTools; @@ -1482,10 +1482,9 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc final CompletableFuture waitForResult = new CompletableFuture<>(); final JobScheduler jobScheduler; if ((QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE || QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE) - && ExecutionContext.getContext().getInitializer().canParallelize() + && ExecutionContext.getContext().getOperationInitializer().canParallelize() && analyzer.allowCrossColumnParallelization()) { - jobScheduler = new OperationInitializationPoolJobScheduler( - ExecutionContext.getContext().getInitializer()); + jobScheduler = new OperationInitializerJobScheduler(); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 18fffba9d74..b63195fd0c7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -237,7 +237,9 @@ private ListenerFilterExecution( @Override boolean doParallelization(long numberOfRows) { - return permitParallelization && doParallelizationBase(numberOfRows); + return permitParallelization + && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1) + && doParallelizationBase(numberOfRows); } @Override @@ -262,7 +264,8 @@ ListenerFilterExecution makeChild( } @Override - void enqueueSubFilters(List subFilters, + void enqueueSubFilters( + List subFilters, CombinationNotification combinationNotification) { getUpdateGraph().addNotifications(subFilters); getUpdateGraph().addNotification(combinationNotification); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index d401673b5f0..f6b39838ad4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -330,10 +330,10 @@ private static ExecutionContext disableRecursiveParallelOperationInitialization( return null; } ExecutionContext current = ExecutionContext.getContext(); - if (!provided.getInitializer().canParallelize()) { + if (!provided.getOperationInitializer().canParallelize()) { return provided; } - if (current.getInitializer() != provided.getInitializer()) { + if (current.getOperationInitializer() != provided.getOperationInitializer()) { return provided; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index aebaf7a975e..52e1ee4f0f8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -252,8 +252,8 @@ public Result initialize(final boolean usePrev, final long beforeClo QueryTable.checkInitiateBinaryOperation(leftTable, rightTable); final JobScheduler jobScheduler; - if (ExecutionContext.getContext().getInitializer().canParallelize()) { - jobScheduler = new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); + if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { + jobScheduler = new OperationInitializerJobScheduler(); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index f6f0ca98558..85079818e9f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Consumer; -import java.util.stream.Collectors; import java.util.stream.IntStream; /** @@ -300,9 +299,8 @@ class PhasedUpdateProcessor implements LogOutputAppendable { dirtyWindowOperators[winIdx].set(0, windows[winIdx].operators.length); } // Create the proper JobScheduler for the following parallel tasks - if (ExecutionContext.getContext().getInitializer().canParallelize()) { - jobScheduler = - new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); + if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { + jobScheduler = new OperationInitializerJobScheduler(); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializerJobScheduler.java similarity index 75% rename from engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java rename to engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializerJobScheduler.java index f3a9a45fc30..b6ac2b22cf1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializerJobScheduler.java @@ -2,21 +2,26 @@ import io.deephaven.base.log.LogOutputAppendable; import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; import io.deephaven.util.process.ProcessEnvironment; +import org.jetbrains.annotations.NotNull; import java.util.function.Consumer; -public class OperationInitializationPoolJobScheduler implements JobScheduler { +public class OperationInitializerJobScheduler implements JobScheduler { + private final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry(); - private final OperationInitializer threadPool; + private final OperationInitializer operationInitializer; + + public OperationInitializerJobScheduler(@NotNull final OperationInitializer operationInitializer) { + this.operationInitializer = operationInitializer; + } - public OperationInitializationPoolJobScheduler(OperationInitializer threadPool) { - this.threadPool = threadPool; + public OperationInitializerJobScheduler() { + this(ExecutionContext.getContext().getOperationInitializer()); } @Override @@ -25,7 +30,7 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - threadPool.submit(() -> { + operationInitializer.submit(() -> { final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry(); basePerformanceEntry.onBaseEntryStart(); try (final SafeCloseable ignored = executionContext == null ? null : executionContext.open()) { @@ -50,6 +55,6 @@ public BasePerformanceEntry getAccumulatedPerformance() { @Override public int threadCount() { - return OperationInitializationThreadPool.NUM_THREADS; + return operationInitializer.parallelismFactor(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java index 345b08aa24e..8c528b38fa0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java @@ -9,6 +9,7 @@ import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; import io.deephaven.util.process.ProcessEnvironment; +import org.jetbrains.annotations.NotNull; import java.util.function.Consumer; @@ -17,10 +18,14 @@ public class UpdateGraphJobScheduler implements JobScheduler { private final UpdateGraph updateGraph; - public UpdateGraphJobScheduler(final UpdateGraph updateGraph) { + public UpdateGraphJobScheduler(@NotNull final UpdateGraph updateGraph) { this.updateGraph = updateGraph; } + public UpdateGraphJobScheduler() { + this(ExecutionContext.getContext().getUpdateGraph()); + } + @Override public void submit( final ExecutionContext executionContext, diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index acc4ea026ee..e61f03bc624 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -66,12 +66,16 @@ public static Builder newBuilder(final String name) { */ private final AtomicBoolean refreshRequested = new AtomicBoolean(); - private final Thread refreshThread; + /** + * The core refresh driver thread, constructed and started during {@link #start()}. + */ + private Thread refreshThread; /** - * {@link ScheduledExecutorService} used for scheduling the {@link #watchDogTimeoutProcedure}. + * {@link ScheduledExecutorService} used for scheduling the {@link #watchDogTimeoutProcedure}, constructed during + * {@link #start()}. */ - private final ScheduledExecutorService watchdogScheduler; + private ScheduledExecutorService watchdogScheduler; /** * If this is set to a positive value, then we will call the {@link #watchDogTimeoutProcedure} if any single run @@ -130,24 +134,6 @@ public PeriodicUpdateGraph( } else { this.updateThreads = numUpdateThreads; } - - OperationInitializer captured = ExecutionContext.getContext().getInitializer(); - refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> { - configureRefreshThread(captured); - while (running) { - Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode"); - refreshTablesAndFlushNotifications(); - } - }), "PeriodicUpdateGraph." + name + ".refreshThread"); - refreshThread.setDaemon(true); - watchdogScheduler = Executors.newSingleThreadScheduledExecutor( - new NamingThreadFactory(PeriodicUpdateGraph.class, "watchdogScheduler", true) { - @Override - public Thread newThread(@NotNull final Runnable r) { - // Not a refresh thread, but should still be instrumented for debugging purposes. - return super.newThread(threadInitializationFactory.createInitializer(r)); - } - }); } @Override @@ -287,7 +273,7 @@ public void enableUnitTestMode() { if (!allowUnitTestMode) { throw new IllegalStateException("PeriodicUpdateGraph.allowUnitTestMode=false"); } - if (refreshThread.isAlive()) { + if (refreshThread != null) { throw new IllegalStateException("PeriodicUpdateGraph.refreshThread is executing!"); } resetLock(); @@ -341,11 +327,31 @@ public void start() { Assert.eqTrue(running, "running"); Assert.eqFalse(unitTestMode, "unitTestMode"); Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode"); - synchronized (refreshThread) { + synchronized (this) { + if (watchdogScheduler == null) { + watchdogScheduler = Executors.newSingleThreadScheduledExecutor( + new NamingThreadFactory(PeriodicUpdateGraph.class, "watchdogScheduler", true) { + @Override + public Thread newThread(@NotNull final Runnable r) { + // Not a refresh thread, but should still be instrumented for debugging purposes. + return super.newThread(threadInitializationFactory.createInitializer(r)); + } + }); + } if (notificationProcessor instanceof PoisonedNotificationProcessor) { notificationProcessor = makeNotificationProcessor(); } - if (!refreshThread.isAlive()) { + if (refreshThread == null) { + final OperationInitializer operationInitializer = + ExecutionContext.getContext().getOperationInitializer(); + refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> { + configureRefreshThread(operationInitializer); + while (running) { + Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode"); + refreshTablesAndFlushNotifications(); + } + }), "PeriodicUpdateGraph." + getName() + ".refreshThread"); + refreshThread.setDaemon(true); log.info().append("PeriodicUpdateGraph starting with ").append(updateThreads) .append(" notification processing threads").endl(); updatePerformanceTracker.start(); @@ -466,7 +472,7 @@ public void resetForUnitTests(boolean after, notificationProcessor = makeNotificationProcessor(); } - if (refreshThread.isAlive()) { + if (refreshThread != null) { errors.add("UpdateGraph refreshThread isAlive"); } @@ -1097,7 +1103,7 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou @Override public Thread newThread(@NotNull final Runnable r) { - OperationInitializer captured = ExecutionContext.getContext().getInitializer(); + OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer(); return super.newThread(threadInitializationFactory.createInitializer(() -> { configureRefreshThread(captured); r.run(); @@ -1118,7 +1124,7 @@ private UnitTestThreadFactory() { @Override public Thread newThread(@NotNull final Runnable r) { - OperationInitializer captured = ExecutionContext.getContext().getInitializer(); + OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer(); return super.newThread(() -> { configureUnitTestRefreshThread(captured); r.run(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 4d511bb2a80..7d72d391ec7 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -47,7 +47,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.IntSupplier; import java.util.function.IntUnaryOperator; import java.util.stream.IntStream; @@ -833,7 +832,7 @@ public void testInterFilterInterruption() { // we want to make sure we can push something through the thread pool and are not hogging it final CountDownLatch latch = new CountDownLatch(1); - ExecutionContext.getContext().getInitializer().submit(latch::countDown); + ExecutionContext.getContext().getOperationInitializer().submit(latch::countDown); waitForLatch(latch); assertEquals(0, fastCounter.invokes.get()); diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index 8ccc367a6b7..54eea4795e8 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -7,7 +7,6 @@ import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessScopeStack; -import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState; import io.deephaven.engine.table.impl.util.AsyncErrorLogger; import io.deephaven.engine.table.impl.util.EngineMetrics;