Skip to content

Commit

Permalink
Remove Exchange#split method
Browse files Browse the repository at this point in the history
In favor of creating smaller ExchangeSourceHandles.

It is possible to create more than one ExchangeSourceHandle per
partition. Instead of splitting an existing ExchangeSourceHandle, it
is more natural not to create an overly large ExchangeSourceHandle in
the first place.
  • Loading branch information
arhimondr committed Sep 2, 2022
1 parent 0747a66 commit d68ee01
Show file tree
Hide file tree
Showing 18 changed files with 89 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ public synchronized void schedule()
taskSource = taskSourceFactory.create(
session,
stage.getFragment(),
sourceExchanges,
exchangeSources,
stage::recordGetSplitTime,
sourceBucketToPartitionMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
import io.trino.spi.Node;
import io.trino.spi.QueryId;
import io.trino.spi.SplitWeight;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceSplitter;
import io.trino.split.SplitSource;
import io.trino.split.SplitSource.SplitBatch;
import io.trino.sql.planner.PartitioningHandle;
Expand All @@ -66,7 +64,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -100,7 +97,6 @@
import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static io.trino.sql.planner.plan.ExchangeNode.Type.GATHER;
import static io.trino.sql.planner.plan.ExchangeNode.Type.REPLICATE;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -149,7 +145,6 @@ public StageTaskSourceFactory(
public TaskSource create(
Session session,
PlanFragment fragment,
Map<PlanFragmentId, Exchange> sourceExchanges,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
LongConsumer getSplitTimeRecorder,
Optional<int[]> bucketToPartitionMap,
Expand All @@ -164,7 +159,6 @@ public TaskSource create(
return ArbitraryDistributionTaskSource.create(
session,
fragment,
sourceExchanges,
exchangeSourceHandles,
getFaultTolerantExecutionTargetTaskInputSize(session));
}
Expand Down Expand Up @@ -272,7 +266,6 @@ public void close()
public static class ArbitraryDistributionTaskSource
implements TaskSource
{
private final IdentityHashMap<ExchangeSourceHandle, Exchange> sourceExchanges;
private final Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles;
private final Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;
private final long targetPartitionSizeInBytes;
Expand All @@ -283,15 +276,11 @@ public static class ArbitraryDistributionTaskSource
public static ArbitraryDistributionTaskSource create(
Session session,
PlanFragment fragment,
Map<PlanFragmentId, Exchange> sourceExchanges,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
DataSize targetPartitionSize)
{
checkArgument(fragment.getPartitionedSources().isEmpty(), "no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources());
IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandleMap = getExchangeForHandleMap(sourceExchanges, exchangeSourceHandles);

return new ArbitraryDistributionTaskSource(
exchangeForHandleMap,
getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles),
getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles),
targetPartitionSize,
Expand All @@ -300,26 +289,14 @@ public static ArbitraryDistributionTaskSource create(

@VisibleForTesting
ArbitraryDistributionTaskSource(
IdentityHashMap<ExchangeSourceHandle, Exchange> sourceExchanges,
Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles,
Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles,
DataSize targetPartitionSize,
DataSize taskMemory)
{
this.sourceExchanges = new IdentityHashMap<>(requireNonNull(sourceExchanges, "sourceExchanges is null"));
this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null"));
this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null"));
this.taskMemory = requireNonNull(taskMemory, "taskMemory is null");
checkArgument(
sourceExchanges.keySet().containsAll(partitionedExchangeSourceHandles.values()),
"Unexpected entries in partitionedExchangeSourceHandles map: %s; allowed keys: %s",
partitionedExchangeSourceHandles.values(),
sourceExchanges.keySet());
checkArgument(
sourceExchanges.keySet().containsAll(replicatedExchangeSourceHandles.values()),
"Unexpected entries in replicatedExchangeSourceHandles map: %s; allowed keys: %s",
replicatedExchangeSourceHandles.values(),
sourceExchanges.keySet());
this.targetPartitionSizeInBytes = requireNonNull(targetPartitionSize, "targetPartitionSize is null").toBytes();
}

Expand All @@ -340,33 +317,20 @@ public ListenableFuture<List<TaskDescriptor>> getMoreTasks()

for (Map.Entry<PlanNodeId, ExchangeSourceHandle> entry : partitionedExchangeSourceHandles.entries()) {
PlanNodeId remoteSourcePlanNodeId = entry.getKey();
ExchangeSourceHandle originalExchangeSourceHandle = entry.getValue();
Exchange sourceExchange = sourceExchanges.get(originalExchangeSourceHandle);

ExchangeSourceSplitter splitter = sourceExchange.split(originalExchangeSourceHandle, targetPartitionSizeInBytes);
ImmutableList.Builder<ExchangeSourceHandle> sourceHandles = ImmutableList.builder();
while (true) {
checkState(splitter.isBlocked().isDone(), "not supported");
Optional<ExchangeSourceHandle> next = splitter.getNext();
if (next.isEmpty()) {
break;
}
sourceHandles.add(next.get());
ExchangeSourceHandle handle = entry.getValue();
long handleDataSizeInBytes = handle.getDataSizeInBytes();

if (assignedExchangeDataSize != 0 && assignedExchangeDataSize + handleDataSizeInBytes > targetPartitionSizeInBytes) {
assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles);
result.add(new TaskDescriptor(currentPartitionId++, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements));
assignedExchangeSourceHandles = ImmutableListMultimap.builder();
assignedExchangeDataSize = 0;
assignedExchangeSourceHandleCount = 0;
}

for (ExchangeSourceHandle handle : sourceHandles.build()) {
if (assignedExchangeDataSize != 0 && assignedExchangeDataSize + handle.getDataSizeInBytes() > targetPartitionSizeInBytes) {
assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles);
result.add(new TaskDescriptor(currentPartitionId++, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements));
assignedExchangeSourceHandles = ImmutableListMultimap.builder();
assignedExchangeDataSize = 0;
assignedExchangeSourceHandleCount = 0;
}

assignedExchangeSourceHandles.put(remoteSourcePlanNodeId, handle);
assignedExchangeDataSize += handle.getDataSizeInBytes();
assignedExchangeSourceHandleCount++;
}
assignedExchangeSourceHandles.put(remoteSourcePlanNodeId, handle);
assignedExchangeDataSize += handleDataSizeInBytes;
assignedExchangeSourceHandleCount++;
}

if (assignedExchangeSourceHandleCount > 0) {
Expand Down Expand Up @@ -937,21 +901,6 @@ public synchronized void close()
}
}

/**
 * Maps every exchange source handle to the {@code Exchange} registered for the fragment the handle is listed under.
 * The result is an {@code IdentityHashMap}, so handles are looked up by reference rather than by {@code equals()}.
 *
 * @param sourceExchanges exchanges keyed by source fragment id
 * @param exchangeSourceHandles exchange source handles grouped by source fragment id
 * @return identity-keyed map from each handle to the exchange of its fragment
 * @throws NullPointerException if a fragment id present in {@code exchangeSourceHandles} has no exchange in {@code sourceExchanges}
 */
private static IdentityHashMap<ExchangeSourceHandle, Exchange> getExchangeForHandleMap(
Map<PlanFragmentId, Exchange> sourceExchanges,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles)
{
IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandle = new IdentityHashMap<>();
for (Map.Entry<PlanFragmentId, ExchangeSourceHandle> entry : exchangeSourceHandles.entries()) {
PlanFragmentId fragmentId = entry.getKey();
ExchangeSourceHandle handle = entry.getValue();
Exchange exchange = sourceExchanges.get(fragmentId);
// Fail fast: a handle whose fragment has no exchange cannot be mapped.
requireNonNull(exchange, "Exchange not found for fragment " + fragmentId);
exchangeForHandle.put(handle, exchange);
}
return exchangeForHandle;
}

private static ListMultimap<PlanNodeId, ExchangeSourceHandle> getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> handles)
{
return getInputsForRemoteSources(
Expand All @@ -978,9 +927,6 @@ private static ListMultimap<PlanNodeId, ExchangeSourceHandle> getInputsForRemote
for (RemoteSourceNode remoteSource : remoteSources) {
for (PlanFragmentId fragmentId : remoteSource.getSourceFragmentIds()) {
Collection<ExchangeSourceHandle> handles = requireNonNull(exchangeSourceHandles.get(fragmentId), () -> "exchange source handle is missing for fragment: " + fragmentId);
if (remoteSource.getExchangeType() == GATHER || remoteSource.getExchangeType() == REPLICATE) {
checkArgument(handles.size() <= 1, "at most 1 exchange source handle is expected, got: %s", handles);
}
result.putAll(remoteSource.getId(), handles);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@

import com.google.common.collect.Multimap;
import io.trino.Session;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanFragmentId;

import java.util.Map;
import java.util.Optional;
import java.util.function.LongConsumer;

Expand All @@ -29,7 +27,6 @@ public interface TaskSourceFactory
TaskSource create(
Session session,
PlanFragment fragment,
Map<PlanFragmentId, Exchange> sourceExchanges,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
LongConsumer getSplitTimeRecorder,
Optional<int[]> bucketToPartitionMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ public void testHappyPath()
NODE_3, ImmutableList.of(TEST_CATALOG_HANDLE)));
setupNodeAllocatorService(nodeSupplier);

TestingExchange sinkExchange = new TestingExchange(false);
TestingExchange sinkExchange = new TestingExchange();

TestingExchange sourceExchange1 = new TestingExchange(false);
TestingExchange sourceExchange2 = new TestingExchange(false);
TestingExchange sourceExchange1 = new TestingExchange();
TestingExchange sourceExchange2 = new TestingExchange();

try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
Expand Down Expand Up @@ -325,9 +325,9 @@ public void testTasksWaitingForNodes()
NODE_3, ImmutableList.of(TEST_CATALOG_HANDLE)));
setupNodeAllocatorService(nodeSupplier);

TestingExchange sinkExchange = new TestingExchange(false);
TestingExchange sourceExchange1 = new TestingExchange(false);
TestingExchange sourceExchange2 = new TestingExchange(false);
TestingExchange sinkExchange = new TestingExchange();
TestingExchange sourceExchange1 = new TestingExchange();
TestingExchange sourceExchange2 = new TestingExchange();
try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
remoteTaskFactory,
Expand Down Expand Up @@ -403,15 +403,15 @@ public void testTaskFailure()
NODE_2, ImmutableList.of(TEST_CATALOG_HANDLE)));
setupNodeAllocatorService(nodeSupplier);

TestingExchange sourceExchange1 = new TestingExchange(false);
TestingExchange sourceExchange2 = new TestingExchange(false);
TestingExchange sourceExchange1 = new TestingExchange();
TestingExchange sourceExchange2 = new TestingExchange();

try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
remoteTaskFactory,
taskSourceFactory,
nodeAllocator,
new TestingExchange(false),
new TestingExchange(),
ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
0,
1);
Expand Down Expand Up @@ -455,8 +455,8 @@ public void testRetryDelay()
NODE_3, ImmutableList.of(TEST_CATALOG_HANDLE)));
setupNodeAllocatorService(nodeSupplier);

TestingExchange sourceExchange1 = new TestingExchange(false);
TestingExchange sourceExchange2 = new TestingExchange(false);
TestingExchange sourceExchange1 = new TestingExchange();
TestingExchange sourceExchange2 = new TestingExchange();

Session session = testSessionBuilder()
.setQueryId(QUERY_ID)
Expand All @@ -471,7 +471,7 @@ public void testRetryDelay()
remoteTaskFactory,
taskSourceFactory,
nodeAllocator,
new TestingExchange(false),
new TestingExchange(),
ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
6,
1);
Expand Down Expand Up @@ -738,15 +738,15 @@ private void testCancellation(boolean abort)
NODE_2, ImmutableList.of(TEST_CATALOG_HANDLE)));
setupNodeAllocatorService(nodeSupplier);

TestingExchange sourceExchange1 = new TestingExchange(false);
TestingExchange sourceExchange2 = new TestingExchange(false);
TestingExchange sourceExchange1 = new TestingExchange();
TestingExchange sourceExchange2 = new TestingExchange();

try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
remoteTaskFactory,
taskSourceFactory,
nodeAllocator,
new TestingExchange(false),
new TestingExchange(),
ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
0,
1);
Expand Down Expand Up @@ -794,15 +794,15 @@ public void testAsyncTaskSource()
NODE_2, ImmutableList.of(TEST_CATALOG_HANDLE)));
setupNodeAllocatorService(nodeSupplier);

TestingExchange sourceExchange1 = new TestingExchange(false);
TestingExchange sourceExchange2 = new TestingExchange(false);
TestingExchange sourceExchange1 = new TestingExchange();
TestingExchange sourceExchange2 = new TestingExchange();

try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
remoteTaskFactory,
taskSourceFactory,
nodeAllocator,
new TestingExchange(false),
new TestingExchange(),
ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
2,
1);
Expand Down Expand Up @@ -858,12 +858,12 @@ public void close() {}
try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
remoteTaskFactory,
(session, fragment, sourceExchanges, exchangeSourceHandles, getSplitTimeRecorder, bucketToPartitionMap, bucketNodeMap) -> {
(session, fragment, exchangeSourceHandles, getSplitTimeRecorder, bucketToPartitionMap, bucketNodeMap) -> {
taskSourceCreated.set(true);
return taskSource;
},
nodeAllocator,
new TestingExchange(false),
new TestingExchange(),
ImmutableMap.of(),
1,
1);
Expand Down
Loading

0 comments on commit d68ee01

Please sign in to comment.