Skip to content

Commit

Permalink
OperationInitializer usage bug fix and associated cleanups (#4976)
Browse files Browse the repository at this point in the history
* Get rid of inappropriate OperationInitializer usage from WhereListener
* Clean up direct OperationInitializationThreadPool usage
* Rename OperationInitializationPoolJobScheduler to OperationInitializerJobScheduler
* Rename ExecutionContext.getInitializer to ExecutionContext.getOperationInitializer
* Move PeriodicUpdateGraph.refreshThread construction to start()
  • Loading branch information
rcaudy authored Dec 22, 2023
1 parent c245cc7 commit 18c9f81
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static Builder newBuilder() {
ExecutionContext existing = getContext();
return new Builder()
.setUpdateGraph(existing.getUpdateGraph())
.setOperationInitializer(existing.getInitializer());
.setOperationInitializer(existing.getOperationInitializer());
}

public static ExecutionContext makeExecutionContext(boolean isSystemic) {
Expand Down Expand Up @@ -229,7 +229,7 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

public OperationInitializer getInitializer() {
public OperationInitializer getOperationInitializer() {
return operationInitializer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,7 @@ abstract void enqueueSubFilters(

boolean doParallelizationBase(long numberOfRows) {
return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
&& (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT)
&& ExecutionContext.getContext().getInitializer().canParallelize();
&& (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

/**
* A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in
* the {@link OperationInitializationThreadPool}.
* the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}.
*/
class InitialFilterExecution extends AbstractFilterExecution {
private final QueryTable sourceTable;
Expand Down Expand Up @@ -60,7 +60,8 @@ class InitialFilterExecution extends AbstractFilterExecution {
if (parent == null) {
pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 ? OperationInitializationThreadPool.NUM_THREADS
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
: QueryTable.PARALLEL_WHERE_SEGMENTS;
runningChildren = Collections.synchronizedMap(new IdentityHashMap<>());
cancelled = new AtomicBoolean(false);
Expand All @@ -86,7 +87,7 @@ void enqueueSubFilters(

private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
for (NotificationQueue.Notification notification : subFilters) {
ExecutionContext.getContext().getInitializer().submit(() -> {
ExecutionContext.getContext().getOperationInitializer().submit(() -> {
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
try {
if (!root.cancelled.get()) {
Expand All @@ -113,7 +114,9 @@ int getTargetSegments() {

@Override
boolean doParallelization(long numberOfRows) {
return permitParallelization && doParallelizationBase(numberOfRows);
return permitParallelization
&& ExecutionContext.getContext().getOperationInitializer().canParallelize()
&& doParallelizationBase(numberOfRows);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import io.deephaven.engine.table.impl.updateby.UpdateBy;
import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
import io.deephaven.engine.table.impl.util.JobScheduler;
import io.deephaven.engine.table.impl.util.OperationInitializationPoolJobScheduler;
import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
import io.deephaven.engine.table.impl.util.FieldUtils;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
Expand Down Expand Up @@ -1482,10 +1482,9 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
final CompletableFuture<Void> waitForResult = new CompletableFuture<>();
final JobScheduler jobScheduler;
if ((QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE || QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE)
&& ExecutionContext.getContext().getInitializer().canParallelize()
&& ExecutionContext.getContext().getOperationInitializer().canParallelize()
&& analyzer.allowCrossColumnParallelization()) {
jobScheduler = new OperationInitializationPoolJobScheduler(
ExecutionContext.getContext().getInitializer());
jobScheduler = new OperationInitializerJobScheduler();
} else {
jobScheduler = ImmediateJobScheduler.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ private ListenerFilterExecution(

@Override
boolean doParallelization(long numberOfRows) {
return permitParallelization && doParallelizationBase(numberOfRows);
return permitParallelization
&& (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)
&& doParallelizationBase(numberOfRows);
}

@Override
Expand All @@ -262,7 +264,8 @@ ListenerFilterExecution makeChild(
}

@Override
void enqueueSubFilters(List<AbstractFilterExecution> subFilters,
void enqueueSubFilters(
List<AbstractFilterExecution> subFilters,
CombinationNotification combinationNotification) {
getUpdateGraph().addNotifications(subFilters);
getUpdateGraph().addNotification(combinationNotification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,10 @@ private static ExecutionContext disableRecursiveParallelOperationInitialization(
return null;
}
ExecutionContext current = ExecutionContext.getContext();
if (!provided.getInitializer().canParallelize()) {
if (!provided.getOperationInitializer().canParallelize()) {
return provided;
}
if (current.getInitializer() != provided.getInitializer()) {
if (current.getOperationInitializer() != provided.getOperationInitializer()) {
return provided;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ public Result<QueryTable> initialize(final boolean usePrev, final long beforeClo
QueryTable.checkInitiateBinaryOperation(leftTable, rightTable);

final JobScheduler jobScheduler;
if (ExecutionContext.getContext().getInitializer().canParallelize()) {
jobScheduler = new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer());
if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
jobScheduler = new OperationInitializerJobScheduler();
} else {
jobScheduler = ImmediateJobScheduler.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
Expand Down Expand Up @@ -300,9 +299,8 @@ class PhasedUpdateProcessor implements LogOutputAppendable {
dirtyWindowOperators[winIdx].set(0, windows[winIdx].operators.length);
}
// Create the proper JobScheduler for the following parallel tasks
if (ExecutionContext.getContext().getInitializer().canParallelize()) {
jobScheduler =
new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer());
if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
jobScheduler = new OperationInitializerJobScheduler();
} else {
jobScheduler = ImmediateJobScheduler.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@

import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.OperationInitializationThreadPool;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.process.ProcessEnvironment;
import org.jetbrains.annotations.NotNull;

import java.util.function.Consumer;

public class OperationInitializationPoolJobScheduler implements JobScheduler {
public class OperationInitializerJobScheduler implements JobScheduler {

private final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry();
private final OperationInitializer threadPool;
private final OperationInitializer operationInitializer;

public OperationInitializerJobScheduler(@NotNull final OperationInitializer operationInitializer) {
this.operationInitializer = operationInitializer;
}

public OperationInitializationPoolJobScheduler(OperationInitializer threadPool) {
this.threadPool = threadPool;
public OperationInitializerJobScheduler() {
this(ExecutionContext.getContext().getOperationInitializer());
}

@Override
Expand All @@ -25,7 +30,7 @@ public void submit(
final Runnable runnable,
final LogOutputAppendable description,
final Consumer<Exception> onError) {
threadPool.submit(() -> {
operationInitializer.submit(() -> {
final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry();
basePerformanceEntry.onBaseEntryStart();
try (final SafeCloseable ignored = executionContext == null ? null : executionContext.open()) {
Expand All @@ -50,6 +55,6 @@ public BasePerformanceEntry getAccumulatedPerformance() {

@Override
public int threadCount() {
return OperationInitializationThreadPool.NUM_THREADS;
return operationInitializer.parallelismFactor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.process.ProcessEnvironment;
import org.jetbrains.annotations.NotNull;

import java.util.function.Consumer;

Expand All @@ -17,10 +18,14 @@ public class UpdateGraphJobScheduler implements JobScheduler {

private final UpdateGraph updateGraph;

public UpdateGraphJobScheduler(final UpdateGraph updateGraph) {
public UpdateGraphJobScheduler(@NotNull final UpdateGraph updateGraph) {
this.updateGraph = updateGraph;
}

public UpdateGraphJobScheduler() {
this(ExecutionContext.getContext().getUpdateGraph());
}

@Override
public void submit(
final ExecutionContext executionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ public static Builder newBuilder(final String name) {
*/
private final AtomicBoolean refreshRequested = new AtomicBoolean();

private final Thread refreshThread;
/**
* The core refresh driver thread, constructed and started during {@link #start()}.
*/
private Thread refreshThread;

/**
* {@link ScheduledExecutorService} used for scheduling the {@link #watchDogTimeoutProcedure}.
* {@link ScheduledExecutorService} used for scheduling the {@link #watchDogTimeoutProcedure}, constructed during
* {@link #start()}.
*/
private final ScheduledExecutorService watchdogScheduler;
private ScheduledExecutorService watchdogScheduler;

/**
* If this is set to a positive value, then we will call the {@link #watchDogTimeoutProcedure} if any single run
Expand Down Expand Up @@ -130,24 +134,6 @@ public PeriodicUpdateGraph(
} else {
this.updateThreads = numUpdateThreads;
}

OperationInitializer captured = ExecutionContext.getContext().getInitializer();
refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> {
configureRefreshThread(captured);
while (running) {
Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode");
refreshTablesAndFlushNotifications();
}
}), "PeriodicUpdateGraph." + name + ".refreshThread");
refreshThread.setDaemon(true);
watchdogScheduler = Executors.newSingleThreadScheduledExecutor(
new NamingThreadFactory(PeriodicUpdateGraph.class, "watchdogScheduler", true) {
@Override
public Thread newThread(@NotNull final Runnable r) {
// Not a refresh thread, but should still be instrumented for debugging purposes.
return super.newThread(threadInitializationFactory.createInitializer(r));
}
});
}

@Override
Expand Down Expand Up @@ -287,7 +273,7 @@ public void enableUnitTestMode() {
if (!allowUnitTestMode) {
throw new IllegalStateException("PeriodicUpdateGraph.allowUnitTestMode=false");
}
if (refreshThread.isAlive()) {
if (refreshThread != null) {
throw new IllegalStateException("PeriodicUpdateGraph.refreshThread is executing!");
}
resetLock();
Expand Down Expand Up @@ -341,11 +327,31 @@ public void start() {
Assert.eqTrue(running, "running");
Assert.eqFalse(unitTestMode, "unitTestMode");
Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode");
synchronized (refreshThread) {
synchronized (this) {
if (watchdogScheduler == null) {
watchdogScheduler = Executors.newSingleThreadScheduledExecutor(
new NamingThreadFactory(PeriodicUpdateGraph.class, "watchdogScheduler", true) {
@Override
public Thread newThread(@NotNull final Runnable r) {
// Not a refresh thread, but should still be instrumented for debugging purposes.
return super.newThread(threadInitializationFactory.createInitializer(r));
}
});
}
if (notificationProcessor instanceof PoisonedNotificationProcessor) {
notificationProcessor = makeNotificationProcessor();
}
if (!refreshThread.isAlive()) {
if (refreshThread == null) {
final OperationInitializer operationInitializer =
ExecutionContext.getContext().getOperationInitializer();
refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> {
configureRefreshThread(operationInitializer);
while (running) {
Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode");
refreshTablesAndFlushNotifications();
}
}), "PeriodicUpdateGraph." + getName() + ".refreshThread");
refreshThread.setDaemon(true);
log.info().append("PeriodicUpdateGraph starting with ").append(updateThreads)
.append(" notification processing threads").endl();
updatePerformanceTracker.start();
Expand Down Expand Up @@ -466,7 +472,7 @@ public void resetForUnitTests(boolean after,
notificationProcessor = makeNotificationProcessor();
}

if (refreshThread.isAlive()) {
if (refreshThread != null) {
errors.add("UpdateGraph refreshThread isAlive");
}

Expand Down Expand Up @@ -1097,7 +1103,7 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou

@Override
public Thread newThread(@NotNull final Runnable r) {
OperationInitializer captured = ExecutionContext.getContext().getInitializer();
OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer();
return super.newThread(threadInitializationFactory.createInitializer(() -> {
configureRefreshThread(captured);
r.run();
Expand All @@ -1118,7 +1124,7 @@ private UnitTestThreadFactory() {

@Override
public Thread newThread(@NotNull final Runnable r) {
OperationInitializer captured = ExecutionContext.getContext().getInitializer();
OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer();
return super.newThread(() -> {
configureUnitTestRefreshThread(captured);
r.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -833,7 +832,7 @@ public void testInterFilterInterruption() {

// we want to make sure we can push something through the thread pool and are not hogging it
final CountDownLatch latch = new CountDownLatch(1);
ExecutionContext.getContext().getInitializer().submit(latch::countDown);
ExecutionContext.getContext().getOperationInitializer().submit(latch::countDown);
waitForLatch(latch);

assertEquals(0, fastCounter.invokes.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.table.impl.OperationInitializationThreadPool;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState;
import io.deephaven.engine.table.impl.util.AsyncErrorLogger;
import io.deephaven.engine.table.impl.util.EngineMetrics;
Expand Down

0 comments on commit 18c9f81

Please sign in to comment.