Skip to content

Commit

Permalink
Internal duckdb#2850: Window Local States
Browse files Browse the repository at this point in the history
Fix thread count race condition.

fixes: duckdblabs/duckdb-internal#2850
  • Loading branch information
hawkfish committed Aug 26, 2024
1 parent f94b8ac commit 9541354
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/execution/operator/aggregate/physical_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class WindowGlobalSourceState : public GlobalSourceState {
//! State mutex
mutable mutex lock;
//! The number of local states
atomic<idx_t> threads;
atomic<idx_t> locals;
//! The list of tasks
vector<Task> tasks;
//! The the next task
Expand All @@ -392,7 +392,7 @@ class WindowGlobalSourceState : public GlobalSourceState {
};

WindowGlobalSourceState::WindowGlobalSourceState(ClientContext &context_p, WindowGlobalSinkState &gsink_p)
: context(context_p), gsink(gsink_p), threads(0), next_task(0), finished(0), stopped(false), returned(0) {
: context(context_p), gsink(gsink_p), locals(0), next_task(0), finished(0), stopped(false), returned(0) {
auto &gpart = gsink.global_partition;
auto &window_hash_groups = gsink.global_partition->window_hash_groups;

Expand Down Expand Up @@ -449,8 +449,13 @@ void WindowGlobalSourceState::CreateTaskList() {
std::sort(partition_blocks.begin(), partition_blocks.end(), std::greater<PartitionBlock>());

// Schedule the largest group on as many threads as possible
const auto threads = locals.load();
const auto &max_block = partition_blocks.front();
const auto per_thread = (max_block.first + threads - 1) / threads;
if (!per_thread) {
throw InternalException("No blocks per thread! %ld threads, %ld groups, %ld blocks, %ld hash group", threads,
partition_blocks.size(), max_block.first, max_block.second);
}

// TODO: Generate dynamically instead of building a big list?
vector<WindowGroupStage> states {WindowGroupStage::SINK, WindowGroupStage::FINALIZE, WindowGroupStage::GETDATA};
Expand Down Expand Up @@ -705,7 +710,7 @@ WindowLocalSourceState::WindowLocalSourceState(WindowGlobalSourceState &gsource)
}
output_chunk.Initialize(global_partition.allocator, output_types);

++gsource.threads;
++gsource.locals;
}

bool WindowGlobalSourceState::TryNextTask(TaskPtr &task) {
Expand Down

0 comments on commit 9541354

Please sign in to comment.