Consider memory during writer scaling #18771
Conversation
```java
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

@ThreadSafe
public class LocalExchange
{
    private static final int SCALE_WRITERS_MAX_PARTITIONS_PER_WRITER = 128;
    public static final double SCALE_WRITERS_MAX_MEMORY_RATIO = 0.7;
```
Maybe 0.2 or 0.5? IMO totalMemoryUsed should be pretty accurate during scale-up, because the writers are supposed to be full. Should this account for how many writers we can add in a single iteration (e.g. scale by 2x or 1.5x)?
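A minimal sketch of the kind of gate being discussed, assuming writers are full during scale-up so totalMemoryUsed is a reliable proxy (the names and the 2x step are illustrative, not the PR's exact logic):

```java
final class ScaleUpGate
{
    // mirrors SCALE_WRITERS_MAX_MEMORY_RATIO from the snippet above
    private static final double MAX_MEMORY_RATIO = 0.7;

    // Allow scale-up only if one iteration's worth of new writers (here: doubling
    // the writer count, which roughly doubles writer memory when writers are full)
    // would still keep projected usage under the budgeted ratio of the node limit.
    static boolean canScaleUp(long totalMemoryUsed, long maxMemoryPerNode)
    {
        long projectedAfterDoubling = 2 * totalMemoryUsed;
        return projectedAfterDoubling <= (long) (maxMemoryPerNode * MAX_MEMORY_RATIO);
    }
}
```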
```diff
  // because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never
  // use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high
  // resource utilization.
- private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32);
+ private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2;
```
The commit message says "same value as the number of worker threads", but maxWorkerThreads is Runtime.getRuntime().availableProcessors() * 2, which is a different number.
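For concreteness, the two quantities being compared (this sketch assumes Trino's MachineInfo helper; on a 16-core machine with SMT, availableProcessors() reports 32 logical processors, so the values come out 32 vs 64):

```java
import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount;
import static java.lang.Math.min;

// physical cores, capped at 32, then doubled: min(16, 32) * 2 = 32
int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2;
// logical processors, doubled: 32 * 2 = 64
int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 2;
```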
@sopel39 Should we set this to the same number? I think it makes sense in case there are more cores.
I think it's fine, but it becomes pretty sophisticated. I was thinking about something simpler, like task.scale-writers.rebalance-multiplier=2, where we essentially multiply partition writers up to a factor (decided by the user).
```diff
  // use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high
  // resource utilization.
- private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32);
+ // Set the value of default max writer count to the number of processors * 2 and cap it to 64. We can set this value
```
As discussed, we might simplify the properties:
min-writer-count
max-writer-count
scale-writers.enabled
(and remove the partition writer count)
I'm creating another PR for this
If we don't reset, it's possible that an unskewed partition eventually gets scaled, since partitionDataSizeSinceLastRebalance accumulates over multiple rebalance cycles. This causes unnecessary partitions to get over-scaled, which can lead to high memory usage.
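A minimal sketch of the reset described above, with assumed names (the real logic lives in SkewedPartitionRebalancer):

```java
final class RebalanceState
{
    private final long[] partitionDataSizeSinceLastRebalance;

    RebalanceState(int partitionCount)
    {
        this.partitionDataSizeSinceLastRebalance = new long[partitionCount];
    }

    void rebalanceCycle(long skewThresholdBytes)
    {
        for (int partition = 0; partition < partitionDataSizeSinceLastRebalance.length; partition++) {
            if (partitionDataSizeSinceLastRebalance[partition] >= skewThresholdBytes) {
                // scale this skewed partition to an additional writer (omitted)
            }
            // The reset in question: without it, a slow but steady partition would
            // accumulate enough bytes across cycles to look skewed and get scaled.
            partitionDataSizeSinceLastRebalance[partition] = 0;
        }
    }
}
```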
This session property specifies the minimum data processed to trigger skewed partition rebalancing in local and remote exchange. Currently the value of this property is too small, and since we no longer work with physicalWrittenBytes in scaling, we can increase it. This will also make scaling a bit less aggressive.
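A sketch of how such a threshold gates rebalancing (names assumed; the property's actual name and new default are not shown in this thread):

```java
// Rebalancing is attempted only after at least this much data has been processed
// since the last cycle; raising the threshold makes scaling less aggressive.
static boolean shouldAttemptRebalance(long dataProcessedSinceLastRebalance, long minDataProcessedThreshold)
{
    return dataProcessedSinceLastRebalance >= minDataProcessedThreshold;
}
```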
lgtm % comments
```diff
@@ -581,10 +585,14 @@ public LocalExecutionPlan plan(
 int taskCount = getTaskCount(partitioningScheme);
 if (checkCanScalePartitionsRemotely(taskContext.getSession(), taskCount, partitioningScheme.getPartitioning().getHandle(), nodePartitioningManager)) {
     partitionFunction = createPartitionFunction(taskContext.getSession(), nodePartitioningManager, partitioningScheme, partitionChannelTypes);
     // Consider memory while calculating the number of writers. This is to avoid creating too many task buckets.
     int partitionedWriterCount = min(
```
Just a comment: we kind of have no visibility into partitionedWriterCount.
WDYM?
I mean that it's a value that is computed internally, but the user isn't really aware how big it is. Hence it could potentially be surprising.
Add a maxRebalancedPartitions limit to SkewedPartitionRebalancer so that in extreme cases we don't over-scale, which can cause high memory utilization. The value of maxRebalancedPartitions is calculated using the max memory allowed per node.
Since writers are both CPU and IO bound, we should increase them to the same value as the number of worker threads.
merged, thanks!
```java
public static int getScaleWritersMaxSkewedPartitions(Session session)
{
    // Set the value of maxSkewedPartitions to scale to 60% of maximum number of writers possible per node.
    return (int) (ceil((double) getQueryMaxMemoryPerNode(session).toBytes() / MAX_MEMORY_PER_PARTITION_WRITER) * 0.60);
}
```
nit: comment where 60% came from.
```java
// Keep the maxPartitionsToRebalance at least the writer count such that single partition writes do
// not suffer from skewness and can scale uniformly across all writers. Additionally, note that
// maxWriterCount is calculated taking memory into account. So, it is safe to set the
// maxPartitionsToRebalance to maximum number of writers.
```
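A sketch of the floor this comment describes, with assumed names:

```java
import static java.lang.Math.max;

// Never allow fewer rebalanced partitions than writers, so that a single-partition
// write can still fan out uniformly across all writers; memoryDerivedLimit is the
// cap computed from per-node memory (see getScaleWritersMaxSkewedPartitions above).
static int maxPartitionsToRebalance(int maxWriterCount, int memoryDerivedLimit)
{
    return max(maxWriterCount, memoryDerivedLimit);
}
```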
"So, it is safe to set the maxPartitionsToRebalance to maximum number of writers."
That kind of depends on the order: whether you scale first or see all partitions first.
```diff
@@ -3497,24 +3503,29 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
     return 1;
 }

 int maxWritersBasedOnMemory = getMaxPartitionWritersBasedOnMemory(session);
```
nit: just getMaxWritersBasedOnMemory
```java
if (partitioningScheme.isPresent()) {
    // The default value of partitioned writer count is 32 which is high enough to use it
    // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
    // small files since when scaling is disabled only single writer will handle a single partition.
    int partitionedWriterCount = getTaskWriterCount(session);
```
Shouldn't it be getTaskPartitionedWriterCount(session), as it was before? This is the unpartitioned, unscaled case.
```java
        .map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session)))
        .orElse(getTaskPartitionedWriterCount(session));
}
return getTaskPartitionedWriterCount(session);
// Consider memory while calculating writer count.
```
You could extract min(partitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory)) and use it both here and in the plan method. Then it would be obvious it's the same thing and where it comes from.
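A sketch of the suggested extraction (the helper name is hypothetical; previousPowerOfTwo is assumed to round down to the nearest power of two, like Guava's IntMath.floorPowerOfTwo):

```java
import static java.lang.Math.min;

// Single place that caps a writer count by memory, callable from both
// getWriterCount and plan so the derivation is visibly the same.
private static int memoryCappedWriterCount(Session session, int partitionedWriterCount)
{
    int maxWritersBasedOnMemory = getMaxPartitionWritersBasedOnMemory(session);
    return min(partitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory));
}
```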
Description
Benchmarks (Unpartitioned)
Before:
After:
Benchmarks (Partitioned)
Before:
After:
However, note that this query takes ~4 minutes with 64GB query_max_memory_per_node.
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: