diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java index 392bd8a707be..1b87b22fb2f3 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java @@ -275,7 +275,6 @@ public synchronized void schedule() taskSource = taskSourceFactory.create( session, stage.getFragment(), - sourceExchanges, exchangeSources, stage::recordGetSplitTime, sourceBucketToPartitionMap, diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java index d9025176934e..74ffe52ac15e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java @@ -45,9 +45,7 @@ import io.trino.spi.Node; import io.trino.spi.QueryId; import io.trino.spi.SplitWeight; -import io.trino.spi.exchange.Exchange; import io.trino.spi.exchange.ExchangeSourceHandle; -import io.trino.spi.exchange.ExchangeSourceSplitter; import io.trino.split.SplitSource; import io.trino.split.SplitSource.SplitBatch; import io.trino.sql.planner.PartitioningHandle; @@ -66,7 +64,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -100,7 +97,6 @@ import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static io.trino.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; -import static io.trino.sql.planner.plan.ExchangeNode.Type.GATHER; import static io.trino.sql.planner.plan.ExchangeNode.Type.REPLICATE; import static java.util.Objects.requireNonNull; @@ -149,7 +145,6 @@ public StageTaskSourceFactory( public TaskSource create( Session session, PlanFragment fragment, - Map sourceExchanges, Multimap exchangeSourceHandles, LongConsumer getSplitTimeRecorder, Optional bucketToPartitionMap, @@ -164,7 +159,6 @@ public TaskSource create( return ArbitraryDistributionTaskSource.create( session, fragment, - sourceExchanges, exchangeSourceHandles, getFaultTolerantExecutionTargetTaskInputSize(session)); } @@ -272,7 +266,6 @@ public void close() public static class ArbitraryDistributionTaskSource implements TaskSource { - private final IdentityHashMap sourceExchanges; private final Multimap partitionedExchangeSourceHandles; private final Multimap replicatedExchangeSourceHandles; private final long targetPartitionSizeInBytes; @@ -283,15 +276,11 @@ public static class ArbitraryDistributionTaskSource public static ArbitraryDistributionTaskSource create( Session session, PlanFragment fragment, - Map sourceExchanges, Multimap exchangeSourceHandles, DataSize targetPartitionSize) { checkArgument(fragment.getPartitionedSources().isEmpty(), "no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources()); - IdentityHashMap exchangeForHandleMap = getExchangeForHandleMap(sourceExchanges, exchangeSourceHandles); - return new ArbitraryDistributionTaskSource( - exchangeForHandleMap, getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles), getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles), targetPartitionSize, @@ -300,26 +289,14 @@ public static ArbitraryDistributionTaskSource create( @VisibleForTesting ArbitraryDistributionTaskSource( - IdentityHashMap sourceExchanges, Multimap partitionedExchangeSourceHandles, Multimap replicatedExchangeSourceHandles, DataSize targetPartitionSize, DataSize taskMemory) { - this.sourceExchanges = new IdentityHashMap<>(requireNonNull(sourceExchanges, "sourceExchanges is null")); this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null")); this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null")); this.taskMemory = requireNonNull(taskMemory, "taskMemory is null"); - checkArgument( - sourceExchanges.keySet().containsAll(partitionedExchangeSourceHandles.values()), - "Unexpected entries in partitionedExchangeSourceHandles map: %s; allowed keys: %s", - partitionedExchangeSourceHandles.values(), - sourceExchanges.keySet()); - checkArgument( - sourceExchanges.keySet().containsAll(replicatedExchangeSourceHandles.values()), - "Unexpected entries in replicatedExchangeSourceHandles map: %s; allowed keys: %s", - replicatedExchangeSourceHandles.values(), - sourceExchanges.keySet()); this.targetPartitionSizeInBytes = requireNonNull(targetPartitionSize, "targetPartitionSize is null").toBytes(); } @@ -340,33 +317,20 @@ public ListenableFuture> getMoreTasks() for (Map.Entry entry : partitionedExchangeSourceHandles.entries()) { PlanNodeId remoteSourcePlanNodeId = entry.getKey(); - ExchangeSourceHandle originalExchangeSourceHandle = entry.getValue(); - Exchange sourceExchange = sourceExchanges.get(originalExchangeSourceHandle); - - ExchangeSourceSplitter splitter = sourceExchange.split(originalExchangeSourceHandle, targetPartitionSizeInBytes); - ImmutableList.Builder sourceHandles = ImmutableList.builder(); - while (true) { - checkState(splitter.isBlocked().isDone(), "not supported"); - Optional next = splitter.getNext(); - if (next.isEmpty()) { - break; - } - sourceHandles.add(next.get()); + ExchangeSourceHandle handle = entry.getValue(); + long handleDataSizeInBytes = handle.getDataSizeInBytes(); + + if (assignedExchangeDataSize != 0 && assignedExchangeDataSize + handleDataSizeInBytes > targetPartitionSizeInBytes) { + assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles); + result.add(new TaskDescriptor(currentPartitionId++, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements)); + assignedExchangeSourceHandles = ImmutableListMultimap.builder(); + assignedExchangeDataSize = 0; + assignedExchangeSourceHandleCount = 0; } - for (ExchangeSourceHandle handle : sourceHandles.build()) { - if (assignedExchangeDataSize != 0 && assignedExchangeDataSize + handle.getDataSizeInBytes() > targetPartitionSizeInBytes) { - assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles); - result.add(new TaskDescriptor(currentPartitionId++, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements)); - assignedExchangeSourceHandles = ImmutableListMultimap.builder(); - assignedExchangeDataSize = 0; - assignedExchangeSourceHandleCount = 0; - } - - assignedExchangeSourceHandles.put(remoteSourcePlanNodeId, handle); - assignedExchangeDataSize += handle.getDataSizeInBytes(); - assignedExchangeSourceHandleCount++; - } + assignedExchangeSourceHandles.put(remoteSourcePlanNodeId, handle); + assignedExchangeDataSize += handleDataSizeInBytes; + assignedExchangeSourceHandleCount++; } if (assignedExchangeSourceHandleCount > 0) { @@ -937,21 +901,6 @@ public synchronized void close() } } - private static IdentityHashMap getExchangeForHandleMap( - Map sourceExchanges, - Multimap exchangeSourceHandles) - { - IdentityHashMap exchangeForHandle = new IdentityHashMap<>(); - for (Map.Entry entry : exchangeSourceHandles.entries()) { - PlanFragmentId fragmentId = entry.getKey(); - ExchangeSourceHandle handle = entry.getValue(); - Exchange exchange = sourceExchanges.get(fragmentId); - requireNonNull(exchange, "Exchange not found for fragment " + fragmentId); - exchangeForHandle.put(handle, exchange); - } - return exchangeForHandle; - } - private static ListMultimap getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap handles) { return getInputsForRemoteSources( @@ -978,9 +927,6 @@ private static ListMultimap getInputsForRemote for (RemoteSourceNode remoteSource : remoteSources) { for (PlanFragmentId fragmentId : remoteSource.getSourceFragmentIds()) { Collection handles = requireNonNull(exchangeSourceHandles.get(fragmentId), () -> "exchange source handle is missing for fragment: " + fragmentId); - if (remoteSource.getExchangeType() == GATHER || remoteSource.getExchangeType() == REPLICATE) { - checkArgument(handles.size() <= 1, "at most 1 exchange source handle is expected, got: %s", handles); - } result.putAll(remoteSource.getId(), handles); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java index 70f610a8aeeb..06b8cc8449be 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskSourceFactory.java @@ -15,12 +15,10 @@ import com.google.common.collect.Multimap; import io.trino.Session; -import io.trino.spi.exchange.Exchange; import io.trino.spi.exchange.ExchangeSourceHandle; import io.trino.sql.planner.PlanFragment; import io.trino.sql.planner.plan.PlanFragmentId; -import java.util.Map; import java.util.Optional; import java.util.function.LongConsumer; @@ -29,7 +27,6 @@ public interface TaskSourceFactory TaskSource create( Session session, PlanFragment fragment, - Map sourceExchanges, Multimap exchangeSourceHandles, LongConsumer getSplitTimeRecorder, Optional bucketToPartitionMap, diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java index 761c25bf11f3..45df0a7bdd17 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java @@ -174,10 +174,10 @@ public void testHappyPath() NODE_3, ImmutableList.of(TEST_CATALOG_HANDLE))); setupNodeAllocatorService(nodeSupplier); - TestingExchange sinkExchange = new TestingExchange(false); + TestingExchange sinkExchange = new TestingExchange(); - TestingExchange sourceExchange1 = new TestingExchange(false); - TestingExchange sourceExchange2 = new TestingExchange(false); + TestingExchange sourceExchange1 = new TestingExchange(); + TestingExchange sourceExchange2 = new TestingExchange(); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) { FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler( @@ -325,9 +325,9 @@ public void testTasksWaitingForNodes() NODE_3, ImmutableList.of(TEST_CATALOG_HANDLE))); setupNodeAllocatorService(nodeSupplier); - TestingExchange sinkExchange = new TestingExchange(false); - TestingExchange sourceExchange1 = new TestingExchange(false); - TestingExchange sourceExchange2 = new TestingExchange(false); + TestingExchange sinkExchange = new TestingExchange(); + TestingExchange sourceExchange1 = new TestingExchange(); + TestingExchange sourceExchange2 = new TestingExchange(); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) { FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler( remoteTaskFactory, @@ -403,15 +403,15 @@ public void testTaskFailure() NODE_2, ImmutableList.of(TEST_CATALOG_HANDLE))); setupNodeAllocatorService(nodeSupplier); - TestingExchange sourceExchange1 = new TestingExchange(false); - TestingExchange sourceExchange2 = new TestingExchange(false); + TestingExchange sourceExchange1 = new TestingExchange(); + TestingExchange sourceExchange2 = new TestingExchange(); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) { FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler( remoteTaskFactory, taskSourceFactory, nodeAllocator, - new TestingExchange(false), + new TestingExchange(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2), 0, 1); @@ -455,8 +455,8 @@ public void testRetryDelay() NODE_3, ImmutableList.of(TEST_CATALOG_HANDLE))); setupNodeAllocatorService(nodeSupplier); - TestingExchange sourceExchange1 = new TestingExchange(false); - TestingExchange sourceExchange2 = new TestingExchange(false); + TestingExchange sourceExchange1 = new TestingExchange(); + TestingExchange sourceExchange2 = new TestingExchange(); Session session = testSessionBuilder() .setQueryId(QUERY_ID) @@ -471,7 +471,7 @@ public void testRetryDelay() remoteTaskFactory, taskSourceFactory, nodeAllocator, - new TestingExchange(false), + new TestingExchange(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2), 6, 1); @@ -738,15 +738,15 @@ private void testCancellation(boolean abort) NODE_2, ImmutableList.of(TEST_CATALOG_HANDLE))); setupNodeAllocatorService(nodeSupplier); - TestingExchange sourceExchange1 = new TestingExchange(false); - TestingExchange sourceExchange2 = new TestingExchange(false); + TestingExchange sourceExchange1 = new TestingExchange(); + TestingExchange sourceExchange2 = new TestingExchange(); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) { FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler( remoteTaskFactory, taskSourceFactory, nodeAllocator, - new TestingExchange(false), + new TestingExchange(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2), 0, 1); @@ -794,15 +794,15 @@ public void testAsyncTaskSource() NODE_2, ImmutableList.of(TEST_CATALOG_HANDLE))); setupNodeAllocatorService(nodeSupplier); - TestingExchange sourceExchange1 = new TestingExchange(false); - TestingExchange sourceExchange2 = new TestingExchange(false); + TestingExchange sourceExchange1 = new TestingExchange(); + TestingExchange sourceExchange2 = new TestingExchange(); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) { FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler( remoteTaskFactory, taskSourceFactory, nodeAllocator, - new TestingExchange(false), + new TestingExchange(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2), 2, 1); @@ -858,12 +858,12 @@ public void close() {} try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) { FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler( remoteTaskFactory, - (session, fragment, sourceExchanges, exchangeSourceHandles, getSplitTimeRecorder, bucketToPartitionMap, bucketNodeMap) -> { + (session, fragment, exchangeSourceHandles, getSplitTimeRecorder, bucketToPartitionMap, bucketNodeMap) -> { taskSourceCreated.set(true); return taskSource; }, nodeAllocator, - new TestingExchange(false), + new TestingExchange(), ImmutableMap.of(), 1, 1); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java index 18e866d249bc..88308c637c6e 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java @@ -40,9 +40,6 @@ import io.trino.spi.QueryId; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; -import io.trino.spi.exchange.Exchange; -import io.trino.spi.exchange.ExchangeContext; -import io.trino.spi.exchange.ExchangeManager; import io.trino.spi.exchange.ExchangeSourceHandle; import io.trino.split.SplitSource; import io.trino.sql.planner.plan.PlanNodeId; @@ -52,7 +49,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -71,7 +67,6 @@ import static io.airlift.slice.SizeOf.sizeOf; import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.Unit.GIGABYTE; -import static io.trino.spi.exchange.ExchangeId.createRandomExchangeId; import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; @@ -145,10 +140,7 @@ public void testCoordinatorDistributionTaskSource() @Test public void testArbitraryDistributionTaskSource() { - ExchangeManager splittingExchangeManager = new TestingExchangeManager(true); - ExchangeManager nonSplittingExchangeManager = new TestingExchangeManager(false); - - TaskSource taskSource = new ArbitraryDistributionTaskSource(new IdentityHashMap<>(), + TaskSource taskSource = new ArbitraryDistributionTaskSource( ImmutableListMultimap.of(), ImmutableListMultimap.of(), DataSize.of(3, BYTE), @@ -165,9 +157,7 @@ public void testArbitraryDistributionTaskSource() TestingExchangeSourceHandle sourceHandle123 = new TestingExchangeSourceHandle(0, 123); TestingExchangeSourceHandle sourceHandle321 = new TestingExchangeSourceHandle(0, 321); Multimap nonReplicatedSources = ImmutableListMultimap.of(PLAN_NODE_1, sourceHandle3); - Exchange exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3, false); taskSource = new ArbitraryDistributionTaskSource( - new IdentityHashMap<>(ImmutableMap.of(sourceHandle3, exchange)), nonReplicatedSources, ImmutableListMultimap.of(), DataSize.of(3, BYTE), @@ -182,9 +172,7 @@ public void testArbitraryDistributionTaskSource() new NodeRequirements(Optional.empty(), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); nonReplicatedSources = ImmutableListMultimap.of(PLAN_NODE_1, sourceHandle123); - exchange = nonSplittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3, false); taskSource = new ArbitraryDistributionTaskSource( - new IdentityHashMap<>(ImmutableMap.of(sourceHandle123, exchange)), nonReplicatedSources, ImmutableListMultimap.of(), DataSize.of(3, BYTE), @@ -199,11 +187,7 @@ public void testArbitraryDistributionTaskSource() nonReplicatedSources = ImmutableListMultimap.of( PLAN_NODE_1, sourceHandle123, PLAN_NODE_2, sourceHandle321); - exchange = nonSplittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3, false); taskSource = new ArbitraryDistributionTaskSource( - new IdentityHashMap<>(ImmutableMap.of( - sourceHandle123, exchange, - sourceHandle321, exchange)), nonReplicatedSources, ImmutableListMultimap.of(), DataSize.of(3, BYTE), @@ -225,12 +209,7 @@ public void testArbitraryDistributionTaskSource() PLAN_NODE_1, sourceHandle1, PLAN_NODE_1, sourceHandle2, PLAN_NODE_2, sourceHandle4); - exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3, false); taskSource = new ArbitraryDistributionTaskSource( - new IdentityHashMap<>(ImmutableMap.of( - sourceHandle1, exchange, - sourceHandle2, exchange, - sourceHandle4, exchange)), nonReplicatedSources, ImmutableListMultimap.of(), DataSize.of(3, BYTE), @@ -247,24 +226,13 @@ PLAN_NODE_1, new TestingExchangeSourceHandle(0, 2)), new TaskDescriptor( 1, ImmutableListMultimap.of(), - ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 3)), - new NodeRequirements(Optional.empty(), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), - new TaskDescriptor( - 2, - ImmutableListMultimap.of(), - ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1)), + ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 4)), new NodeRequirements(Optional.empty(), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); - nonReplicatedSources = ImmutableListMultimap.of( PLAN_NODE_1, sourceHandle1, PLAN_NODE_1, sourceHandle3, PLAN_NODE_2, sourceHandle4); - exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3, false); taskSource = new ArbitraryDistributionTaskSource( - new IdentityHashMap<>(ImmutableMap.of( - sourceHandle1, exchange, - sourceHandle3, exchange, - sourceHandle4, exchange)), nonReplicatedSources, ImmutableListMultimap.of(), DataSize.of(3, BYTE), @@ -284,12 +252,7 @@ PLAN_NODE_1, new TestingExchangeSourceHandle(0, 2)), new TaskDescriptor( 2, ImmutableListMultimap.of(), - ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 3)), - new NodeRequirements(Optional.empty(), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), - new TaskDescriptor( - 3, - ImmutableListMultimap.of(), - ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1)), + ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 4)), new NodeRequirements(Optional.empty(), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); // with replicated sources @@ -299,13 +262,7 @@ PLAN_NODE_1, new TestingExchangeSourceHandle(0, 2)), PLAN_NODE_1, sourceHandle4); Multimap replicatedSources = ImmutableListMultimap.of( PLAN_NODE_2, sourceHandle321); - exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3, false); taskSource = new ArbitraryDistributionTaskSource( - new IdentityHashMap<>(ImmutableMap.of( - sourceHandle1, exchange, - sourceHandle2, exchange, - sourceHandle4, exchange, - sourceHandle321, exchange)), nonReplicatedSources, replicatedSources, DataSize.of(3, BYTE), @@ -324,14 +281,7 @@ PLAN_NODE_2, new TestingExchangeSourceHandle(0, 321)), 1, ImmutableListMultimap.of(), ImmutableListMultimap.of( - PLAN_NODE_1, new TestingExchangeSourceHandle(0, 3), - PLAN_NODE_2, sourceHandle321), - new NodeRequirements(Optional.empty(), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), - new TaskDescriptor( - 2, - ImmutableListMultimap.of(), - ImmutableListMultimap.of( - PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 4), PLAN_NODE_2, sourceHandle321), new NodeRequirements(Optional.empty(), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java index 7602a77b6778..32ca3e4242ae 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java @@ -19,41 +19,26 @@ import io.trino.spi.exchange.ExchangeSinkHandle; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSourceHandle; -import io.trino.spi.exchange.ExchangeSourceSplitter; import org.openjdk.jol.info.ClassLayout; -import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.Iterators.cycle; -import static com.google.common.collect.Iterators.limit; import static com.google.common.collect.Sets.newConcurrentHashSet; -import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.CompletableFuture.completedFuture; public class TestingExchange implements Exchange { - private final boolean splitPartitionsEnabled; - private final Set finishedSinks = newConcurrentHashSet(); private final Set allSinks = newConcurrentHashSet(); private final AtomicBoolean noMoreSinks = new AtomicBoolean(); private final CompletableFuture> sourceHandles = new CompletableFuture<>(); - public TestingExchange(boolean splitPartitionsEnabled) - { - this.splitPartitionsEnabled = splitPartitionsEnabled; - } - @Override public ExchangeSinkHandle addSink(int taskPartitionId) { @@ -101,55 +86,6 @@ public void setSourceHandles(List handles) sourceHandles.complete(ImmutableList.copyOf(handles)); } - @Override - public ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes) - { - List splitHandles = splitIntoList(handle, targetSizeInBytes); - Iterator iterator = splitHandles.iterator(); - return new ExchangeSourceSplitter() - { - @Override - public CompletableFuture isBlocked() - { - return completedFuture(null); - } - - @Override - public Optional getNext() - { - if (iterator.hasNext()) { - return Optional.of(iterator.next()); - } - return Optional.empty(); - } - - @Override - public void close() - { - } - }; - } - - private List splitIntoList(ExchangeSourceHandle handle, long targetSizeInBytes) - { - if (!splitPartitionsEnabled) { - return ImmutableList.of(handle); - } - checkArgument(targetSizeInBytes > 0, "targetSizeInBytes must be positive: %s", targetSizeInBytes); - TestingExchangeSourceHandle testingExchangeSourceHandle = (TestingExchangeSourceHandle) handle; - long currentSize = testingExchangeSourceHandle.getSizeInBytes(); - int fullPartitions = toIntExact(currentSize / targetSizeInBytes); - long remainder = currentSize % targetSizeInBytes; - ImmutableList.Builder result = ImmutableList.builder(); - if (fullPartitions > 0) { - result.addAll(limit(cycle(new TestingExchangeSourceHandle(testingExchangeSourceHandle.getPartitionId(), targetSizeInBytes)), fullPartitions)); - } - if (remainder > 0) { - result.add(new TestingExchangeSourceHandle(testingExchangeSourceHandle.getPartitionId(), remainder)); - } - return result.build(); - } - @Override public void close() { diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchangeManager.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchangeManager.java deleted file mode 100644 index 5c025b70d420..000000000000 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchangeManager.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.execution.scheduler; - -import io.trino.spi.exchange.Exchange; -import io.trino.spi.exchange.ExchangeContext; -import io.trino.spi.exchange.ExchangeManager; -import io.trino.spi.exchange.ExchangeSink; -import io.trino.spi.exchange.ExchangeSinkInstanceHandle; -import io.trino.spi.exchange.ExchangeSource; -import io.trino.spi.exchange.ExchangeSourceHandle; - -import java.util.List; - -public class TestingExchangeManager - implements ExchangeManager -{ - private final boolean splitPartitionsEnabled; - - public TestingExchangeManager(boolean splitPartitionsEnabled) - { - this.splitPartitionsEnabled = splitPartitionsEnabled; - } - - @Override - public Exchange createExchange(ExchangeContext context, int outputPartitionCount, boolean preserveOrderWithinPartition) - { - return new TestingExchange(splitPartitionsEnabled); - } - - @Override - public ExchangeSink createSink(ExchangeSinkInstanceHandle handle) - { - throw new UnsupportedOperationException(); - } - - @Override - public ExchangeSource createSource(List handles) - { - throw new UnsupportedOperationException(); - } -} diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java index ff7cd4ef35b2..4d471a07c5cf 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java @@ -24,7 +24,6 @@ import io.trino.Session; import io.trino.connector.CatalogHandle; import io.trino.metadata.Split; -import io.trino.spi.exchange.Exchange; import io.trino.spi.exchange.ExchangeSourceHandle; import io.trino.sql.planner.PlanFragment; import io.trino.sql.planner.plan.PlanFragmentId; @@ -34,7 +33,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongConsumer; @@ -71,7 +69,6 @@ public TestingTaskSourceFactory(Optional catalog, ListenableFutur public TaskSource create( Session session, PlanFragment fragment, - Map sourceExchanges, Multimap exchangeSourceHandles, LongConsumer getSplitTimeRecorder, Optional bucketToPartitionMap, diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java index 79245d8c4629..6c0fcd647e74 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/Exchange.java @@ -13,7 +13,6 @@ */ package io.trino.spi.exchange; -import io.airlift.slice.Slice; import io.trino.spi.Experimental; import javax.annotation.concurrent.ThreadSafe; @@ -72,30 +71,12 @@ public interface Exchange * Returns a future containing handles to be used to read data from an exchange. *

* Future must be resolved when the data is available to be read. - *

- * The implementation is expected to return one handle per output partition (see {@link ExchangeSink#add(int, Slice)}) - *

- * Partitions can be further split if needed by calling {@link #split(ExchangeSourceHandle, long)} * * @return Future containing a list of {@link ExchangeSourceHandle} to be sent to a * worker that is needed to create an {@link ExchangeSource} using {@link ExchangeManager#createSource(List)} */ CompletableFuture> getSourceHandles(); - /** - * Splits an {@link ExchangeSourceHandle} into a number of smaller partitions. - *

- * Exchange implementation is allowed to return {@link ExchangeSourceHandle} even before all the data - * is written to an exchange. At the moment when the method is called it may not be possible to - * complete the split operation. This methods returns a {@link ExchangeSourceSplitter} object - * that allows an iterative splitting while the data is still being written to an exchange. - * - * @param handle returned by the {@link #getSourceHandles()} - * @param targetSizeInBytes desired maximum size of a single partition produced by {@link ExchangeSourceSplitter} - * @return {@link ExchangeSourceSplitter} to be used for iterative splitting of a given partition - */ - ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes); - @Override void close(); } diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java index f8171324d164..7f4a5e2172ca 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java @@ -73,9 +73,8 @@ public interface ExchangeManager * Called by a worker to create an {@link ExchangeSource} to read data corresponding to * a given list of exchange source handles. *

- * Usually a single {@link ExchangeSourceHandle} corresponds to a single output partition - * (see {@link ExchangeSink#add(int, Slice)}) unless a partition got split by calling - * {@link Exchange#split(ExchangeSourceHandle, long)}. + * A single {@link ExchangeSourceHandle} corresponds to a single output partition + * (see {@link ExchangeSink#add(int, Slice)}). *

* Based on the partition statistic (such as partition size) coordinator may also decide * to process several partitions by the same task. In such scenarios the handles diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceSplitter.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceSplitter.java deleted file mode 100644 index 19d543b39ec6..000000000000 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceSplitter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.spi.exchange; - -import io.trino.spi.Experimental; - -import java.io.Closeable; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -@Experimental(eta = "2023-01-01") -public interface ExchangeSourceSplitter - extends Closeable -{ - /** - * Returns a future that will be completed when the splitter becomes unblocked. - */ - CompletableFuture isBlocked(); - - /** - * Returns next sub partition, or {@link Optional#empty()} if the splitting process is finished. - */ - Optional getNext(); - - @Override - void close(); -} diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java index 7990b6c364f6..982fc6357f05 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java @@ -25,7 +25,6 @@ import io.trino.spi.exchange.ExchangeSinkHandle; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSourceHandle; -import io.trino.spi.exchange.ExchangeSourceSplitter; import javax.annotation.concurrent.GuardedBy; import javax.crypto.SecretKey; @@ -36,9 +35,9 @@ import java.net.URI; import java.security.Key; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -60,7 +59,6 @@ import static io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.DATA_FILE_SUFFIX; import static java.lang.String.format; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.CompletableFuture.completedFuture; public class FileSystemExchange implements Exchange @@ -77,6 +75,7 @@ public class FileSystemExchange private final boolean preserveOrderWithinPartition; private final int fileListingParallelism; private final Optional secretKey; + private final long exchangeSourceHandleTargetDataSizeInBytes; private final ExecutorService executor; private final Map randomizedPrefixes = new ConcurrentHashMap<>(); @@ -101,6 +100,7 @@ public FileSystemExchange( boolean preserveOrderWithinPartition, int fileListingParallelism, Optional secretKey, + long exchangeSourceHandleTargetDataSizeInBytes, ExecutorService executor) { List directories = new ArrayList<>(requireNonNull(baseDirectories, "baseDirectories is null")); @@ -115,6 +115,7 @@ public FileSystemExchange( this.fileListingParallelism = fileListingParallelism; this.secretKey = requireNonNull(secretKey, "secretKey is null"); + this.exchangeSourceHandleTargetDataSizeInBytes = exchangeSourceHandleTargetDataSizeInBytes; this.executor = requireNonNull(executor, "executor is null"); } @@ -211,7 +212,21 @@ private ListenableFuture> createExchangeSourceHandles partitionsList.forEach(partitions -> partitions.forEach(partitionFiles::put)); ImmutableList.Builder result = ImmutableList.builder(); for (Integer partitionId : partitionFiles.keySet()) { - result.add(new FileSystemExchangeSourceHandle(partitionId, ImmutableList.copyOf(partitionFiles.get(partitionId)), secretKey.map(SecretKey::getEncoded))); + Collection files = partitionFiles.get(partitionId); + long currentExchangeHandleDataSizeInBytes = 0; + ImmutableList.Builder currentExchangeHandleFiles = ImmutableList.builder(); + for (FileStatus file : files) { + if (currentExchangeHandleDataSizeInBytes > 0 && currentExchangeHandleDataSizeInBytes + file.getFileSize() > exchangeSourceHandleTargetDataSizeInBytes) { + result.add(new FileSystemExchangeSourceHandle(partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded))); + currentExchangeHandleDataSizeInBytes = 0; + currentExchangeHandleFiles = ImmutableList.builder(); + } + currentExchangeHandleDataSizeInBytes += file.getFileSize(); + currentExchangeHandleFiles.add(file); + } + if (currentExchangeHandleDataSizeInBytes > 0) { + result.add(new FileSystemExchangeSourceHandle(partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded))); + } } return result.build(); }, @@ -269,36 +284,6 @@ public CompletableFuture> getSourceHandles() return exchangeSourceHandlesFuture; } - @Override - public ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes) - { - // Currently we only split at the file level, and external logic groups sources that are not large enough - FileSystemExchangeSourceHandle sourceHandle = (FileSystemExchangeSourceHandle) handle; - Iterator filesIterator = sourceHandle.getFiles().iterator(); - return new ExchangeSourceSplitter() - { - @Override - public CompletableFuture isBlocked() - { - return completedFuture(null); - } - - @Override - public Optional getNext() - { - if (filesIterator.hasNext()) { - return Optional.of(new FileSystemExchangeSourceHandle(sourceHandle.getPartitionId(), ImmutableList.of(filesIterator.next()), secretKey.map(SecretKey::getEncoded))); - } - return Optional.empty(); - } - - @Override - public void close() - { - } - }; - } - @Override public void close() { diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java index 34ec5fba4ddd..0bc5fe8fe507 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java @@ -44,6 +44,7 @@ public class FileSystemExchangeConfig private int exchangeSourceConcurrentReaders = 4; private int maxOutputPartitionCount = 50; private int exchangeFileListingParallelism = 50; + private DataSize exchangeSourceHandleTargetDataSize = DataSize.of(256, MEGABYTE); @NotNull @NotEmpty(message = "At least one base directory needs to be configured") @@ -176,4 +177,18 @@ public FileSystemExchangeConfig setExchangeFileListingParallelism(int exchangeFi this.exchangeFileListingParallelism = exchangeFileListingParallelism; return this; } + + @NotNull + public DataSize getExchangeSourceHandleTargetDataSize() + { + return exchangeSourceHandleTargetDataSize; + } + + @Config("exchange.source-handle-target-data-size") + @ConfigDescription("Target size of the data referenced by a single source handle") + public FileSystemExchangeConfig setExchangeSourceHandleTargetDataSize(DataSize exchangeSourceHandleTargetDataSize) + { + this.exchangeSourceHandleTargetDataSize = exchangeSourceHandleTargetDataSize; + return this; + } } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java index 6ae8afded4af..e35ddb0c2d04 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java @@ -62,6 +62,7 @@ public class FileSystemExchangeManager private final int exchangeSourceConcurrentReaders; private final int maxOutputPartitionCount; private final int exchangeFileListingParallelism; + private final long exchangeSourceHandleTargetDataSizeInBytes; private final ExecutorService executor; @Inject @@ -83,6 +84,7 @@ public FileSystemExchangeManager( this.exchangeSourceConcurrentReaders = fileSystemExchangeConfig.getExchangeSourceConcurrentReaders(); this.maxOutputPartitionCount = fileSystemExchangeConfig.getMaxOutputPartitionCount(); this.exchangeFileListingParallelism = fileSystemExchangeConfig.getExchangeFileListingParallelism(); + this.exchangeSourceHandleTargetDataSizeInBytes = fileSystemExchangeConfig.getExchangeSourceHandleTargetDataSize().toBytes(); this.executor = newCachedThreadPool(daemonThreadsNamed("exchange-source-handles-creation-%s")); } @@ -115,6 +117,7 @@ public Exchange createExchange(ExchangeContext context, int outputPartitionCount preserveOrderWithinPartition, exchangeFileListingParallelism, secretKey, + exchangeSourceHandleTargetDataSizeInBytes, executor); } diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java index c18599409ffb..8025439292b0 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimap; import io.airlift.slice.Slice; import io.airlift.slice.Slices; @@ -36,6 +37,7 @@ import java.util.Map; import java.util.function.Function; +import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.units.DataSize.Unit.BYTE; @@ -210,10 +212,10 @@ public void testLargePages() exchange.sinkFinished(sinkInstanceHandle); List partitionHandles = exchange.getSourceHandles().get(); - assertThat(partitionHandles).hasSize(3); + assertThat(partitionHandles).hasSize(10); - Map partitions = partitionHandles.stream() - .collect(toImmutableMap(ExchangeSourceHandle::getPartitionId, Function.identity())); + ListMultimap partitions = partitionHandles.stream() + .collect(toImmutableListMultimap(ExchangeSourceHandle::getPartitionId, Function.identity())); assertThat(readData(partitions.get(0))) .containsExactlyInAnyOrder(smallPage, mediumPage, largePage, maxPage); diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java index 1ae5cd1ee4fc..09592070dee3 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java @@ -39,7 +39,8 @@ public void testDefaults() .setExchangeSinkMaxFileSize(DataSize.of(1, GIGABYTE)) .setExchangeSourceConcurrentReaders(4) .setMaxOutputPartitionCount(50) - .setExchangeFileListingParallelism(50)); + .setExchangeFileListingParallelism(50) + .setExchangeSourceHandleTargetDataSize(DataSize.of(256, MEGABYTE))); } @Test @@ -55,6 +56,7 @@ public void testExplicitPropertyMappings() .put("exchange.source-concurrent-readers", "10") .put("exchange.max-output-partition-count", "53") .put("exchange.file-listing-parallelism", "20") + .put("exchange.source-handle-target-data-size", "1GB") .buildOrThrow(); FileSystemExchangeConfig expected = new FileSystemExchangeConfig() @@ -66,7 +68,8 @@ public void testExplicitPropertyMappings() .setExchangeSinkMaxFileSize(DataSize.of(2, GIGABYTE)) .setExchangeSourceConcurrentReaders(10) .setMaxOutputPartitionCount(53) - .setExchangeFileListingParallelism(20); + .setExchangeFileListingParallelism(20) + .setExchangeSourceHandleTargetDataSize(DataSize.of(1, GIGABYTE)); assertFullMapping(properties, expected); } diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java index f81170603336..a7bcfd16e8e1 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java @@ -95,6 +95,8 @@ public static Map getExchangeManagerProperties(MinioStorage mini .put("exchange.s3.aws-secret-key", MinioStorage.SECRET_KEY) .put("exchange.s3.region", "us-east-1") .put("exchange.s3.endpoint", "http://" + minioStorage.getMinio().getMinioApiEndpoint()) + // create more granular source handles given the fault-tolerant-execution-target-task-input-size is set to lower value for testing + .put("exchange.source-handle-target-data-size", "1MB") .buildOrThrow(); } } diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java index afebeafb86f3..599557239695 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java @@ -29,6 +29,7 @@ protected ExchangeManager createExchangeManager() return new FileSystemExchangeManagerFactory().create(ImmutableMap.of( "exchange.base-directories", baseDirectory1 + "," + baseDirectory2, // to trigger file split in some tests - "exchange.sink-max-file-size", "16MB")); + "exchange.sink-max-file-size", "16MB", + "exchange.source-handle-target-data-size", "1MB")); } }