Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add ForkJoinPool.commonPool()-based OperationInitializer for nested parallel DataIndex building #5802

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import org.jetbrains.annotations.NotNull;

import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/**
 * Implementation of {@link OperationInitializer} that delegates to a {@link ForkJoinPool}.
 */
public class ForkJoinPoolOperationInitializer implements OperationInitializer {

    /**
     * Get a shared {@link OperationInitializer} backed by the {@link ForkJoinPool#commonPool() common pool}.
     *
     * @return The common-pool-backed {@link OperationInitializer}
     */
    @NotNull
    public static OperationInitializer fromCommonPool() {
        return COMMON;
    }

    private static final ForkJoinPoolOperationInitializer COMMON =
            new ForkJoinPoolOperationInitializer(ForkJoinPool.commonPool()) {

                // Tasks submitted to the common pool are wrapped in an ExecutionContext whose
                // OperationInitializer disallows further parallelization, preventing recursive
                // fan-out from inside pool workers.
                private final ExecutionContext executionContext = ExecutionContext.newBuilder()
                        .setOperationInitializer(NON_PARALLELIZABLE)
                        .build();

                @Override
                public @NotNull Future<?> submit(@NotNull final Runnable task) {
                    return super.submit(() -> executionContext.apply(task));
                }
            };

    private final ForkJoinPool pool;

    private ForkJoinPoolOperationInitializer(@NotNull final ForkJoinPool pool) {
        this.pool = Objects.requireNonNull(pool);
    }

    @Override
    public boolean canParallelize() {
        // Only parallelize if the pool offers real parallelism, and we are not already executing
        // inside this pool (which would risk recursive task submission to the same workers).
        return parallelismFactor() > 1 && ForkJoinTask.getPool() != pool;
    }

    @Override
    @NotNull
    public Future<?> submit(@NotNull final Runnable taskRunnable) {
        return pool.submit(taskRunnable);
    }

    @Override
    public int parallelismFactor() {
        return pool.getParallelism();
    }

    /**
     * Ensure that {@code task} is parallelizable within the current {@link ExecutionContext}, by wrapping it with a
     * new {@code ExecutionContext} that uses {@link #fromCommonPool()} if the current {@code ExecutionContext} does
     * not {@link OperationInitializer#canParallelize() allow parallelization}.
     *
     * @param task The task to possibly wrap
     * @return The possibly-wrapped task
     */
    public static Runnable ensureParallelizable(@NotNull final Runnable task) {
        if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
            return task;
        }
        return () -> ExecutionContext.getContext()
                .withOperationInitializer(ForkJoinPoolOperationInitializer.fromCommonPool())
                .apply(task);
    }

    /**
     * Ensure that {@code task} is parallelizable within the current {@link ExecutionContext}, by wrapping it with a
     * new {@code ExecutionContext} that uses {@link #fromCommonPool()} if the current {@code ExecutionContext} does
     * not {@link OperationInitializer#canParallelize() allow parallelization}.
     *
     * @param <T> The result type supplied by {@code task}
     * @param task The task to possibly wrap
     * @return The possibly-wrapped task
     */
    public static <T> Supplier<T> ensureParallelizable(@NotNull final Supplier<T> task) {
        if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
            return task;
        }
        return () -> ExecutionContext.getContext()
                .withOperationInitializer(ForkJoinPoolOperationInitializer.fromCommonPool())
                .apply(task);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.DataIndexTransformer;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
import io.deephaven.engine.table.impl.select.FunctionalColumnLong;
import io.deephaven.engine.table.impl.select.SelectColumn;
Expand Down Expand Up @@ -79,25 +81,28 @@ public Table table() {
if ((localIndexTable = indexTable) != null) {
return localIndexTable;
}
indexTable = localIndexTable = QueryPerformanceRecorder.withNugget("Transform Data Index",
ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable));
// Don't hold onto the transformer after the index table is computed, we don't need to maintain
// reachability for its RowSets anymore.
transformer = null;
return localIndexTable;
}
}

try (final SafeCloseable ignored = parentIndex.isRefreshing() ? LivenessScopeStack.open() : null) {
localIndexTable = parentIndex.table();
localIndexTable = maybeIntersectAndInvert(localIndexTable);
localIndexTable = maybeSortByFirstKey(localIndexTable);
localIndexTable = localIndexTable.isRefreshing() && transformer.snapshotResult()
? localIndexTable.snapshot()
: localIndexTable;

if (localIndexTable.isRefreshing()) {
manage(localIndexTable);
}

indexTable = localIndexTable;
// Don't hold onto the transformer after the index table is computed, we don't need to maintain
// reachability for its RowSets anymore.
transformer = null;
return localIndexTable;
// Computes the transformed index table from the parent index, applying the configured
// transformations in order: intersect/invert, sort-by-first-key, then an optional snapshot.
// NOTE(review): presumably called once, lazily, from table() — confirm against the caller.
private Table buildTable() {
// Open a liveness scope only when the parent index is refreshing, so intermediate refreshing
// tables created below are released when the scope closes; static parents need no scope.
try (final SafeCloseable ignored = parentIndex.isRefreshing() ? LivenessScopeStack.open() : null) {
Table localIndexTable = parentIndex.table();
localIndexTable = maybeIntersectAndInvert(localIndexTable);
localIndexTable = maybeSortByFirstKey(localIndexTable);
// If the result still refreshes and the transformer requested a snapshot result, replace it
// with a static snapshot.
localIndexTable = localIndexTable.isRefreshing() && transformer.snapshotResult()
? localIndexTable.snapshot()
: localIndexTable;

// A refreshing result must be managed so it outlives the enclosing liveness scope.
if (localIndexTable.isRefreshing()) {
manage(localIndexTable);
}
return localIndexTable;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.PartitionedTableFactory;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer;
import io.deephaven.engine.table.impl.by.AggregationProcessor;
import io.deephaven.engine.table.impl.by.AggregationRowLookup;
import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex;
Expand All @@ -33,7 +34,7 @@
/**
* DataIndex that accumulates the individual per-{@link TableLocation} data indexes of a {@link Table} backed by a
* {@link RegionedColumnSourceManager}.
*
*
* @implNote This implementation is responsible for ensuring that the provided table accounts for the relative positions
* of individual table locations in the provided table of indices. Work to coalesce the index table is
* deferred until the first call to {@link #table()}. Refreshing inputs/indexes are not supported at this time
Expand Down Expand Up @@ -123,7 +124,7 @@ public Table table() {
try {
return QueryPerformanceRecorder.withNugget(
String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)),
this::buildTable);
ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable));
} catch (Throwable t) {
isCorrupt = true;
throw t;
Expand Down
Loading