Skip to content

Implementation for task_group dynamic dependencies - part 2 - set_task_order #1685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: dev/kboyarinov/poc-dynamic-dependencies-part1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9ca1ec8
Introduce make_edge
kboyarinov Mar 20, 2025
c87da0d
Merge branch 'dev/kboyarinov/poc-dynamic-dependencies-part1' into dev…
kboyarinov Mar 21, 2025
7a7fdaa
Introduce successors_list_node
kboyarinov Mar 24, 2025
2b93bff
Minor cleanup
kboyarinov Mar 24, 2025
0dc6885
Add CI branch
kboyarinov Mar 24, 2025
5f68c8d
Fix and cleanup
kboyarinov Mar 24, 2025
9df6152
Add dead successors list
kboyarinov Mar 26, 2025
c745a1b
Change flag value, fix test fails on windows
kboyarinov Mar 26, 2025
2a02a26
Fix constant condition
kboyarinov Mar 27, 2025
acd9085
Merge branch 'dev/kboyarinov/poc-dynamic-dependencies-part1' into dev…
kboyarinov May 6, 2025
0d10d04
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov May 16, 2025
d4d33ed
Cleanup
kboyarinov May 16, 2025
311c6b2
Revert is_successors_list_alive
kboyarinov May 16, 2025
eff9d0a
is_current_list_alive -> is_alive
kboyarinov May 27, 2025
7df8e0a
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov May 30, 2025
e457f16
Add test for bypassing task with dependencies
kboyarinov May 30, 2025
0d58c47
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov Jul 11, 2025
dc86b79
Resolve merging issues
kboyarinov Jul 11, 2025
3d401b8
Class & variable renaming
kboyarinov Jul 11, 2025
02d9392
Add UXL Copyrights
kboyarinov Jul 11, 2025
1d9513d
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov Jul 21, 2025
2483e1e
Complete renaming, suppress warning in test
kboyarinov Jul 21, 2025
aaa07ad
Use separate state for dependencies and transferring
kboyarinov Jul 22, 2025
0e4479b
Remove continuation_vertex class + refactoring
kboyarinov Aug 5, 2025
937fa30
Remove unnecessary function
kboyarinov Aug 5, 2025
cc3db31
Rever protected field in reference_vertex
kboyarinov Aug 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ on:
branches: [master]

pull_request:
branches: [master]
branches: [master, dev/kboyarinov/poc-dynamic-dependencies-part1]
types:
- opened
- synchronize
Expand Down
168 changes: 160 additions & 8 deletions include/oneapi/tbb/detail/_task_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,47 @@ class task_handle;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

class task_handle_task;
class task_dynamic_state;

class successor_list_node {
public:
successor_list_node(task_dynamic_state* continuation, d1::small_object_allocator& alloc)
: m_next_node(nullptr)
, m_continuation(continuation)
, m_allocator(alloc)
{}

task_dynamic_state* get_continuation() const {
return m_continuation;
}

successor_list_node* get_next_node() const {
return m_next_node;
}

void set_next_node(successor_list_node* next_node) {
m_next_node = next_node;
}

void finalize() {
m_allocator.delete_object(this);
}
private:
successor_list_node* m_next_node;
task_dynamic_state* m_continuation;
d1::small_object_allocator m_allocator;
};

inline task_handle_task* release_successor_list(successor_list_node* list);

class task_dynamic_state {
public:
task_dynamic_state(d1::small_object_allocator& alloc)
: m_num_references(1) // reserves a task co-ownership for dynamic state
task_dynamic_state(task_handle_task* task, d1::small_object_allocator& alloc)
: m_task(task)
, m_successor_list_head(nullptr)
, m_num_dependencies(0)
, m_num_references(1) // reserves a task co-ownership for dynamic state
, m_allocator(alloc)
{}

Expand All @@ -50,9 +87,47 @@ class task_dynamic_state {
}
}

void complete_task() {
void register_dependency() {
if (m_num_dependencies++ == 0) {
// Register an additional dependency for a task_handle owning the current task
++m_num_dependencies;
}
}

task_handle_task* release_dependency() {
task_handle_task* next_task = nullptr;
if (--m_num_dependencies == 0) {
next_task = m_task;
}
return next_task;
}

task_handle_task* complete_task() {
successor_list_node* list = fetch_successor_list(COMPLETED_FLAG);
return release_successor_list(list);
}

bool has_dependencies() const {
return m_num_dependencies.load(std::memory_order_acquire) != 0;
}

void add_successor(task_dynamic_state* successor);

using successor_list_state_flag = std::uintptr_t;
static constexpr successor_list_state_flag COMPLETED_FLAG = ~std::uintptr_t(0);

static bool is_completed(successor_list_node* node) {
return node == reinterpret_cast<successor_list_node*>(COMPLETED_FLAG);
}

successor_list_node* fetch_successor_list(successor_list_state_flag new_list_state_flag) {
return m_successor_list_head.exchange(reinterpret_cast<successor_list_node*>(new_list_state_flag));
}

private:
task_handle_task* m_task;
std::atomic<successor_list_node*> m_successor_list_head;
std::atomic<std::size_t> m_num_dependencies;
std::atomic<std::size_t> m_num_references;
d1::small_object_allocator m_allocator;
};
Expand Down Expand Up @@ -110,7 +185,7 @@ class task_handle_task : public d1::task {
if (current_state == nullptr) {
d1::small_object_allocator alloc;

task_dynamic_state* new_state = alloc.new_object<task_dynamic_state>(alloc);
task_dynamic_state* new_state = alloc.new_object<task_dynamic_state>(this, alloc);

if (m_dynamic_state.compare_exchange_strong(current_state, new_state)) {
current_state = new_state;
Expand All @@ -124,16 +199,32 @@ class task_handle_task : public d1::task {
return current_state;
}

void complete_task() {
task_handle_task* complete_task() {
task_handle_task* next_task = nullptr;

task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);
if (current_state != nullptr) {
current_state->complete_task();
next_task = current_state->complete_task();
}
return next_task;
}

task_handle_task* release_dependency() {
task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);
__TBB_ASSERT(current_state != nullptr && current_state->has_dependencies(),
"release_dependency was called for task without dependencies");
task_handle_task* t = current_state->release_dependency();
__TBB_ASSERT(t == nullptr || t == this, nullptr);
return t;
}

bool has_dependencies() const {
task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);
return current_state ? current_state->has_dependencies() : false;
}
#endif
};


class task_handle {
struct task_handle_task_finalizer_t{
void operator()(task_handle_task* p){ p->finalize(); }
Expand Down Expand Up @@ -170,12 +261,20 @@ class task_handle {
struct task_handle_accessor {
static task_handle construct(task_handle_task* t) { return {t}; }

static d1::task* release(task_handle& th) { return th.release(); }
static task_handle_task* release(task_handle& th) {
return th.m_handle.release();
}

static d1::task_group_context& ctx_of(task_handle& th) {
__TBB_ASSERT(th.m_handle, "ctx_of does not expect empty task_handle.");
return th.m_handle->ctx();
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
static task_dynamic_state* get_task_dynamic_state(task_handle& th) {
return th.m_handle->get_dynamic_state();
}
#endif
};

inline bool operator==(task_handle const& th, std::nullptr_t) noexcept {
Expand All @@ -194,6 +293,51 @@ inline bool operator!=(std::nullptr_t, task_handle const& th) noexcept {
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
inline task_handle_task* release_successor_list(successor_list_node* node) {
task_handle_task* next_task = nullptr;

while (node != nullptr) {
successor_list_node* next_node = node->get_next_node();
task_handle_task* successor_task = node->get_continuation()->release_dependency();
node->finalize();
node = next_node;

if (successor_task) {
if (next_task == nullptr) {
next_task = successor_task;
} else {
d1::spawn(*successor_task, successor_task->ctx());
}
}
}
return next_task;
}

inline void task_dynamic_state::add_successor(task_dynamic_state* successor) {
__TBB_ASSERT(successor != nullptr, nullptr);
successor_list_node* current_successor_list_head = m_successor_list_head.load(std::memory_order_acquire);

if (!is_completed(current_successor_list_head)) {
successor->register_dependency();

d1::small_object_allocator alloc;
successor_list_node* new_successor_node = alloc.new_object<successor_list_node>(successor, alloc);
new_successor_node->set_next_node(current_successor_list_head);

while (!m_successor_list_head.compare_exchange_strong(current_successor_list_head, new_successor_node)) {
// Other thread updated the head of the list

if (is_completed(current_successor_list_head)) {
// Current task has completed while we tried to insert the successor to the list
new_successor_node->finalize();
successor->release_dependency();
break;
}
new_successor_node->set_next_node(current_successor_list_head);
}
}
}

class task_completion_handle {
public:
task_completion_handle() : m_task_state(nullptr) {}
Expand Down Expand Up @@ -287,8 +431,16 @@ class task_completion_handle {
}
#endif // !__TBB_CPP20_COMPARISONS_PRESENT

friend struct task_completion_handle_accessor;

task_dynamic_state* m_task_state;
};

struct task_completion_handle_accessor {
static task_dynamic_state* get_task_dynamic_state(task_completion_handle& tracker) {
return tracker.m_task_state;
}
};
#endif

} // namespace d2
Expand Down
9 changes: 8 additions & 1 deletion include/oneapi/tbb/task_arena.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,7 +110,13 @@ inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
auto& ctx = task_handle_accessor::ctx_of(th);

// Do not access th after release
r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
task_handle_task* t = task_handle_accessor::release(th);
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
if (!t->has_dependencies() || t->release_dependency() != nullptr)
#endif
{
r1::enqueue(*t, ctx, ta);
}
}
} //namespace d2

Expand Down
65 changes: 59 additions & 6 deletions include/oneapi/tbb/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ template<typename F>
d1::task* task_ptr_or_nullptr(F&& f);
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
inline d1::task* combine_tasks(d1::task* body_task, task_handle_task* successor_task) {
if (body_task == nullptr) return successor_task;
// Successor task can't have dependencies
if (successor_task == nullptr) return body_task;

// There is a task returned from the body and the successor task - bypassing the body task
// and spawning the successor one
d1::spawn(*successor_task, successor_task->ctx());
return body_task;
}
#endif

template<typename F>
class function_task : public task_handle_task {
//TODO: apply empty base optimization here
Expand All @@ -87,12 +100,13 @@ class function_task : public task_handle_task {
private:
d1::task* execute(d1::execution_data& ed) override {
__TBB_ASSERT(ed.context == &this->ctx(), "The task group context should be used for all tasks");
task* res = task_ptr_or_nullptr(m_func);
task* next_task = task_ptr_or_nullptr(m_func);
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
this->complete_task();
task_handle_task* successor_task = this->complete_task();
next_task = combine_tasks(next_task, successor_task);
#endif
finalize(&ed);
return res;
return next_task;
}
d1::task* cancel(d1::execution_data& ed) override {
finalize(&ed);
Expand All @@ -110,7 +124,12 @@ namespace {
template<typename F>
d1::task* task_ptr_or_nullptr_impl(std::false_type, F&& f){
task_handle th = std::forward<F>(f)();
return task_handle_accessor::release(th);
task_handle_task* task_ptr = task_handle_accessor::release(th);
// If task has dependencies, it can't be bypassed
if (task_ptr->has_dependencies()) {
task_ptr = task_ptr->release_dependency();
}
return task_ptr;
}

template<typename F>
Expand Down Expand Up @@ -494,7 +513,16 @@ class task_group_base : no_copy {

bool cancellation_status = false;
try_call([&] {
execute_and_wait(*acs::release(h), context(), m_wait_vertex.get_context(), context());
task_handle_task* t = acs::release(h);
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
// If the task has dependencies and the task_handle is not the last dependency
if (t->has_dependencies() && t->release_dependency() == nullptr) {
d1::wait(m_wait_vertex.get_context(), context());
} else
#endif
{
execute_and_wait(*t, context(), m_wait_vertex.get_context(), context());
}
}).on_completion([&] {
// TODO: the reset method is not thread-safe. Ensure the correct behavior.
cancellation_status = context().is_group_execution_cancelled();
Expand Down Expand Up @@ -585,7 +613,14 @@ class task_group : public task_group_base {
using acs = d2::task_handle_accessor;
__TBB_ASSERT(&acs::ctx_of(h) == &context(), "Attempt to schedule task_handle into different task_group");

d1::spawn(*acs::release(h), context());
task_handle_task* t = acs::release(h);
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
// Owned task has no dependencies or the task handle is the last dependency
if (!t->has_dependencies() || t->release_dependency() != nullptr)
#endif
{
d1::spawn(*t, context());
}
}

template<typename F>
Expand All @@ -602,6 +637,24 @@ class task_group : public task_group_base {
task_group_status run_and_wait(d2::task_handle&& h) {
return internal_run_and_wait(std::move(h));
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
static void set_task_order(d2::task_handle& pred, d2::task_handle& succ) {
__TBB_ASSERT(pred != nullptr, "empty predecessor handle is not allowed for set_task_order");
__TBB_ASSERT(succ != nullptr, "empty successor handle is not allowed for set_task_order");
task_dynamic_state* pred_state = task_handle_accessor::get_task_dynamic_state(pred);
task_dynamic_state* succ_state = task_handle_accessor::get_task_dynamic_state(succ);
pred_state->add_successor(succ_state);
}

static void set_task_order(d2::task_completion_handle& pred, d2::task_handle& succ) {
__TBB_ASSERT(pred != nullptr, "empty predecessor completion_handle is not allowed for set_task_order");
__TBB_ASSERT(succ != nullptr, "empty successor handle is not allowed for set_task_order");
task_dynamic_state* pred_state = task_completion_handle_accessor::get_task_dynamic_state(pred);
task_dynamic_state* succ_state = task_handle_accessor::get_task_dynamic_state(succ);
pred_state->add_successor(succ_state);
}
#endif
}; // class task_group

#if TBB_PREVIEW_ISOLATED_TASK_GROUP
Expand Down
Loading