diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index d7d2e99426a97..76c44f8e56f0a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -148,7 +148,7 @@ public LocalExecutionPlanner( /** * turn the given plan into a list of drivers to execute */ - public LocalExecutionPlan plan(PhysicalPlan node) { + public LocalExecutionPlan plan(PhysicalPlan localPhysicalPlan) { var context = new LocalExecutionPlannerContext( new ArrayList<>(), new Holder<>(DriverParallelism.SINGLE), @@ -159,13 +159,61 @@ public LocalExecutionPlan plan(PhysicalPlan node) { ); // workaround for https://github.com/elastic/elasticsearch/issues/99782 - node = node.transformUp( + localPhysicalPlan = localPhysicalPlan.transformUp( AggregateExec.class, a -> a.getMode() == AggregateExec.Mode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a ); - PhysicalOperation physicalOperation = plan(node, context); final TimeValue statusInterval = configuration.pragmas().statusInterval(); + PhysicalOperation physicalOperation; + + if (exchangeSinkHandler != null) {// this is a data node. 
And this is a hack to differentiate between coordinator and data nodes + List topNExecs = localPhysicalPlan.collectFirstChildren(TopNExec.class::isInstance); + PhysicalPlan lowerTopNPlan = localPhysicalPlan; + + if (topNExecs.isEmpty() == false) { + // get the first TopN + TopNExec topNExec = (TopNExec) topNExecs.get(0); + /** + * We could do better here and split the plan where the TopN is, BUT at that point together with all the needed + * fields in the TopN output/input, there is also the _doc that is usually needed in situations where the final projections + * include more than the TopN fields. + * We can improve this in a follow-up PR. + */ + // split the given plan right after the exchange sink + if (localPhysicalPlan instanceof ExchangeSinkExec sinkExec) { + var child = sinkExec.child(); + lowerTopNPlan = new ExchangeSinkExec(child.source(), child.output(), false, child); + + ExchangeSourceExec sourceExec = new ExchangeSourceExec(child.source(), child.output(), false); + TopNExec reducingTopN = new TopNExec( + topNExec.source(), + sourceExec, + topNExec.order(), + topNExec.limit(), + topNExec.estimatedRowSize() + ); + localPhysicalPlan = sinkExec.replaceChild(reducingTopN); + + PhysicalOperation upperTopNPhysicalOperation = plan(localPhysicalPlan, context); + context.addDriverFactory( + new DriverFactory( + new DriverSupplier( + context.bigArrays, + context.blockFactory, + upperTopNPhysicalOperation, + statusInterval, + settings + ), + context.driverParallelism().get() + ) + ); + } + } + physicalOperation = plan(lowerTopNPlan, context); + } else { + physicalOperation = plan(localPhysicalPlan, context); + } context.addDriverFactory( new DriverFactory( new DriverSupplier(context.bigArrays, context.blockFactory, physicalOperation, statusInterval, settings), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 
64f393ccdf2b0..5ff1f6590b56c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -629,13 +629,26 @@ private void runBatch(int startBatchIndex) { List shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex); acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> { assert ThreadPool.assertCurrentThreadPool(ESQL_THREAD_POOL_NAME, ESQL_WORKER_THREAD_POOL_NAME); - var computeContext = new ComputeContext(sessionId, clusterAlias, searchContexts, configuration, null, exchangeSink); - runCompute( - parentTask, - computeContext, - request.plan(), - ActionListener.wrap(profiles -> onBatchCompleted(endBatchIndex, profiles), this::onFailure) - ); + // create local ExchangeSource handler, for any per-node source-sink needs + final var localExchangeSource = new ExchangeSourceHandler(configuration.pragmas().exchangeBufferSize(), esqlExecutor); + try (Releasable ignored = localExchangeSource.addEmptySink(); + // RefCountingListener refs = new RefCountingListener(listener.map(unused -> new ComputeResponse(collectedProfiles))) + ) { + var computeContext = new ComputeContext( + sessionId, + clusterAlias, + searchContexts, + configuration, + localExchangeSource, + exchangeSink + ); + runCompute( + parentTask, + computeContext, + request.plan(), + ActionListener.wrap(profiles -> onBatchCompleted(endBatchIndex, profiles), this::onFailure) + ); + } }, this::onFailure)); }