From b5cc2cee55faf06a150da99a1344fcd8582ca066 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Fri, 1 Sep 2023 16:33:57 +0200 Subject: [PATCH] Allow selecting source handles delivery mode Extend Exchange SPI so the engine can tell an exchange that it should deliver source handles as soon as it has any available, even if from a throughput perspective it would make more sense to wait a bit and deliver a bigger batch. It is important to be able to process stages which may short-circuit query execution (like top-level LIMIT) swiftly. --- ...ventDrivenFaultTolerantQueryScheduler.java | 11 ++++++--- .../java/io/trino/spi/exchange/Exchange.java | 24 ++++++++++++++++++- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index d979741b49c5..fa110578f76b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -173,6 +173,7 @@ import static io.trino.spi.ErrorType.USER_ERROR; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE; +import static io.trino.spi.exchange.Exchange.SourceHandlesDeliveryMode.EAGER; import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.consumesHashPartitionedInput; import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.getMaxPlanFragmentId; import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.getMaxPlanId; @@ -1143,13 +1144,13 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map queryStateMachine.updateQueryInfo(Optional.ofNullable(stageRegistry.getStageInfo()))); - ImmutableMap.Builder sourceExchanges = 
ImmutableMap.builder(); + ImmutableMap.Builder sourceExchangesBuilder = ImmutableMap.builder(); Map sourceOutputEstimatesByFragmentId = new HashMap<>(); for (SubPlan source : subPlan.getChildren()) { PlanFragmentId sourceFragmentId = source.getFragment().getId(); StageId sourceStageId = getStageId(sourceFragmentId); StageExecution sourceStageExecution = getStageExecution(sourceStageId); - sourceExchanges.put(sourceFragmentId, sourceStageExecution.getExchange()); + sourceExchangesBuilder.put(sourceFragmentId, sourceStageExecution.getExchange()); OutputDataSizeEstimate outputDataSizeResult = sourceOutputSizeEstimates.get(sourceStageId); verify(outputDataSizeResult != null, "No output data size estimate in %s map for stage %s", sourceOutputSizeEstimates, sourceStageId); sourceOutputEstimatesByFragmentId.put(sourceFragmentId, outputDataSizeResult); @@ -1168,11 +1169,12 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map sourceExchanges = sourceExchangesBuilder.buildOrThrow(); EventDrivenTaskSource taskSource = closer.register(taskSourceFactory.create( session, stage.getStageSpan(), fragment, - sourceExchanges.buildOrThrow(), + sourceExchanges, partitioningSchemeFactory.get(fragment.getPartitioning(), fragment.getPartitionCount()), stage::recordGetSplitTime, outputDataSizeEstimates.buildOrThrow())); @@ -1191,6 +1193,9 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map sourceExchange.setSourceHandlesDeliveryMode(EAGER)); + } StageExecution execution = new StageExecution( queryStateMachine, taskDescriptorStorage, diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java index ad266d75bc25..911aa8daf0c2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java @@ -20,10 +20,15 @@ import java.util.concurrent.CompletableFuture; @ThreadSafe 
-@Experimental(eta = "2023-09-01") +@Experimental(eta = "2024-03-01") public interface Exchange extends Closeable { + enum SourceHandlesDeliveryMode { + STANDARD, + EAGER + } + /** * Get id of this exchange */ @@ -94,6 +99,23 @@ public interface Exchange */ ExchangeSourceHandleSource getSourceHandles(); + /** + * Change {@link ExchangeSourceHandleSource} delivery mode. + *

+ * In {@link SourceHandlesDeliveryMode#STANDARD} mode the handles are delivered at + * a pace optimized for throughput. + *

+ * In {@link SourceHandlesDeliveryMode#EAGER} mode the handles are delivered as soon as possible, even if that means + * each handle corresponds to a smaller amount of data, which may not be optimal for throughput. + *

+ * There are no strict constraints regarding when this method can be called. When called, the newly selected delivery mode + * will apply to all {@link ExchangeSourceHandleSource} instances already obtained via the {@link #getSourceHandles()} method, + * as well as to those yet to be obtained. + *

+ * Support for this method is optional and best-effort. + */ + default void setSourceHandlesDeliveryMode(SourceHandlesDeliveryMode sourceHandlesDeliveryMode) {} + @Override void close(); }