Skip to content

Commit

Permalink
Allow selecting source handles deliver mode
Browse files Browse the repository at this point in the history
Extend Exchange SPI so engine can tell exchange that it should deliver
source handles as soon as it has any available, even if from troughput
perspective it would make more sense to wait a bit and deliver bigger
batch. It is important to be able to process stages which may
short-circuit query execution (like top-level LIMIT) swiftly.
  • Loading branch information
losipiuk committed Sep 18, 2023
1 parent e2f5afa commit b5cc2ce
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
import static io.trino.spi.ErrorType.USER_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE;
import static io.trino.spi.exchange.Exchange.SourceHandlesDeliveryMode.EAGER;
import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.consumesHashPartitionedInput;
import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.getMaxPlanFragmentId;
import static io.trino.sql.planner.RuntimeAdaptivePartitioningRewriter.getMaxPlanId;
Expand Down Expand Up @@ -1143,13 +1144,13 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
stageRegistry.add(stage);
stage.addFinalStageInfoListener(status -> queryStateMachine.updateQueryInfo(Optional.ofNullable(stageRegistry.getStageInfo())));

ImmutableMap.Builder<PlanFragmentId, Exchange> sourceExchanges = ImmutableMap.builder();
ImmutableMap.Builder<PlanFragmentId, Exchange> sourceExchangesBuilder = ImmutableMap.builder();
Map<PlanFragmentId, OutputDataSizeEstimate> sourceOutputEstimatesByFragmentId = new HashMap<>();
for (SubPlan source : subPlan.getChildren()) {
PlanFragmentId sourceFragmentId = source.getFragment().getId();
StageId sourceStageId = getStageId(sourceFragmentId);
StageExecution sourceStageExecution = getStageExecution(sourceStageId);
sourceExchanges.put(sourceFragmentId, sourceStageExecution.getExchange());
sourceExchangesBuilder.put(sourceFragmentId, sourceStageExecution.getExchange());
OutputDataSizeEstimate outputDataSizeResult = sourceOutputSizeEstimates.get(sourceStageId);
verify(outputDataSizeResult != null, "No output data size estimate in %s map for stage %s", sourceOutputSizeEstimates, sourceStageId);
sourceOutputEstimatesByFragmentId.put(sourceFragmentId, outputDataSizeResult);
Expand All @@ -1168,11 +1169,12 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
outputDataSizeEstimates.put(remoteSource.getId(), OutputDataSizeEstimate.merge(estimates));
}

Map<PlanFragmentId, Exchange> sourceExchanges = sourceExchangesBuilder.buildOrThrow();
EventDrivenTaskSource taskSource = closer.register(taskSourceFactory.create(
session,
stage.getStageSpan(),
fragment,
sourceExchanges.buildOrThrow(),
sourceExchanges,
partitioningSchemeFactory.get(fragment.getPartitioning(), fragment.getPartitionCount()),
stage::recordGetSplitTime,
outputDataSizeEstimates.buildOrThrow()));
Expand All @@ -1191,6 +1193,9 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
boolean coordinatorStage = stage.getFragment().getPartitioning().equals(COORDINATOR_DISTRIBUTION);

boolean noMemoryFragment = isNoMemoryFragment(fragment);
if (eager) {
sourceExchanges.values().forEach(sourceExchange -> sourceExchange.setSourceHandlesDeliveryMode(EAGER));
}
StageExecution execution = new StageExecution(
queryStateMachine,
taskDescriptorStorage,
Expand Down
24 changes: 23 additions & 1 deletion core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@
import java.util.concurrent.CompletableFuture;

@ThreadSafe
@Experimental(eta = "2023-09-01")
@Experimental(eta = "2024-03-01")
public interface Exchange
extends Closeable
{
enum SourceHandlesDeliveryMode {
STANDARD,
EAGER
}

/**
* Get id of this exchange
*/
Expand Down Expand Up @@ -94,6 +99,23 @@ public interface Exchange
*/
ExchangeSourceHandleSource getSourceHandles();

/**
* Change {@link ExchangeSourceHandleSource} delivery mode.
* <p>
* In {@link SourceHandlesDeliveryMode#STANDARD} mode the handles are delivered at
* pace optimized for throughput.
* <p>
* In {@link SourceHandlesDeliveryMode#EAGER} the handles are delivered as soon as possible even if that would mean
* each handle corresponds to smaller amount of data, which may be not optimal from throughput.
* <p>
* There are no strict constraints regarding when this method can be called. When called, the newly selected delivery mode
* will apply to all {@link ExchangeSourceHandleSource} instances already obtained via {@link #getSourceHandles()} method.
* As well as to those yet to be obtained.
* <p>
* Support for this method is optional and best-effort.
*/
default void setSourceHandlesDeliveryMode(SourceHandlesDeliveryMode sourceHandlesDeliveryMode) {}

@Override
void close();
}

0 comments on commit b5cc2ce

Please sign in to comment.