Refactor exchange interfaces #13968
On top of #13945
core/trino-main/src/main/java/io/trino/execution/scheduler/Exchanges.java
core/trino-main/src/test/java/io/trino/execution/scheduler/TestingExchange.java
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceHandleSource.java
...xchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java
core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java
noMoreSplitsTracker.noMoreOperators();
if (noMoreSplitsTracker.isNoMoreSplits()) {
    if (exchangeDataSource != null) {
        exchangeDataSource.noMoreInputs();
this can be called multiple times due to races. Is that a problem?
This is allowed in other places, see https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/server/protocol/Query.java#L587 which can be called multiple times today through https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java#L187. Though I don't have a strong opinion here.
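For readers following the race question above, here is a minimal sketch of how a "no more inputs" signal can be made safe to call repeatedly. The class `IdempotentSource` and its members are hypothetical stand-ins for illustration and are not Trino's `ExchangeDataSource`; the assumption is only that the first call should win and later calls should be no-ops.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a data source; not the actual Trino class.
final class IdempotentSource
{
    private final AtomicBoolean noMoreInputs = new AtomicBoolean();
    // Counts how many times the state actually changed (for demonstration only).
    private final AtomicInteger transitions = new AtomicInteger();

    // Safe to call from several racing threads: only the first call has an effect.
    void noMoreInputs()
    {
        if (noMoreInputs.compareAndSet(false, true)) {
            transitions.incrementAndGet();
        }
    }

    boolean isNoMoreInputs()
    {
        return noMoreInputs.get();
    }

    int transitionCount()
    {
        return transitions.get();
    }
}
```

With a guard like this, duplicate calls caused by races are harmless, which is the property the reply relies on.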
for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
    Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
    if (sourceId.isPresent() && partitionedSources.contains(sourceId.get())) {
        driverRunnerFactoriesWithSplitLifeCycle.put(sourceId.get(), new DriverSplitRunnerFactory(driverFactory, true));
    }
    else {
        driverRunnerFactoriesWithTaskLifeCycle.add(new DriverSplitRunnerFactory(driverFactory, false));
        DriverSplitRunnerFactory runnerFactory = new DriverSplitRunnerFactory(driverFactory, false);
        sourceId.ifPresent(planNodeId -> driverRunnerFactoriesWithRemoteSource.put(planNodeId, runnerFactory));
I think I do not fully understand that. Does it mean that if sourceId is in partitionedSources we are not reading from a remote source?
Yeah, this is the logic today. Basically "partitionedSources" is what we call table scans. Other sources are remote sources that are run with FIXED distribution.
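The classification described in this reply can be sketched as a small decision function. This is a simplified illustration, not the code in `SqlTaskExecution`: the enum `SourceKind`, the class `SourceClassifier`, and the use of plain strings for plan node ids are all assumptions made to keep the example self-contained.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical labels for the three cases discussed in the thread.
enum SourceKind
{
    TABLE_SCAN,     // source id is one of the partitioned sources
    REMOTE_SOURCE,  // source id is present but not partitioned
    NO_SOURCE       // driver has no source id at all
}

final class SourceClassifier
{
    // Mirrors the branch structure of the diff: partitioned sources first,
    // any other present source id is treated as a remote source.
    static SourceKind classify(Optional<String> sourceId, Set<String> partitionedSources)
    {
        if (sourceId.isPresent() && partitionedSources.contains(sourceId.get())) {
            return SourceKind.TABLE_SCAN;
        }
        if (sourceId.isPresent()) {
            return SourceKind.REMOTE_SOURCE;
        }
        return SourceKind.NO_SOURCE;
    }
}
```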
core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
...e-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java
...e-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java
...e-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java
...e-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java
@@ -249,6 +249,13 @@ public OutputBuffers withNoMoreBufferIds()
        return new OutputBuffers(type, version + 1, true, buffers, exchangeSinkInstanceHandle);
    }

    public OutputBuffers withExchangeSinkInstanceHandle(ExchangeSinkInstanceHandle updatedExchangeSinkInstanceHandle)
withUpdatedExchangeSinkInstanceHandle?
This is not called yet. To be added separately?
Yeah, let me drop it for now
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManager.java
Mostly good. Some minor comments, plus questions where I do not fully understand.
5949398 to 1b801c7
On top of #13978
What's the goal of the refactor? Does it have a chance to introduce regressions/congestion? Are benchmarks required? Please mind that #13463 should probably land first (as it was started earlier)
@sopel39 This refactor makes the Exchange interfaces more flexible, allowing integration with more advanced exchange implementations
core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java
return (getUtilization() > 0.5) && stateMachine.getState().canAddPages();
// do not grab lock to acquire outputBuffers to avoid delaying TaskStatus response
return OutputBufferStatus.builder(outputBuffers.getVersion())
        .setOverutilized(memoryManager.getUtilization() >= 0.5 || !stateMachine.getState().canAddPages())
this changes logic - new version looks ok. Was there a bug? Looks like fix itself should be a separate commit.
Good catch. Let me revert it to what it was. Changing the logic wasn't my intention.
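To make the behavioural difference flagged in this thread concrete, here is a small sketch that evaluates the two boolean expressions from the diff side by side. The class and method names (`OverutilizedPredicates`, `before`, `after`) are hypothetical; only the expressions themselves are taken from the snippet above.

```java
// Hypothetical helper comparing the two expressions shown in the diff.
final class OverutilizedPredicates
{
    // Expression from the first line of the snippet.
    static boolean before(double utilization, boolean canAddPages)
    {
        return (utilization > 0.5) && canAddPages;
    }

    // Expression passed to setOverutilized in the later lines of the snippet.
    static boolean after(double utilization, boolean canAddPages)
    {
        return utilization >= 0.5 || !canAddPages;
    }
}
```

The two agree when utilization is well above 0.5 and pages can still be added, but they diverge at exactly 0.5 and whenever pages can no longer be added, which is why the reviewer asks for such a change to live in its own commit.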
Note errorprone failures
1b801c7 to 753c034
b407f58 to a208556
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceHandleSource.java
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceHandleSource.java
long readerFileSize = 0;
while (!files.isEmpty()) {
    ExchangeSourceFile file = files.peek();
    if (readerFileSize == 0 || readerFileSize + file.getFileSize() <= maxPageStorageSize + exchangeStorage.getWriteBufferSize()) {
readerFileSize + file.getFileSize() <= maxPageStorageSize + exchangeStorage.getWriteBufferSize()
Why do we need this? I don't get it.
To parallelize reads. Basically I don't want to add more files to a reader than necessary to let the other readers do the processing in parallel.
I don't quite understand. Shouldn't we let all readers try to poll from the queue like what we used to do?
I was trying to implement it the way it was (with the pull-based model), but then the flow becomes significantly more complex, as the engine delivers splits through a push model. I looked at the reader implementation and thought that there's little reason to reuse the readers. A new buffer is allocated anyway, and as long as we provide enough files to fill the entire buffer, parallelism shouldn't be impacted. Do you see any potential issues with the new approach?
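The size-bounded assignment discussed in this thread can be sketched as a greedy batching loop. This is an illustration under stated assumptions, not the `FileSystemExchangeSource` code: files are reduced to their sizes, `budget` stands in for `maxPageStorageSize + exchangeStorage.getWriteBufferSize()`, and the class `ReaderBatching` with its `assign` method is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical helper: splits a queue of file sizes into per-reader batches.
final class ReaderBatching
{
    static List<List<Long>> assign(List<Long> fileSizes, long budget)
    {
        Queue<Long> files = new ArrayDeque<>(fileSizes);
        List<List<Long>> readers = new ArrayList<>();
        while (!files.isEmpty()) {
            List<Long> batch = new ArrayList<>();
            long readerFileSize = 0;
            while (!files.isEmpty()) {
                long size = files.peek();
                // Same condition as in the diff: the first file is always taken,
                // later files only while the batch stays within the budget.
                if (readerFileSize == 0 || readerFileSize + size <= budget) {
                    batch.add(files.poll());
                    readerFileSize += size;
                }
                else {
                    break;
                }
            }
            readers.add(batch);
        }
        return readers;
    }
}
```

For example, sizes 60, 50, 30, 200, 10 with a budget of 100 end up in four batches: the oversized file gets a reader of its own, and small files are grouped only while they fit, leaving the rest for other readers to process in parallel.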
core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSink.java
catch (TimeoutException e) {
    updateSinkInstanceHandleIfNecessary();
}
Can you explain the logic here?
When an ExchangeSink is blocked there's a chance that the handle needs to be updated. This waits for some amount of time, and if the ExchangeSink is not unblocked, we check whether the handle needs to be updated.
When we refer to a handle update, does it mean that we switch to a different buffer node to write to? But we only switch to a different buffer node when the old buffer node is offline or is being drained, right? How does it relate to how much time we are blocked?
It doesn't. The idea is that when an ExchangeSink needs a handle update it will get blocked, as it will no longer be able to accept new data. Do you see any use case where we would like to refresh the handle while the ExchangeSink is still able to accept some data?
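The wait-then-check pattern explained in this thread can be sketched with a timed wait on a future. This is a rough illustration, not the code under review: `BlockedSinkWait`, `awaitUnblocked`, and the `updateHandleIfNecessary` callback are hypothetical names, and a `CompletableFuture` stands in for whatever "is blocked" future the sink exposes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper illustrating the timed wait on a blocked sink.
final class BlockedSinkWait
{
    // Waits until the sink is unblocked; every time the wait times out,
    // runs the handle check. Returns how many checks were performed.
    static int awaitUnblocked(CompletableFuture<Void> isBlocked, long timeoutMillis, Runnable updateHandleIfNecessary)
    {
        int checks = 0;
        while (true) {
            try {
                isBlocked.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return checks;
            }
            catch (TimeoutException e) {
                // Still blocked after the timeout: the handle may be stale.
                updateHandleIfNecessary.run();
                checks++;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
    }
}
```

The timeout here is only a polling interval: the handle check is triggered by the sink staying blocked, not by how long it has been blocked, which matches the answer given above.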
Closer closer = Closer.create();
for (ExchangeStorageReader reader : readers.getAndSet(ImmutableList.of())) {
    closer.register(reader);
}
try {
    closer.close();
}
catch (IOException e) {
    throw new UncheckedIOException(e);
}
Does using Closer make a difference at all?
It's safer, as it makes sure every object is closed even if one of the close methods threw an exception.
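The guarantee described in this reply can be approximated with the standard library alone. The sketch below is not Guava's `Closer` (which, for instance, closes resources in reverse registration order); `CloseAll.closeAll` is a hypothetical helper that only demonstrates the key property: every resource is closed, the first failure is rethrown, and later failures are attached as suppressed exceptions.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical stdlib-only approximation of the "close everything" guarantee.
final class CloseAll
{
    static void closeAll(List<? extends Closeable> resources)
    {
        IOException failure = null;
        for (Closeable resource : resources) {
            try {
                resource.close();
            }
            catch (IOException e) {
                // Remember the first failure, keep closing the rest.
                if (failure == null) {
                    failure = e;
                }
                else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw new UncheckedIOException(failure);
        }
    }
}
```

A plain loop that calls `close()` without the try/catch would stop at the first exception and leak the remaining readers, which is the risk the reply refers to.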
a208556 to 6907088
Preparation for introduction of ExchangeSinkInstanceHandle refresh. When ExchangeSinkInstanceHandle can be refreshed it is no longer clear what instance of the ExchangeSinkInstanceHandle should be passed to the finish method.
Upon ExchangeSink request
6907088 to 0accd89
@linzebing updated
Benchmark results:
Overall:
Suites:
Detailed: https://gist.github.com/arhimondr/7a116dc95ce63a7b62ec82b0df6123c5
No major difference in CPU and wall timings.
Description
See commit messages for more details
Refactoring
Core engine
N/A
Related issues, pull requests, and links
Documentation
(X) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
(X) No release notes entries required.
( ) Release notes entries required with the following suggested text: