From b1f2bf60ab1cccd0d4b9c99ce6a7770c737270d0 Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Thu, 27 Jun 2024 10:22:35 -0700 Subject: [PATCH 1/7] Initial commit --- cpp/daal/include/services/env_detect.h | 6 +- cpp/daal/src/services/env_detect.cpp | 8 +- cpp/daal/src/threading/threading.cpp | 246 ++++++++++++++----------- cpp/daal/src/threading/threading.h | 19 +- 4 files changed, 150 insertions(+), 129 deletions(-) diff --git a/cpp/daal/include/services/env_detect.h b/cpp/daal/include/services/env_detect.h index d132c55794d..76d2f2bdbba 100644 --- a/cpp/daal/include/services/env_detect.h +++ b/cpp/daal/include/services/env_detect.h @@ -198,11 +198,7 @@ class DAAL_EXPORT Environment : public Base void initNumberOfThreads(); env _env; - // Pointer to the oneapi::tbb::task_scheduler_handle class object, global for oneDAL. - // The oneapi::tbb::task_scheduler_handle and the oneapi::tbb::finalize function - // allow user to wait for completion of worker threads. - void * _schedulerHandle; - void * _globalControl; + // void * _globalControl; SharedPtr _executionContext; }; } // namespace interface1 diff --git a/cpp/daal/src/services/env_detect.cpp b/cpp/daal/src/services/env_detect.cpp index 286416ed571..2c7ed0bdf1a 100644 --- a/cpp/daal/src/services/env_detect.cpp +++ b/cpp/daal/src/services/env_detect.cpp @@ -125,7 +125,7 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi initNumberOfThreads(); } -DAAL_EXPORT daal::services::Environment::Environment() : _schedulerHandle {}, _globalControl {} +DAAL_EXPORT daal::services::Environment::Environment() /* : _globalControl {}*/ { _env.cpuid_init_flag = false; _env.cpuid = -1; @@ -163,6 +163,7 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() DAAL_EXPORT daal::services::Environment::~Environment() { daal::services::daal_free_buffers(); + // _daal_tbb_task_scheduler_free(_globalControl); } void daal::services::Environment::_cpu_detect(int enable) @@ -177,10 +178,7 @@ void daal::services::Environment::_cpu_detect(int enable) DAAL_EXPORT void daal::services::Environment::setNumberOfThreads(const size_t numThreads) { isInit = true; -#if defined(TARGET_X86_64) - daal::setSchedulerHandle(&_schedulerHandle); -#endif - daal::setNumberOfThreads(numThreads, &_globalControl); + daal::setNumberOfThreads(numThreads /*, &_globalControl*/); } DAAL_EXPORT size_t daal::services::Environment::getNumberOfThreads() const diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 15c39368238..a26298b36ec 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -52,6 +52,16 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) scalable_aligned_free(ptr); } +DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena) +{ + // void* taskArena = daal::threader_env()->getTaskArena(); + if (taskArena) + { + delete reinterpret_cast(taskArena); + taskArena = nullptr; + } +} + DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) { if (globalControl) @@ -61,44 +71,21 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) } } -DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle) -{ - // Note: TBB 13 deletes task_scheduler_handle itself during the destruction of thread context - - // #if defined(TARGET_X86_64) - // if (schedulerHandle) - // { - // delete reinterpret_cast(schedulerHandle); - // schedulerHandle = nullptr; - // } - // #endif -} - -DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle) -{ -#if defined(TARGET_X86_64) - #if (TBB_INTERFACE_VERSION < 12120) - schedulerHandle = nullptr; - #else - *schedulerHandle = reinterpret_cast(new tbb::task_scheduler_handle(tbb::attach {})); - #endif - // It is necessary for initializing tbb in cases where DAAL does not use it. - tbb::task_arena {}.initialize(); -#endif - return 0; -} - -DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl) +DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { static tbb::spin_mutex mt; tbb::spin_mutex::scoped_lock lock(mt); - if (numThreads != 0) + if (numThreads > 1) { - _daal_tbb_task_scheduler_free(*globalControl); - *globalControl = reinterpret_cast(new tbb::global_control(tbb::global_control::max_allowed_parallelism, numThreads)); - daal::threader_env()->setNumberOfThreads(numThreads); - return numThreads; + const size_t maxNumThreads = _daal_threader_get_max_threads(); + const size_t limitedNumThreads = numThreads < maxNumThreads ? numThreads : maxNumThreads; + void *& taskArena = daal::threader_env()->getTaskArena(); + _daal_tbb_task_arena_free(taskArena); + taskArena = reinterpret_cast(new tbb::task_arena(limitedNumThreads)); + daal::threader_env()->setNumberOfThreads(limitedNumThreads); + return limitedNumThreads; } + _daal_tbb_task_arena_free(daal::threader_env()->getTaskArena()); daal::threader_env()->setNumberOfThreads(1); return 1; } @@ -107,12 +94,15 @@ DAAL_EXPORT void _daal_threader_for(int n, int threads_request, const void * a, { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { - int i; - for (i = r.begin(); i < r.end(); i++) - { - func(i, a); - } + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }); }); } else @@ -129,12 +119,15 @@ DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::funct { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { - int64_t i; - for (i = r.begin(); i < r.end(); i++) - { - func(i, a); - } + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { + int64_t i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }); }); } else @@ -151,8 +144,11 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0ul, n, block), - [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for(tbb::blocked_range(0ul, n, block), + [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + }); } else { @@ -164,16 +160,19 @@ DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const voi { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for( - tbb::blocked_range(0, n, 1), - [&](tbb::blocked_range r) { - int i; - for (i = r.begin(); i < r.end(); i++) - { - func(i, a); - } - }, - tbb::simple_partitioner {}); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for( + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }, + tbb::simple_partitioner {}); + }); } else { @@ -189,12 +188,15 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(begin, end, 1), [&](tbb::blocked_range r) { - const int * i; - for (i = r.begin(); i != r.end(); i++) - { - func(i, a); - } + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for(tbb::blocked_range(begin, end, 1), [&](tbb::blocked_range r) { + const int * i; + for (i = r.begin(); i != r.end(); i++) + { + func(i, a); + } + }); }); } else @@ -212,10 +214,14 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, c { if (daal::threader_env()->getNumberOfThreads() > 1) { - return tbb::parallel_reduce( - tbb::blocked_range(0, n), init, - [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, - [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::auto_partitioner {}); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + // ????? + return taskArena->execute([&] { + return tbb::parallel_reduce( + tbb::blocked_range(0, n), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::auto_partitioner {}); + }); } else { @@ -229,10 +235,13 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64_simple(int32_t n, int64_t { if (daal::threader_env()->getNumberOfThreads() > 1) { - return tbb::parallel_reduce( - tbb::blocked_range(0, n), init, - [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, - [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + return taskArena->execute([&] { + return tbb::parallel_reduce( + tbb::blocked_range(0, n), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + }); } else { @@ -247,12 +256,15 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32ptr_int64_simple(const int32_t * { if (daal::threader_env()->getNumberOfThreads() > 1) { - return tbb::parallel_reduce( - tbb::blocked_range(begin, end), init, - [&](const tbb::blocked_range & r, int64_t value_for_reduce) { - return loop_func(r.begin(), r.end(), value_for_reduce, a); - }, - [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + return taskArena->execute([&] { + return tbb::parallel_reduce( + tbb::blocked_range(begin, end), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { + return loop_func(r.begin(), r.end(), value_for_reduce, a); + }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + }); } else { @@ -265,22 +277,24 @@ DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::funct { if (daal::threader_env()->getNumberOfThreads() > 1) { - const size_t nthreads = _daal_threader_get_max_threads(); + const size_t nthreads = daal::threader_env()->getNumberOfThreads(); //_daal_threader_get_max_threads(); const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); - - tbb::parallel_for( - tbb::blocked_range(0, nthreads, 1), - [&](tbb::blocked_range r) { - const size_t tid = r.begin(); - const size_t begin = tid * nblocks_per_thread; - const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; - - for (size_t i = begin; i < end; ++i) - { - func(i, tid, a); - } - }, - tbb::static_partitioner()); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for( + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; + + for (size_t i = begin; i < end; ++i) + { + func(i, tid, a); + } + }, + tbb::static_partitioner()); + }); } else { @@ -296,7 +310,8 @@ DAAL_EXPORT void _daal_parallel_sort_template(F * begin_p, F * end_p) { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_sort(begin_p, end_p); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { tbb::parallel_sort(begin_p, end_p); }); } else { @@ -322,7 +337,10 @@ DAAL_EXPORT void _daal_threader_for_blocked(int n, int threads_request, const vo { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { func(r.begin(), r.end() - r.begin(), a); }); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { func(r.begin(), r.end() - r.begin(), a); }); + }); } else { @@ -358,18 +376,21 @@ DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void if (daal::threader_env()->getNumberOfThreads() > 1) { tbb::task_group_context context; - tbb::parallel_for( - tbb::blocked_range(0, n, 1), - [&](tbb::blocked_range r) { - int i; - for (i = r.begin(); i < r.end(); ++i) - { - bool needBreak = false; - func(i, needBreak, a); - if (needBreak) context.cancel_group_execution(); - } - }, - context); + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for( + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); ++i) + { + bool needBreak = false; + func(i, needBreak, a); + if (needBreak) context.cancel_group_execution(); + } + }, + context); + }); } else { @@ -423,6 +444,11 @@ DAAL_EXPORT void _daal_reduce_tls(void * tlsPtr, void * a, daal::tls_reduce_func DAAL_EXPORT void _daal_parallel_reduce_tls(void * tlsPtr, void * a, daal::tls_reduce_functype func) { + if (daal::threader_env()->getNumberOfThreads() <= 1) + { + _daal_reduce_tls(tlsPtr, a, func); + return; + } size_t n = 0; tbb::enumerable_thread_specific * p = static_cast *>(tlsPtr); @@ -436,8 +462,12 @@ DAAL_EXPORT void _daal_parallel_reduce_tls(void * tlsPtr, void * a, daal::tls_re { size_t i = 0; for (auto it = p->begin(); it != p->end(); ++it) aDataPtr[i++] = *it; - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { - for (size_t i = r.begin(); i < r.end(); i++) func(aDataPtr[i], a); + + tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); + taskArena->execute([&] { + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { + for (size_t i = r.begin(); i < r.end(); i++) func(aDataPtr[i], a); + }); }); ::free(aDataPtr); } diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 0b4a9881b97..f5ec7d3b882 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -101,10 +101,9 @@ extern "C" DAAL_EXPORT void _daal_run_task_group(void * taskGroupPtr, daal::task * t); DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr); + DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena); DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl); - DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle); - DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl); - DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle); + DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads); DAAL_EXPORT void * _daal_threader_env(); @@ -167,12 +166,15 @@ inline void threaded_scalable_free(void * ptr) class ThreaderEnvironment { public: - ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads()) {} + ThreaderEnvironment() : _numberOfThreads(1 /*_daal_threader_get_max_threads()*/) {} + ~ThreaderEnvironment() { _daal_tbb_task_arena_free(_taskArena); } size_t getNumberOfThreads() const { return _numberOfThreads; } void setNumberOfThreads(size_t value) { _numberOfThreads = value; } + void *& getTaskArena() { return _taskArena; } private: size_t _numberOfThreads; + void * _taskArena; }; inline ThreaderEnvironment * threader_env() @@ -185,14 +187,9 @@ inline size_t threader_get_threads_number() return threader_env()->getNumberOfThreads(); } -inline size_t setSchedulerHandle(void ** schedulerHandle) +inline size_t setNumberOfThreads(const size_t numThreads) { - return _setSchedulerHandle(schedulerHandle); -} - -inline size_t setNumberOfThreads(const size_t numThreads, void ** globalControl) -{ - return _setNumberOfThreads(numThreads, globalControl); + return _setNumberOfThreads(numThreads); } template From c960c03b4f5ff7836a6b02fd75176d886f846138 Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Wed, 3 Jul 2024 02:33:12 -0700 Subject: [PATCH 2/7] Add init to env constructor --- .../src/externals/core_threading_win_dll.cpp | 32 +++------ cpp/daal/src/services/env_detect.cpp | 25 ++++--- cpp/daal/src/threading/threading.cpp | 65 ++++++++++++------- cpp/daal/src/threading/threading.h | 12 ++-- 4 files changed, 72 insertions(+), 62 deletions(-) diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index 37c4f7d0e2b..f3e6791b50d 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -142,10 +142,8 @@ typedef void (*_daal_run_task_group_t)(void * taskGroupPtr, daal::task * t); typedef void (*_daal_wait_task_group_t)(void * taskGroupPtr); typedef bool (*_daal_is_in_parallel_t)(); -typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl); -typedef void (*_daal_tbb_task_scheduler_handle_free_t)(void *& schedulerHandle); -typedef size_t (*_setNumberOfThreads_t)(const size_t, void **); -typedef size_t (*_setSchedulerHandle_t)(void **); +// typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl); +typedef size_t (*_setNumberOfThreads_t)(const size_t); //, void **); typedef void * (*_daal_threader_env_t)(); typedef void (*_daal_parallel_sort_int32_t)(int *, int *); @@ -207,12 +205,10 @@ static _daal_del_task_group_t _daal_del_task_group_ptr = NULL; static _daal_run_task_group_t _daal_run_task_group_ptr = NULL; static _daal_wait_task_group_t _daal_wait_task_group_ptr = NULL; -static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL; -static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL; -static _daal_tbb_task_scheduler_handle_free_t _daal_tbb_task_scheduler_handle_free_ptr = NULL; -static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; -static _setSchedulerHandle_t _setSchedulerHandle_ptr = NULL; -static _daal_threader_env_t _daal_threader_env_ptr = NULL; +static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL; +// static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL; +static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; +static _daal_threader_env_t _daal_threader_env_ptr = NULL; static _daal_parallel_sort_int32_t _daal_parallel_sort_int32_ptr = NULL; static _daal_parallel_sort_uint64_t _daal_parallel_sort_uint64_ptr = NULL; @@ -640,6 +636,7 @@ DAAL_EXPORT bool _daal_is_in_parallel() return _daal_is_in_parallel_ptr(); } +/* DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init) { if (init == NULL) @@ -660,25 +657,16 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init) } return _daal_tbb_task_scheduler_free_ptr(init); } +*/ -DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& init) -{ - load_daal_thr_dll(); - if (_daal_tbb_task_scheduler_handle_free_ptr == NULL) - { - _daal_tbb_task_scheduler_handle_free_ptr = (_daal_tbb_task_scheduler_handle_free_t)load_daal_thr_func("_daal_tbb_task_scheduler_handle_free"); - } - return _daal_tbb_task_scheduler_handle_free_ptr(init); -} - -DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init) +DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads /*, void ** init*/) { load_daal_thr_dll(); if (_setNumberOfThreads_ptr == NULL) { _setNumberOfThreads_ptr = (_setNumberOfThreads_t)load_daal_thr_func("_setNumberOfThreads"); } - return _setNumberOfThreads_ptr(numThreads, init); + return _setNumberOfThreads_ptr(numThreads /*, init*/); } DAAL_EXPORT size_t _setSchedulerHandle(void ** init) diff --git a/cpp/daal/src/services/env_detect.cpp b/cpp/daal/src/services/env_detect.cpp index 2c7ed0bdf1a..036f2801702 100644 --- a/cpp/daal/src/services/env_detect.cpp +++ b/cpp/daal/src/services/env_detect.cpp @@ -28,6 +28,7 @@ #include "src/externals/service_service.h" #include "src/threading/threading.h" #include "services/error_indexes.h" +#include #include "src/services/service_topo.h" #include "src/threading/service_thread_pinner.h" @@ -127,9 +128,11 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi DAAL_EXPORT daal::services::Environment::Environment() /* : _globalControl {}*/ { + std::cerr << "Environment constructor" << std::endl; _env.cpuid_init_flag = false; _env.cpuid = -1; this->setDefaultExecutionContext(internal::CpuExecutionContext()); + daal::services::Environment::initNumberOfThreads(); } DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : daal::services::Environment::Environment() {} @@ -137,14 +140,8 @@ DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : da DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() { if (isInit) return; - // Initializes global oneapi::tbb::task_scheduler_handle object in oneDAL to prevent the unexpected - // destruction of the calling thread. - // When the oneapi::tbb::finalize function is called with an oneapi::tbb::task_scheduler_handle - // instance, it blocks the calling thread until the completion of all worker - // threads that were implicitly created by the library. -#if defined(TARGET_X86_64) - daal::setSchedulerHandle(&_schedulerHandle); -#endif + std::cerr << "Inside init" << std::endl; + /* if HT enabled - set _numThreads to physical cores num */ if (daal::internal::ServiceInst::serv_get_ht()) { @@ -152,16 +149,24 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() int ncores = daal::internal::ServiceInst::serv_get_ncpus() * daal::internal::ServiceInst::serv_get_ncorespercpu(); /* Re-set number of threads if ncores is valid and different to _numThreads */ - if ((ncores > 0) && (ncores < _daal_threader_get_max_threads())) + + std::cerr << "Init with " << ncores << std::endl; + if (ncores > 0) { daal::services::Environment::setNumberOfThreads(ncores); } } + else + { + std::cerr << "Init with " << (_daal_threader_get_max_threads()) << std::endl; + daal::services::Environment::setNumberOfThreads(_daal_threader_get_max_threads()); + } isInit = true; } DAAL_EXPORT daal::services::Environment::~Environment() { + std::cerr << "Env destructor" << std::endl; daal::services::daal_free_buffers(); // _daal_tbb_task_scheduler_free(_globalControl); } @@ -178,7 +183,7 @@ void daal::services::Environment::_cpu_detect(int enable) DAAL_EXPORT void daal::services::Environment::setNumberOfThreads(const size_t numThreads) { isInit = true; - daal::setNumberOfThreads(numThreads /*, &_globalControl*/); + daal::setNumberOfThreads(numThreads); } DAAL_EXPORT size_t daal::services::Environment::getNumberOfThreads() const diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index a26298b36ec..73109fae214 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -35,11 +35,43 @@ #include #include #include "services/daal_atomic_int.h" +#include #if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002 #include #endif +namespace daal +{ +ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr) +{ + std::cout << "ThreaderEnv constructor" << std::endl; +} +ThreaderEnvironment::~ThreaderEnvironment() +{ + std::cerr << "ThreaderEnv destructor" << std::endl; + if (_taskArena) + { + delete reinterpret_cast(_taskArena); + _taskArena = nullptr; + } +} +void ThreaderEnvironment::setNumberOfThreads(size_t value) +{ + std::cerr << "setNumberOfThreads from " << (_numberOfThreads) << " to " << value << std::endl; + if (_taskArena) + { + delete reinterpret_cast(_taskArena); + _taskArena = nullptr; + } + if (value > 1) + { + _taskArena = reinterpret_cast(new tbb::task_arena(value)); + } + _numberOfThreads = value; +} +} // namespace daal + using namespace daal::services; DAAL_EXPORT void * _threaded_scalable_malloc(const size_t size, const size_t alignment) @@ -52,24 +84,14 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) scalable_aligned_free(ptr); } -DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena) -{ - // void* taskArena = daal::threader_env()->getTaskArena(); - if (taskArena) - { - delete reinterpret_cast(taskArena); - taskArena = nullptr; - } -} - -DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) -{ - if (globalControl) - { - delete reinterpret_cast(globalControl); - globalControl = nullptr; - } -} +// DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) +// { +// if (globalControl) +// { +// delete reinterpret_cast(globalControl); +// globalControl = nullptr; +// } +// } DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { @@ -79,13 +101,11 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { const size_t maxNumThreads = _daal_threader_get_max_threads(); const size_t limitedNumThreads = numThreads < maxNumThreads ? numThreads : maxNumThreads; - void *& taskArena = daal::threader_env()->getTaskArena(); - _daal_tbb_task_arena_free(taskArena); - taskArena = reinterpret_cast(new tbb::task_arena(limitedNumThreads)); + std::cerr << "_set nthreads " << numThreads << "(max " << maxNumThreads << ")" << std::endl; daal::threader_env()->setNumberOfThreads(limitedNumThreads); return limitedNumThreads; } - _daal_tbb_task_arena_free(daal::threader_env()->getTaskArena()); + std::cerr << "_set nthreads 1" << std::endl; daal::threader_env()->setNumberOfThreads(1); return 1; } @@ -215,7 +235,6 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, c if (daal::threader_env()->getNumberOfThreads() > 1) { tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); - // ????? return taskArena->execute([&] { return tbb::parallel_reduce( tbb::blocked_range(0, n), init, diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index f5ec7d3b882..1bbe05f2de6 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -100,9 +100,7 @@ extern "C" DAAL_EXPORT void _daal_del_task_group(void * taskGroupPtr); DAAL_EXPORT void _daal_run_task_group(void * taskGroupPtr, daal::task * t); DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr); - - DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena); - DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl); + // DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl); DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads); DAAL_EXPORT void * _daal_threader_env(); @@ -166,11 +164,11 @@ inline void threaded_scalable_free(void * ptr) class ThreaderEnvironment { public: - ThreaderEnvironment() : _numberOfThreads(1 /*_daal_threader_get_max_threads()*/) {} - ~ThreaderEnvironment() { _daal_tbb_task_arena_free(_taskArena); } + ThreaderEnvironment(); + ~ThreaderEnvironment(); size_t getNumberOfThreads() const { return _numberOfThreads; } - void setNumberOfThreads(size_t value) { _numberOfThreads = value; } - void *& getTaskArena() { return _taskArena; } + void * getTaskArena() const { return _taskArena; }; + void setNumberOfThreads(size_t value); private: size_t _numberOfThreads; From d03fe03c8ca1e0559cb91fb9dbccf86bad8d0860 Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Thu, 4 Jul 2024 08:03:04 -0700 Subject: [PATCH 3/7] Remove redundant comments and debug prints --- cpp/daal/include/services/env_detect.h | 1 - .../src/externals/core_threading_win_dll.cpp | 45 +++---------------- cpp/daal/src/services/env_detect.cpp | 11 +---- cpp/daal/src/threading/threading.cpp | 21 +-------- cpp/daal/src/threading/threading.h | 1 - 5 files changed, 9 insertions(+), 70 deletions(-) diff --git a/cpp/daal/include/services/env_detect.h b/cpp/daal/include/services/env_detect.h index 76d2f2bdbba..5d50892aa48 100644 --- a/cpp/daal/include/services/env_detect.h +++ b/cpp/daal/include/services/env_detect.h @@ -198,7 +198,6 @@ class DAAL_EXPORT Environment : public Base void initNumberOfThreads(); env _env; - // void * _globalControl; SharedPtr _executionContext; }; } // namespace interface1 diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index f3e6791b50d..5ee0901af7f 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -142,8 +142,7 @@ typedef void (*_daal_run_task_group_t)(void * taskGroupPtr, daal::task * t); typedef void (*_daal_wait_task_group_t)(void * taskGroupPtr); typedef bool (*_daal_is_in_parallel_t)(); -// typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl); -typedef size_t (*_setNumberOfThreads_t)(const size_t); //, void **); +typedef size_t (*_setNumberOfThreads_t)(const size_t); typedef void * (*_daal_threader_env_t)(); typedef void (*_daal_parallel_sort_int32_t)(int *, int *); @@ -206,9 +205,8 @@ static _daal_run_task_group_t _daal_run_task_group_ptr = NULL; static _daal_wait_task_group_t _daal_wait_task_group_ptr = NULL; static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL; -// static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL; -static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; -static _daal_threader_env_t _daal_threader_env_ptr = NULL; +static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; +static _daal_threader_env_t _daal_threader_env_ptr = NULL; static _daal_parallel_sort_int32_t _daal_parallel_sort_int32_ptr = NULL; static _daal_parallel_sort_uint64_t _daal_parallel_sort_uint64_ptr = NULL; @@ -636,47 +634,14 @@ DAAL_EXPORT bool _daal_is_in_parallel() return _daal_is_in_parallel_ptr(); } -/* -DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init) -{ - if (init == NULL) - { - // If threading library was not opened, there is nothing to free, - // so we do not need to load threading library. - // Moreover, loading threading library in the Environment destructor - // results in a crush because of the use of Wintrust library after it was unloaded. - // This happens due to undefined order of static objects deinitialization - // like Environment, and dependent libraries. - return; - } - - load_daal_thr_dll(); - if (_daal_tbb_task_scheduler_free_ptr == NULL) - { - _daal_tbb_task_scheduler_free_ptr = (_daal_tbb_task_scheduler_free_t)load_daal_thr_func("_daal_tbb_task_scheduler_free"); - } - return _daal_tbb_task_scheduler_free_ptr(init); -} -*/ - -DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads /*, void ** init*/) +DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { load_daal_thr_dll(); if (_setNumberOfThreads_ptr == NULL) { _setNumberOfThreads_ptr = (_setNumberOfThreads_t)load_daal_thr_func("_setNumberOfThreads"); } - return _setNumberOfThreads_ptr(numThreads /*, init*/); -} - -DAAL_EXPORT size_t _setSchedulerHandle(void ** init) -{ - load_daal_thr_dll(); - if (_setSchedulerHandle_ptr == NULL) - { - _setSchedulerHandle_ptr = (_setSchedulerHandle_t)load_daal_thr_func("_setSchedulerHandle"); - } - return _setSchedulerHandle_ptr(init); + return _setNumberOfThreads_ptr(numThreads); } DAAL_EXPORT void * _daal_threader_env() diff --git a/cpp/daal/src/services/env_detect.cpp b/cpp/daal/src/services/env_detect.cpp index 036f2801702..3f3f08246df 100644 --- a/cpp/daal/src/services/env_detect.cpp +++ b/cpp/daal/src/services/env_detect.cpp @@ -28,7 +28,6 @@ #include "src/externals/service_service.h" #include "src/threading/threading.h" #include "services/error_indexes.h" -#include #include "src/services/service_topo.h" #include "src/threading/service_thread_pinner.h" @@ -126,13 +125,12 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi initNumberOfThreads(); } -DAAL_EXPORT daal::services::Environment::Environment() /* : _globalControl {}*/ +DAAL_EXPORT daal::services::Environment::Environment() { - std::cerr << "Environment constructor" << std::endl; _env.cpuid_init_flag = false; _env.cpuid = -1; this->setDefaultExecutionContext(internal::CpuExecutionContext()); - daal::services::Environment::initNumberOfThreads(); + initNumberOfThreads(); } DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : daal::services::Environment::Environment() {} @@ -140,7 +138,6 @@ DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : da DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() { if (isInit) return; - std::cerr << "Inside init" << std::endl; /* if HT enabled - set _numThreads to physical cores num */ if (daal::internal::ServiceInst::serv_get_ht()) @@ -150,7 +147,6 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() /* Re-set number of threads if ncores is valid and different to _numThreads */ - std::cerr << "Init with " << ncores << std::endl; if (ncores > 0) { daal::services::Environment::setNumberOfThreads(ncores); @@ -158,7 +154,6 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() } else { - std::cerr << "Init with " << (_daal_threader_get_max_threads()) << std::endl; daal::services::Environment::setNumberOfThreads(_daal_threader_get_max_threads()); } isInit = true; @@ -166,9 +161,7 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() DAAL_EXPORT daal::services::Environment::~Environment() { - std::cerr << "Env destructor" << std::endl; daal::services::daal_free_buffers(); - // _daal_tbb_task_scheduler_free(_globalControl); } void daal::services::Environment::_cpu_detect(int enable) diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 73109fae214..bf873bcf6a7 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -35,7 +35,6 @@ #include #include #include "services/daal_atomic_int.h" -#include #if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002 #include @@ -43,13 +42,9 @@ namespace daal { -ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr) -{ - std::cout << "ThreaderEnv constructor" << std::endl; -} +ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr) {} ThreaderEnvironment::~ThreaderEnvironment() { - std::cerr << "ThreaderEnv destructor" << std::endl; if (_taskArena) { delete reinterpret_cast(_taskArena); @@ -58,7 +53,6 @@ ThreaderEnvironment::~ThreaderEnvironment() } void ThreaderEnvironment::setNumberOfThreads(size_t value) { - std::cerr << "setNumberOfThreads from " << (_numberOfThreads) << " to " << value << std::endl; if (_taskArena) { delete reinterpret_cast(_taskArena); @@ -84,15 +78,6 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) scalable_aligned_free(ptr); } -// DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) -// { -// if (globalControl) -// { -// delete reinterpret_cast(globalControl); -// globalControl = nullptr; -// } -// } - DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { static tbb::spin_mutex mt; @@ -101,11 +86,9 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { const size_t maxNumThreads = _daal_threader_get_max_threads(); const size_t limitedNumThreads = numThreads < maxNumThreads ? numThreads : maxNumThreads; - std::cerr << "_set nthreads " << numThreads << "(max " << maxNumThreads << ")" << std::endl; daal::threader_env()->setNumberOfThreads(limitedNumThreads); return limitedNumThreads; } - std::cerr << "_set nthreads 1" << std::endl; daal::threader_env()->setNumberOfThreads(1); return 1; } @@ -296,7 +279,7 @@ DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::funct { if (daal::threader_env()->getNumberOfThreads() > 1) { - const size_t nthreads = daal::threader_env()->getNumberOfThreads(); //_daal_threader_get_max_threads(); + const size_t nthreads = daal::threader_env()->getNumberOfThreads(); const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); taskArena->execute([&] { diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 1bbe05f2de6..730f0e962d3 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -100,7 +100,6 @@ extern "C" DAAL_EXPORT void _daal_del_task_group(void * taskGroupPtr); DAAL_EXPORT void _daal_run_task_group(void * taskGroupPtr, daal::task * t); DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr); - // DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl); DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads); DAAL_EXPORT void * _daal_threader_env(); From 6274d7d244899097a3bf4ac930afc9dfb2a39ae9 Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Wed, 10 Jul 2024 05:02:37 -0700 Subject: [PATCH 4/7] Add debug prints --- cpp/daal/src/services/env_detect.cpp | 6 ++++++ cpp/daal/src/threading/threading.cpp | 11 ++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/cpp/daal/src/services/env_detect.cpp b/cpp/daal/src/services/env_detect.cpp index 3f3f08246df..14da127c40a 100644 --- a/cpp/daal/src/services/env_detect.cpp +++ b/cpp/daal/src/services/env_detect.cpp @@ -28,6 +28,7 @@ #include "src/externals/service_service.h" #include "src/threading/threading.h" #include "services/error_indexes.h" +#include #include "src/services/service_topo.h" #include "src/threading/service_thread_pinner.h" @@ -127,6 +128,7 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi DAAL_EXPORT daal::services::Environment::Environment() { + std::cout << "Env constructor" << std::endl; _env.cpuid_init_flag = false; _env.cpuid = -1; this->setDefaultExecutionContext(internal::CpuExecutionContext()); @@ -138,6 +140,7 @@ DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : da DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() { if (isInit) return; + std::cout << "Inside init number of threads" << std::endl; /* if HT enabled - set _numThreads to physical cores num */ if (daal::internal::ServiceInst::serv_get_ht()) @@ -149,11 +152,13 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() if (ncores > 0) { + std::cout << "(ht enabled) init with " << ncores << std::endl; daal::services::Environment::setNumberOfThreads(ncores); } } else { + std::cout << "(ht disabled) init with " << _daal_threader_get_max_threads() << std::endl; daal::services::Environment::setNumberOfThreads(_daal_threader_get_max_threads()); } isInit = true; @@ -161,6 +166,7 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() DAAL_EXPORT daal::services::Environment::~Environment() { + std::cout << "Env destructor" << std::endl; daal::services::daal_free_buffers(); } diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index bf873bcf6a7..b0c4f696ac6 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -35,6 +35,7 @@ #include #include #include "services/daal_atomic_int.h" +#include #if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002 #include @@ -42,17 +43,23 @@ namespace daal { -ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr) {} +ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr) +{ + std::cout << "threader env constructor" << std::endl; +} ThreaderEnvironment::~ThreaderEnvironment() { + std::cout << "threader env destructor" << std::endl; if (_taskArena) { + std::cout << "delete task arena" << std::endl; delete reinterpret_cast(_taskArena); _taskArena = nullptr; } } void ThreaderEnvironment::setNumberOfThreads(size_t value) { + std::cout << "set number of threads from " << _numberOfThreads << " to " << value << std::endl; if (_taskArena) { delete reinterpret_cast(_taskArena); @@ -80,12 +87,14 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { + std::cout << "set nthreads " << numThreads << std::endl; static tbb::spin_mutex mt; tbb::spin_mutex::scoped_lock lock(mt); if (numThreads > 1) { const size_t maxNumThreads = _daal_threader_get_max_threads(); const size_t limitedNumThreads = numThreads < maxNumThreads ? numThreads : maxNumThreads; + std::cout << "Set number of threads to " << limitedNumThreads << " max(" << maxNumThreads << ")" << std::endl; daal::threader_env()->setNumberOfThreads(limitedNumThreads); return limitedNumThreads; } From c2b2b08db33a3ad33bf0afbd026b47fbeb8780e0 Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Fri, 12 Jul 2024 03:27:25 -0700 Subject: [PATCH 5/7] Add arena initialization --- cpp/daal/src/threading/threading.cpp | 13 +++++++++++-- cpp/daal/src/threading/threading.h | 1 + 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index b0c4f696ac6..678750a8284 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -43,9 +43,15 @@ namespace daal { -ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr) +ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr), _schedulerHandle(nullptr) { std::cout << "threader env constructor" << std::endl; + #if defined(TARGET_X86_64) + _schedulerHandle = reinterpret_cast(new tbb::task_scheduler_handle(tbb::attach {})); + #endif + tbb::task_arena {}.initialize(); + std::cout << "threader env constructor end" << std::endl; + } ThreaderEnvironment::~ThreaderEnvironment() { @@ -55,6 +61,7 @@ ThreaderEnvironment::~ThreaderEnvironment() std::cout << "delete task arena" << std::endl; delete reinterpret_cast(_taskArena); _taskArena = nullptr; + std::cout << "after delete task arena" << std::endl; } } void ThreaderEnvironment::setNumberOfThreads(size_t value) @@ -67,7 +74,9 @@ void ThreaderEnvironment::setNumberOfThreads(size_t value) } if (value > 1) { - _taskArena = reinterpret_cast(new tbb::task_arena(value)); + tbb::task_arena* arenaPtr = new tbb::task_arena(value); + // arenaPtr->initialize(); + _taskArena = reinterpret_cast(arenaPtr); } _numberOfThreads = value; } diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 730f0e962d3..4c6d5b42675 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -172,6 +172,7 @@ class ThreaderEnvironment private: size_t _numberOfThreads; void * _taskArena; + void * _schedulerHandle; }; inline ThreaderEnvironment * threader_env() From 6a949b3deeba1f25e442e382b53d1a8a6024a3c2 Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Fri, 12 Jul 2024 14:54:38 -0700 Subject: [PATCH 6/7] Add scheduler handler delete --- cpp/daal/src/threading/threading.cpp | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 678750a8284..0b9749aed0f 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -46,12 +46,11 @@ namespace daal ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr), _schedulerHandle(nullptr) { std::cout << "threader env constructor" << std::endl; - #if defined(TARGET_X86_64) - _schedulerHandle = reinterpret_cast(new tbb::task_scheduler_handle(tbb::attach {})); - #endif +#if defined(TARGET_X86_64) + _schedulerHandle = reinterpret_cast(new tbb::task_scheduler_handle(tbb::attach {})); +#endif tbb::task_arena {}.initialize(); std::cout << "threader env constructor end" << std::endl; - } ThreaderEnvironment::~ThreaderEnvironment() { @@ -63,6 +62,19 @@ ThreaderEnvironment::~ThreaderEnvironment() _taskArena = nullptr; std::cout << "after delete task arena" << std::endl; } +#if defined(TARGET_X86_64) + if (_schedulerHandle) + { + std::cout << "scheduler handle finalize" << std::endl; + tbb::task_scheduler_handle * schedulerHandle = reinterpret_cast(_schedulerHandle); + std::cout << "after reinterpret_cast" << std::endl; + tbb::finalize(*schedulerHandle); + std::cout << "after finalize" << std::endl; + delete schedulerHandle; + _schedulerHandle = nullptr; + std::cout << "after scheduler handle delete" << std::endl; + } +#endif } void ThreaderEnvironment::setNumberOfThreads(size_t value) { @@ -74,7 +86,7 @@ void ThreaderEnvironment::setNumberOfThreads(size_t value) } if (value > 1) { - tbb::task_arena* arenaPtr = new tbb::task_arena(value); + tbb::task_arena * arenaPtr = new tbb::task_arena(value); // arenaPtr->initialize(); _taskArena = reinterpret_cast(arenaPtr); } From a2f4ae1262497058e76851d05f5a189d63b45c7b Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Sun, 14 Jul 2024 15:33:22 -0700 Subject: [PATCH 7/7] Remove finalize --- cpp/daal/src/threading/threading.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 0b9749aed0f..75ed78e9217 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -68,7 +68,8 @@ ThreaderEnvironment::~ThreaderEnvironment() std::cout << "scheduler handle finalize" << std::endl; tbb::task_scheduler_handle * schedulerHandle = reinterpret_cast(_schedulerHandle); std::cout << "after reinterpret_cast" << std::endl; - tbb::finalize(*schedulerHandle); + std::cout << "--- no finalize ---" << std::endl; + // tbb::finalize(*schedulerHandle); std::cout << "after finalize" << std::endl; delete schedulerHandle; _schedulerHandle = nullptr;