Describe the bug
When aggregating with `first_value` over an array column under a memory limit, I get a Resources Exhausted error showing unexpectedly high memory usage that makes the query fail even on very reasonable data volumes. More specifically, for this query:
```sql
SELECT team, first_value(game_id order by score) AS game_with_max_score
FROM games
GROUP BY team;
```
When `game_id` is an array of strings and the `games` table is 100MiB overall, this query fails with Resources Exhausted under a 10GiB limit. With the same query and the same table size, but using a string instead of an array of strings, the query passes without any issue.
After investigation, it seems that there are some memory management and accounting issues when aggregating on arrays (see analysis below).
To Reproduce
- Run `datafusion-cli -m 10g` (tested with DataFusion 47) to enforce a 10GiB memory limit.
- Create a table with 100,000 rows and 3 columns:
  - `game_id` is a string array column. Each row is an array containing one string of 1000 characters.
  - `score` is a small integer (its value is irrelevant).
  - `team` is a small string field of cardinality 1000 (there are 1000 different values).
```sql
CREATE TABLE games AS
SELECT
    ARRAY[LPAD((random() * 100000000)::text, 1000, 'x')] AS game_id,
    (random() * 100)::int AS score,
    'team_' || (random() * 1000)::int AS team
FROM generate_series(1, 100000);
```
This table should be roughly 100MiB in memory (100,000 rows * 1000 characters per row for `game_id`, neglecting the two other smaller columns).
- Run the following query:
```sql
SELECT team, first_value(game_id order by score) AS game_with_max_score
FROM games
GROUP BY team;
```
And get the following error:
```
Resources exhausted: Failed to allocate additional 8422146760 bytes for GroupedHashAggregateStream[1] with 0 bytes already allocated for this reservation - 2315271480 bytes remain available for the total pool
```
If we replace `ARRAY[LPAD((random() * 100000000)::text, 1000, 'x')] AS game_id` with `LPAD((random() * 100000000)::text, 1000, 'x') AS game_id` (same data and same volume, but going from an array of strings to a string), the query passes with a 10MiB limit (`datafusion-cli -m 10m`, 1000 times less), which is the expected behavior: 1000 groups (`team`s here) * 1000 characters should be O(1MiB) in memory.
(Thank you @LiaCastaneda for coming up with this minimal reproducer!)
Expected behavior
At equivalent data volumes, aggregations on arrays of strings should succeed with the same memory limit as aggregations on strings.
Additional context
Investigation
Root Cause
`first_value` relies on `FirstValueAccumulator`. The accumulated group value is a `ScalarValue` that is created with `get_row_at_idx`, which calls `try_from_array`.
This function behaves differently for scalars and arrays:
- For scalars, `array.value(idx)` returns a native value (for example, a bool in the expansion of `typed_cast!` here, or an i128 there).
- But for `List`s (the Arrow representation of SQL arrays in this case), `array.value(idx)` returns an `ArrayRef` that is a slice of the original larger array. This means that the memory footprint of this new slice (the retained group value) is the size of the underlying Arrow `Buffer`, which is the size of the whole column, not just the size of the value itself (see the sketch after this list).
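To make the second point concrete, here is a minimal, self-contained arrow-rs sketch (illustrative only, not DataFusion code; the exact byte counts depend on the arrow version):
```rust
use arrow::array::{Array, ListBuilder, StringBuilder};

fn main() {
    // Build a ListArray with 1000 rows, each holding one 1000-character
    // string, mimicking the `game_id` column from the reproducer.
    let mut builder = ListBuilder::new(StringBuilder::new());
    for _ in 0..1000 {
        builder.values().append_value("x".repeat(1000));
        builder.append(true);
    }
    let column = builder.finish();

    // `value(idx)` returns an ArrayRef slice that still shares the
    // original backing Buffers...
    let one_row = column.value(0);

    // ...so the slice reports (roughly) the memory of the whole column,
    // on the order of 1 MiB here, instead of the ~1 KiB it represents.
    println!("whole column: {} bytes", column.get_array_memory_size());
    println!("one-row slice: {} bytes", one_row.get_array_memory_size());
}
```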
This has two consequences (for arrays only; scalars are all good):
- When all the group values (this field, for `first_value`) come from the same `RecordBatch`, the memory footprint of the whole `RecordBatch` is accounted for once per group value, instead of accounting for just the group value sizes.
- (Not the bug shown here, but another problematic consequence.) When the group values come from different `RecordBatch`es, we retain in memory every record batch that contained one of the retained group values. This is an over-use of memory.
In the first case, queries that should be able to run cannot; in the second, the actual memory footprint can be much larger than required, breaking the advantage of the streaming model. The first case also appears consistent with the error above: with the default batch size of 8192 rows and ~1KiB per `game_id` value, one batch is roughly 8MiB, and accounting it once for each of the ~1000 groups gives roughly 8GiB, about the size of the failed 8422146760-byte allocation.
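The same effect can be seen at the DataFusion level. Below is a sketch (assuming, per this analysis, that `ScalarValue::try_from_array` keeps a slice of the column and that `ScalarValue::size` therefore reports the full backing buffers):
```rust
use arrow::array::{Array, ListBuilder, StringBuilder};
use datafusion_common::{Result, ScalarValue};

fn main() -> Result<()> {
    // One batch worth of `game_id` values: 8192 rows of ~1 KiB each.
    let mut builder = ListBuilder::new(StringBuilder::new());
    for _ in 0..8192 {
        builder.values().append_value("x".repeat(1000));
        builder.append(true);
    }
    let column = builder.finish();

    // What FirstValueAccumulator effectively retains for one group.
    let group_value = ScalarValue::try_from_array(&column, 0)?;

    // Expected: ~1 KiB. Per the analysis above: ~8 MiB, because the
    // ScalarValue wraps a slice that still pins the whole column.
    println!("accounted size of one group value: {} bytes", group_value.size());
    Ok(())
}
```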
Side Note
Although I don't think it's the main issue, another contributing factor may be that group accumulators are not yet supported for `first_value` on arrays (Arrow lists).
Possible fix
I am not sure what the exact right way of fixing this is (I lack knowledge of the available Arrow APIs), but we should retain copies of arrays, instead of slices pointing into the original array, when creating `ScalarValue`s from `List`s.
One systematic fix could be to change this function to work on a copy (a rough sketch follows).
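One possible direction (a hypothetical helper, not an existing DataFusion API; the name `compact_copy` and its placement are my assumptions) is to use `arrow::compute::concat`, which copies its input into freshly allocated buffers sized to the data:
```rust
use arrow::array::{Array, ArrayRef};
use arrow::compute::concat;
use arrow::error::ArrowError;

/// Hypothetical helper: turn a slice that pins the original column's
/// buffers into a compact, owned copy before it is wrapped in a ScalarValue.
fn compact_copy(sliced: &dyn Array) -> Result<ArrayRef, ArrowError> {
    // `concat` writes its inputs into newly allocated buffers sized to
    // the actual data, dropping the reference to the large source Buffer.
    concat(&[sliced])
}
```
Applied inside `try_from_array` (or `get_row_at_idx`) for `List` values, something like this would make the retained group value cost O(value size) instead of O(column size).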
I'd be happy to work on a fix, but would appreciate some guidance on the preferred way forward!