Skip to content

Commit

Permalink
[WIP]
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
  • Loading branch information
mzient committed Feb 27, 2023
1 parent c687fab commit a181cde
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
2 changes: 0 additions & 2 deletions dali/pipeline/util/new_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "dali/pipeline/util/new_thread_pool.h"

namespace dali {
namespace experimental {

Expand Down
17 changes: 11 additions & 6 deletions dali/pipeline/util/new_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ class Job {
} catch (...) {
task->error = std::current_exception();
}
if (--num_pending_tasks_ == 0)
if (--num_pending_tasks_ == 0) {
std::lock_guard<std::mutex> g(mtx_);
cv_.notify_one();
}
};
} catch (...) { // if, for whatever reason, we cannot initialize the task, we should erase it
tasks_.erase(it);
Expand Down Expand Up @@ -113,7 +115,7 @@ class Job {
}

private:
std::mutex mtx_; // this is a dummy mutex - we could just use atomic_wait on num_pending_tasks_
std::mutex mtx_; // could just probably use atomic_wait on num_pending_tasks_
std::condition_variable cv_;
std::atomic_int num_pending_tasks_{0};
bool started_ = false;
Expand All @@ -126,7 +128,7 @@ class Job {

// This needs to be a container which never invalidates references when inserting new items.
std::multimap<priority_t, Task, std::greater<priority_t>,
mm::detail::object_pool_allocator<std::pair<priority_t, Task>>> tasks_;
mm::detail::object_pool_allocator<std::pair<const priority_t, Task>>> tasks_;
};

class ThreadPoolBase {
Expand Down Expand Up @@ -191,7 +193,10 @@ class ThreadPoolBase {
assert(this_thread_pool() == this);
std::unique_lock lock(mtx_);
do {
cv_.wait(lock, [&]() { return stop_requested_ || !tasks_.empty(); });
for (;;) {
bool ret;
while (!(ret = condition) && !stop_requested_ && tasks_.empty())
cv_.wait(lock);
}

}
Expand All @@ -216,15 +221,15 @@ inline void ThreadPoolBase::AddTask(TaskFunc f) {
{
std::lock_guard<std::mutex> g(mtx_);
if (stop_requested_)
throw std::logic_error("The thread pool is stopped and no longer accepts new tasks.");
throw std::logic_error("The thread pool is stopped and no longer accepts new tasks.");
tasks_.push(std::move(f));
}
cv_.notify_one();
}

inline void ThreadPoolBase::Run(int index) noexcept {
ThreadPoolBase *this_thread_pool_ = this;
this_thread_idx_ = index;
this_thread_index_ = index;
OnThreadStart(index);
detail::CallAtExit([&]() { OnThreadStop(index); });
std::unique_lock lock(mtx_);
Expand Down

0 comments on commit a181cde

Please sign in to comment.