diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index d979741b49c5..fa110578f76b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -173,6 +173,7 @@ import static io.trino.spi.ErrorType.USER_ERROR; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE; +import static io.trino.spi.exchange.Exchange.SourceHandlesDeliveryMode.EAGER; import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.consumesHashPartitionedInput; import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.getMaxPlanFragmentId; import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.getMaxPlanId; @@ -1143,13 +1144,13 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map queryStateMachine.updateQueryInfo(Optional.ofNullable(stageRegistry.getStageInfo()))); - ImmutableMap.Builder sourceExchanges = ImmutableMap.builder(); + ImmutableMap.Builder sourceExchangesBuilder = ImmutableMap.builder(); Map sourceOutputEstimatesByFragmentId = new HashMap<>(); for (SubPlan source : subPlan.getChildren()) { PlanFragmentId sourceFragmentId = source.getFragment().getId(); StageId sourceStageId = getStageId(sourceFragmentId); StageExecution sourceStageExecution = getStageExecution(sourceStageId); - sourceExchanges.put(sourceFragmentId, sourceStageExecution.getExchange()); + sourceExchangesBuilder.put(sourceFragmentId, sourceStageExecution.getExchange()); OutputDataSizeEstimate outputDataSizeResult = sourceOutputSizeEstimates.get(sourceStageId); verify(outputDataSizeResult != null, "No output data size estimate in %s map for stage %s", sourceOutputSizeEstimates, sourceStageId); sourceOutputEstimatesByFragmentId.put(sourceFragmentId, outputDataSizeResult); @@ -1168,11 +1169,12 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map sourceExchanges = sourceExchangesBuilder.buildOrThrow(); EventDrivenTaskSource taskSource = closer.register(taskSourceFactory.create( session, stage.getStageSpan(), fragment, - sourceExchanges.buildOrThrow(), + sourceExchanges, partitioningSchemeFactory.get(fragment.getPartitioning(), fragment.getPartitionCount()), stage::recordGetSplitTime, outputDataSizeEstimates.buildOrThrow())); @@ -1191,6 +1193,9 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map sourceExchange.setSourceHandlesDeliveryMode(EAGER)); + } StageExecution execution = new StageExecution( queryStateMachine, taskDescriptorStorage, diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java index ad266d75bc25..911aa8daf0c2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java @@ -20,10 +20,15 @@ import java.util.concurrent.CompletableFuture; @ThreadSafe -@Experimental(eta = "2023-09-01") +@Experimental(eta = "2024-03-01") public interface Exchange extends Closeable { + enum SourceHandlesDeliveryMode { + STANDARD, + EAGER + } + /** * Get id of this exchange */ @@ -94,6 +99,23 @@ public interface Exchange */ ExchangeSourceHandleSource getSourceHandles(); + /** + * Change {@link ExchangeSourceHandleSource} delivery mode. + *

+ * In {@link SourceHandlesDeliveryMode#STANDARD} mode the handles are delivered at + * pace optimized for throughput. + *

+ * In {@link SourceHandlesDeliveryMode#EAGER} the handles are delivered as soon as possible even if that would mean + * each handle corresponds to smaller amount of data, which may be not optimal from throughput. + *

+ * There are no strict constraints regarding when this method can be called. When called, the newly selected delivery mode + * will apply to all {@link ExchangeSourceHandleSource} instances already obtained via {@link #getSourceHandles()} method. + * As well as to those yet to be obtained. + *

+ * Support for this method is optional and best-effort. + */ + default void setSourceHandlesDeliveryMode(SourceHandlesDeliveryMode sourceHandlesDeliveryMode) {} + @Override void close(); }