Local (per-node) TopN reduction
astefan committed Mar 10, 2024
1 parent fb0fc30 commit 694f25b
Showing 2 changed files with 71 additions and 10 deletions.
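The gist of the change, as I read the diff (the wording here is mine, not from the commit): instead of every shard-level driver on a data node streaming its partial top-N rows straight to the coordinator, the planner now splits the data-node plan into two drivers connected by a node-local exchange, so one extra TopN reduces everything the node produced before it leaves that node. A toy, self-contained Java sketch of that two-level reduction follows; the class name and sample values are illustrative only and are not part of the ESQL code.

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy sketch of two-level TopN reduction (illustrative only, not the ESQL operators):
// each data node first reduces the partial top-N of its own shards, and the
// coordinator then merges the already-reduced per-node lists into the final top-N.
class PerNodeTopNSketch {
    static List<Integer> topN(Stream<Integer> values, int n) {
        return values.sorted(Comparator.reverseOrder()).limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int n = 3;
        // two shards on one node, one shard on another node
        List<List<Integer>> nodeA = List.of(List.of(5, 1, 9), List.of(7, 2, 8));
        List<List<Integer>> nodeB = List.of(List.of(4, 6, 3));
        // node-local reduction: one TopN per node instead of shipping every shard's rows
        List<Integer> topA = topN(nodeA.stream().flatMap(List::stream), n);
        List<Integer> topB = topN(nodeB.stream().flatMap(List::stream), n);
        // coordinator-side merge of the per-node results
        List<Integer> global = topN(Stream.concat(topA.stream(), topB.stream()), n);
        System.out.println(global); // [9, 8, 7]
    }
}

The real implementation reduces pages of blocks through the TopNExec operator rather than integer lists, but the shape of the reduction is the same: per node first, coordinator last.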
@@ -148,7 +148,7 @@ public LocalExecutionPlanner(
/**
* turn the given plan into a list of drivers to execute
*/
public LocalExecutionPlan plan(PhysicalPlan node) {
public LocalExecutionPlan plan(PhysicalPlan localPhysicalPlan) {
var context = new LocalExecutionPlannerContext(
new ArrayList<>(),
new Holder<>(DriverParallelism.SINGLE),
@@ -159,13 +159,61 @@ public LocalExecutionPlan plan(PhysicalPlan node) {
);

// workaround for https://github.com/elastic/elasticsearch/issues/99782
node = node.transformUp(
localPhysicalPlan = localPhysicalPlan.transformUp(
AggregateExec.class,
a -> a.getMode() == AggregateExec.Mode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a
);
PhysicalOperation physicalOperation = plan(node, context);

final TimeValue statusInterval = configuration.pragmas().statusInterval();
PhysicalOperation physicalOperation;

if (exchangeSinkHandler != null) {// this is a data node; a non-null exchange sink handler is (for now) a hack to differentiate between coordinator and data nodes
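// If the local plan contains a TopN, split it into two drivers connected through a node-local exchange:
// a lower driver that runs the original per-shard plan, and an upper driver that runs one more TopN over
// everything this node produced, so only the node-level top-N is sent back to the coordinator.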
List<PhysicalPlan> topNExecs = localPhysicalPlan.collectFirstChildren(TopNExec.class::isInstance);
PhysicalPlan lowerTopNPlan = localPhysicalPlan;

if (topNExecs.isEmpty() == false) {
// get the first TopN
TopNExec topNExec = (TopNExec) topNExecs.get(0);
/*
* We could do better here and split the plan exactly where the TopN is, BUT at that point the TopN input/output
* carries, besides all the fields it needs, the _doc attribute, which is usually needed when the final projections
* include more fields than the TopN itself outputs.
* We can improve this in a follow-up PR.
*/
// split the given plan right after the exchange sink
if (localPhysicalPlan instanceof ExchangeSinkExec sinkExec) {
var child = sinkExec.child();
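// lower plan: the original sub-plan, re-rooted under a new exchange sink (presumably the one that feeds
// the node-local exchange source created in the compute service change below)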
lowerTopNPlan = new ExchangeSinkExec(child.source(), child.output(), false, child);

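// upper (reducing) plan: read the lower plan's pages back from the local exchange and apply one more TopN
// with the same sort order and limit, before handing the reduced result to the original, remote-facing sink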
ExchangeSourceExec sourceExec = new ExchangeSourceExec(child.source(), child.output(), false);
TopNExec reducingTopN = new TopNExec(
topNExec.source(),
sourceExec,
topNExec.order(),
topNExec.limit(),
topNExec.estimatedRowSize()
);
localPhysicalPlan = sinkExec.replaceChild(reducingTopN);

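// plan the reducing half and register it as its own driver; the lower half is planned below and becomes
// the physical operation handed to the existing driver-factory registration at the end of this method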
PhysicalOperation upperTopNPhysicalOperation = plan(localPhysicalPlan, context);
context.addDriverFactory(
new DriverFactory(
new DriverSupplier(
context.bigArrays,
context.blockFactory,
upperTopNPhysicalOperation,
statusInterval,
settings
),
context.driverParallelism().get()
)
);
}
}
physicalOperation = plan(lowerTopNPlan, context);
} else {
physicalOperation = plan(localPhysicalPlan, context);
}
context.addDriverFactory(
new DriverFactory(
new DriverSupplier(context.bigArrays, context.blockFactory, physicalOperation, statusInterval, settings),
@@ -629,13 +629,26 @@ private void runBatch(int startBatchIndex) {
List<ShardId> shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex);
acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
assert ThreadPool.assertCurrentThreadPool(ESQL_THREAD_POOL_NAME, ESQL_WORKER_THREAD_POOL_NAME);
var computeContext = new ComputeContext(sessionId, clusterAlias, searchContexts, configuration, null, exchangeSink);
runCompute(
parentTask,
computeContext,
request.plan(),
ActionListener.wrap(profiles -> onBatchCompleted(endBatchIndex, profiles), this::onFailure)
);
// create a local ExchangeSource handler for any per-node source-sink needs
final var localExchangeSource = new ExchangeSourceHandler(configuration.pragmas().exchangeBufferSize(), esqlExecutor);
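// the empty sink added below presumably keeps the local exchange source from finishing before the
// per-node drivers that use it are attached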
try (Releasable ignored = localExchangeSource.addEmptySink();
// RefCountingListener refs = new RefCountingListener(listener.map(unused -> new ComputeResponse(collectedProfiles)))
) {
var computeContext = new ComputeContext(
sessionId,
clusterAlias,
searchContexts,
configuration,
localExchangeSource,
exchangeSink
);
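// the local exchange handler now travels with the compute context (this argument used to be null), so
// runCompute can presumably wire the planner's per-node reduction drivers through it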
runCompute(
parentTask,
computeContext,
request.plan(),
ActionListener.wrap(profiles -> onBatchCompleted(endBatchIndex, profiles), this::onFailure)
);
}
}, this::onFailure));
}

