Description
Required prerequisites
- Make sure you've read the documentation. Your issue may be addressed there.
- Search the issue tracker and Discussions to verify that this hasn't already been reported. +1 or comment there if it has.
- Consider asking first in the Gitter chat room or in a Discussion.
Problem description
I need to use a thread pool implemented in C++ to run Python functions/callbacks. The thread pool has a Python binding that accepts functions (py::function) and their arguments (py::args and, optionally, py::kwargs).
For this purpose I used the thread pool example from https://www.cnblogs.com/sinkinben/p/16064857.html and only adapted its enqueue() function so that it accepts py::function objects, wraps each one in a lambda expression, and pushes it onto a task queue. The pool has a fixed number of worker threads that pick up and run tasks from the queue.
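The queue-plus-workers design described above can be sketched without any Python involvement. The following is a minimal standalone C++17 sketch under that assumption; the class MiniPool and the helper count_with_pool are hypothetical names, not taken from the cnblogs example or from pybind11. It takes the same mutex in enqueue() that the workers use to pop tasks, so the queue is never modified concurrently.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: a fixed number of workers pop tasks from one queue.
class MiniPool {
public:
    explicit MiniPool(std::size_t num_threads) {
        for (std::size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) {
            w.join();  // workers exit only once the queue is empty
        }
    }

    // Push under the same mutex the workers use to pop, then wake one worker.
    void enqueue(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mtx_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) {
                    return;
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run without holding the queue lock
        }
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stop_ = false;
    std::vector<std::thread> workers_;
};

// Hypothetical usage helper: run n increments on the pool and return the total.
int count_with_pool(std::size_t num_threads, int n) {
    std::atomic<int> counter{0};
    {
        MiniPool pool(num_threads);
        for (int k = 0; k < n; ++k) {
            pool.enqueue([&counter] { counter.fetch_add(1); });
        }
    }  // ~MiniPool drains the queue and joins the workers
    return counter.load();
}
```

Calling count_with_pool(2, 100) returns 100: the destructor only sets the stop flag and joins, and each worker keeps popping until the queue is empty before it returns.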
The reference count of the arguments (py::args) is incremented on enqueue and decremented by the lambda after the py::function has run, to make sure the arguments are not garbage-collected while the task is still in the queue. The test example below crashes after the first task has finished, and the crash seems to be related to the args.dec_ref() call. If I comment that call out, the script runs to completion, but the reference count stays incremented, which prevents the arguments from ever being garbage-collected. If I also comment out the args.inc_ref() call in enqueue(), it crashes again (see the third output below).
Any idea what the problem is here? Any help is highly appreciated :)
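For comparison, here is a standalone analogy rather than pybind11 code: std::shared_ptr is a reference-counted handle, much like pybind11's py::object, which increments the Python reference count when it is copied and decrements it when it is destroyed. The sketch below (the helper trace_capture_lifetime is hypothetical) records use_count() at each stage to show two properties of by-value lambda captures such as [f, args] in enqueue(): the capture already holds its own reference while the task waits in the queue, and that reference is released when the closure object is destroyed, not when the call returns.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Records shared_ptr::use_count() at each stage of a task's life.
// std::shared_ptr stands in for a reference-counted Python object (an analogy).
std::vector<long> trace_capture_lifetime() {
    std::vector<long> counts;
    auto arg = std::make_shared<std::string>("world");
    counts.push_back(arg.use_count());  // only `arg` owns the string

    // A by-value capture copies the handle, adding one reference.
    std::function<void()> task = [arg] { (void)arg->size(); };
    counts.push_back(arg.use_count());  // `arg` plus the capture

    task();
    counts.push_back(arg.use_count());  // calling the task does not release the capture

    task = nullptr;  // destroys the closure, and with it the captured copy
    counts.push_back(arg.use_count());
    return counts;
}
```

The returned vector is {1, 2, 2, 1}. In the posted worker loop, the corresponding point is when the local std::function named task goes out of scope at the end of each loop iteration.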
Reproducible example code
Pool implementation (c++/pybind11)
#include <pybind11/pybind11.h>
#include <list>
#include <queue>
#include <future>
#include <chrono>
#include <ostream>
#include <iostream>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
using namespace std::chrono_literals;
namespace py = pybind11;
class Pool {
private:
    int num_threads;
    bool stop;
    std::mutex mtx;
    std::condition_variable cv;
    std::list<std::future<void> > threads;
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
public:
    Pool(int numThreads = 2) :
        num_threads(numThreads), stop(false) {
        for (int i = 0; i < num_threads; ++i)
        {
            std::thread worker([this, i]() {
                while (true)
                {
                    std::cout << "[ thread "<<i<<" ]: wait for next task" << std::endl;
                    std::function<void()> task;
                    /* pop a task from the queue and execute it */
                    {
                        std::unique_lock lock(mtx);
                        cv.wait(lock, [this]() { return stop || !tasks.empty(); });
                        if (stop && tasks.empty())
                            return;
                        /* even if stop is set, keep executing tasks
                         * until the task queue is empty
                         */
                        std::cout << "[ thread "<<i<<" ]: pick-up task" << std::endl;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    std::cout << "[ thread "<<i<<" ]: run task" << std::endl;
                    task();
                    std::cout << "[ thread "<<i<<" ]: task finished" << std::endl;
                }
            });
            workers.emplace_back(std::move(worker));
        }
    }
    ~Pool() {
        /* stop the thread pool and notify all threads to finish the remaining tasks */
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (auto &worker : workers)
            worker.join();
    }
    void enqueue(py::function f, py::args args) {
        args.inc_ref();
        {
            /* guard the queue with the same mutex the workers use to pop */
            std::unique_lock<std::mutex> lock(mtx);
            tasks.emplace([f, args]() -> void {
                std::cerr << "[ lambda ]: enter" << std::endl;
                py::gil_scoped_acquire acquire;
                f(*args);
                args.dec_ref();
                std::cerr << "[ lambda ]: exit" << std::endl;
            });
        }
        cv.notify_one();
    }
};
PYBIND11_MODULE(threadpool, m) {
    py::class_<Pool>(m, "Pool")
        .def(py::init<>())
        .def("enqueue", &Pool::enqueue);
}
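The fatal error says a Python C-API function was called without the GIL, and in the first log the crash comes right after "[ thread 0 ]: task finished", that is, after the lambda has returned and its py::gil_scoped_acquire has been destroyed. One plausible thing to check is object lifetime: in C++, by-value lambda captures are destroyed together with the closure object, which here lives in the worker loop's local std::function named task, so they are released after task() returns and outside the lambda's GIL scope. The standalone sketch below models this with hypothetical stand-ins (ScopedGuard for py::gil_scoped_acquire, Handle for a captured py::object, g_guard_held for "this thread holds the GIL"). It records whether the guard was held when the captured handle was released, once for a plain capture and once for a variant that drops the capture inside the guarded scope.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for "this thread holds the GIL".
thread_local bool g_guard_held = false;

// Hypothetical stand-in for py::gil_scoped_acquire: sets the flag for one scope.
struct ScopedGuard {
    bool previous;
    ScopedGuard() : previous(g_guard_held) { g_guard_held = true; }
    ~ScopedGuard() { g_guard_held = previous; }
};

// Hypothetical stand-in for a captured py::object: when an owning handle is
// destroyed, it logs whether the guard was held at that moment.
struct Handle {
    std::shared_ptr<std::vector<bool>> log;
    bool owns = true;

    explicit Handle(std::shared_ptr<std::vector<bool>> l) : log(std::move(l)) {}
    Handle(const Handle&) = default;  // a copy is another owner
    Handle(Handle&& other) noexcept
        : log(other.log), owns(std::exchange(other.owns, false)) {}
    Handle& operator=(const Handle&) = delete;
    Handle& operator=(Handle&&) = delete;
    ~Handle() { if (owns) log->push_back(g_guard_held); }
};

// Plain capture: released when the closure is destroyed, after the guard is gone.
std::function<void()> make_plain_task(std::shared_ptr<std::vector<bool>> log) {
    return [h = Handle(std::move(log))] {
        ScopedGuard guard;
        (void)h;  // the Python call would use h here
    };
}

// Variant: the captured handle is dropped while the guard is still alive.
std::function<void()> make_guarded_task(std::shared_ptr<std::vector<bool>> log) {
    return [h = std::optional<Handle>(std::in_place, std::move(log))]() mutable {
        ScopedGuard guard;
        // the Python call would use *h here
        h.reset();  // release before `guard` goes out of scope
    };
}

// Runs a task the way the posted worker loop does: call it, then let the
// std::function go out of scope. Returns what each owning handle logged.
std::vector<bool> run_like_worker(bool guarded) {
    auto log = std::make_shared<std::vector<bool>>();
    {
        std::function<void()> task =
            guarded ? make_guarded_task(log) : make_plain_task(log);
        task();
    }  // task destroyed here, outside any ScopedGuard
    return *log;
}
```

With the plain capture the handle is released after the guard has been dropped (the log holds false); releasing it inside the guarded scope gives true. In pybind11 terms this would correspond to letting the captured py::function and py::args be released while a py::gil_scoped_acquire is still alive, but that mapping is an assumption to verify against the pybind11 GIL documentation.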
Python test script
# test.py
import time
import sys
from threadpool import Pool

def slot(i):
    time.sleep(0.01)
    print('hello %s' % (i))

if __name__ == "__main__":
    a = "world"
    p = Pool()
    time.sleep(0.1)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))
    print('enqueue')
    p.enqueue(slot, a)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))
    time.sleep(0.1)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))
    print('exit')
Error output
$ PYTHONPATH=. python3 ../test2.py
[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
Fatal Python error: PyThreadState_Get: the function must be called with the GIL held, but the GIL is released (the current Python thread state is NULL)
Python runtime state: initialized
Thread 0x00007ff20d85a040 (most recent call first):
File "/home/USER/eclipse-workspace/threadpool/build/../test2.py", line 29 in <module>
Aborted (core dumped)
Output (with args.dec_ref(); commented out)
[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
[ thread 0 ]: wait for next task
#refs: slot=2, arg=5
exit
Error output (with both args.dec_ref(); and args.inc_ref(); commented out)
[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
Fatal Python error: PyThreadState_Get: the function must be called with the GIL held, but the GIL is released (the current Python thread state is NULL)
Python runtime state: initialized
Thread 0x00007f9c6062c040 (most recent call first):
File "/home/nickj/eclipse-workspace/threadpool/build/../test2.py", line 29 in <module>
Aborted (core dumped)