Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add agg final spill event if have spill data #9023

Merged
merged 5 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ namespace DB
M(force_fap_worker_throw) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge)
M(force_agg_two_level_hash_table_before_merge) \
M(force_thread_0_no_agg_spill)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec
}
}

if (need_final_spill)
if (aggregate_context->hasSpilledData() || need_final_spill)
{
/// Currently, the aggregation spill algorithm requires all bucket data to be spilled,
/// so a new event is added here to execute the final spill.
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Flash/tests/gtest_spill_aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace DB
namespace FailPoints
{
extern const char force_agg_on_partial_block[];
extern const char force_thread_0_no_agg_spill[];
} // namespace FailPoints

namespace tests
Expand All @@ -47,6 +48,17 @@ class SpillAggregationTestRunner : public DB::tests::ExecutorTest

#define WRAP_FOR_AGG_PARTIAL_BLOCK_END }

#define WRAP_FOR_AGG_THREAD_0_NO_SPILL_START \
for (auto thread_0_no_spill : {true, false}) \
{ \
if (thread_0_no_spill) \
FailPointHelper::enableFailPoint(FailPoints::force_thread_0_no_agg_spill); \
else \
FailPointHelper::disableFailPoint(FailPoints::force_thread_0_no_agg_spill);

#define WRAP_FOR_AGG_THREAD_0_NO_SPILL_END }


#define WRAP_FOR_SPILL_TEST_BEGIN \
std::vector<bool> pipeline_bools{false, true}; \
for (auto enable_pipeline : pipeline_bools) \
Expand Down Expand Up @@ -103,9 +115,11 @@ try
/// don't use `executeAndAssertColumnsEqual` since it takes too long to run
/// test single thread aggregation
WRAP_FOR_AGG_PARTIAL_BLOCK_START
WRAP_FOR_AGG_THREAD_0_NO_SPILL_START
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1));
/// test parallel aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
WRAP_FOR_AGG_THREAD_0_NO_SPILL_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
/// enable spill and use small max_cached_data_bytes_in_spiller
context.context->setSetting("max_cached_data_bytes_in_spiller", Field(static_cast<UInt64>(total_data_size / 200)));
Expand Down Expand Up @@ -249,6 +263,7 @@ try
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
WRAP_FOR_SPILL_TEST_BEGIN
WRAP_FOR_AGG_PARTIAL_BLOCK_START
WRAP_FOR_AGG_THREAD_0_NO_SPILL_START
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
for (auto & block : blocks)
{
Expand All @@ -273,6 +288,7 @@ try
vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(),
false));
}
WRAP_FOR_AGG_THREAD_0_NO_SPILL_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
WRAP_FOR_SPILL_TEST_END
}
Expand Down Expand Up @@ -402,6 +418,7 @@ try
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
WRAP_FOR_SPILL_TEST_BEGIN
WRAP_FOR_AGG_PARTIAL_BLOCK_START
WRAP_FOR_AGG_THREAD_0_NO_SPILL_START
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
for (auto & block : blocks)
{
Expand All @@ -426,6 +443,7 @@ try
vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(),
false));
}
WRAP_FOR_AGG_THREAD_0_NO_SPILL_END
WRAP_FOR_AGG_PARTIAL_BLOCK_END
WRAP_FOR_SPILL_TEST_END
}
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/AggSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace DB
namespace FailPoints
{
extern const char random_marked_for_auto_spill[];
extern const char force_thread_0_no_agg_spill[];
} // namespace FailPoints

AggSpillContext::AggSpillContext(
Expand Down Expand Up @@ -55,6 +56,12 @@ bool AggSpillContext::updatePerThreadRevocableMemory(Int64 new_value, size_t thr
if (new_value == 0)
// new_value == 0 means no agg data to spill
return false;
fiu_do_on(FailPoints::force_thread_0_no_agg_spill, {
if (thread_num == 0)
{
return false;
}
});
if (auto_spill_mode)
{
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ bool AggregatedDataVariants::tryMarkNeedSpill()
/// Data can only be flushed to disk if a two-level aggregation is supported.
if (!isConvertibleToTwoLevel())
return false;
convertToTwoLevel();
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
}
need_spill = true;
return true;
Expand Down Expand Up @@ -1028,6 +1027,11 @@ void Aggregator::spill(AggregatedDataVariants & data_variants, size_t thread_num
{
assert(data_variants.need_spill);
agg_spill_context->markSpilled();
if unlikely (!data_variants.isTwoLevel())
{
assert(isConvertibleToTwoLevel());
data_variants.convertToTwoLevel();
}
/// Flush only two-level data and possibly overflow data.
#define M(NAME) \
case AggregationMethodType(NAME): \
Expand Down