Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve deadlocking when adding to IOMonitor #390

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/logid/DeviceManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ void DeviceManager::addDevice(std::string path) {

// Check if device is ignored before continuing
{
raw::RawDevice raw_dev(path, self<DeviceManager>().lock());
auto raw_dev = raw::RawDevice::make(path, self<DeviceManager>().lock());
if (config()->ignore.has_value() &&
config()->ignore.value().contains(raw_dev.productId())) {
config()->ignore.value().contains(raw_dev->productId())) {
logPrintf(DEBUG, "%s: Device 0x%04x ignored.",
path.c_str(), raw_dev.productId());
path.c_str(), raw_dev->productId());
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/logid/backend/hidpp/Device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Device::Device(const std::string& path, DeviceIndex index,
const std::shared_ptr<raw::DeviceMonitor>& monitor, double timeout) :
io_timeout(duration_cast<milliseconds>(
duration<double, std::milli>(timeout))),
_raw_device(std::make_shared<raw::RawDevice>(path, monitor)),
_raw_device(raw::RawDevice::make(path, monitor)),
_receiver(nullptr), _path(path), _index(index) {
}

Expand Down
36 changes: 19 additions & 17 deletions src/logid/backend/raw/DeviceMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,25 @@ void DeviceMonitor::ready() {
_ready = true;

_io_monitor->add(_fd, {
[this]() {
struct udev_device* device = udev_monitor_receive_device(_udev_monitor);
std::string action = udev_device_get_action(device);
std::string dev_node = udev_device_get_devnode(device);

if (action == "add")
run_task([self_weak = _self, dev_node]() {
if (auto self = self_weak.lock())
self->_addHandler(dev_node);
});
else if (action == "remove")
run_task([self_weak = _self, dev_node]() {
if (auto self = self_weak.lock())
self->_removeHandler(dev_node);
});

udev_device_unref(device);
[self_weak = _self]() {
if (auto self = self_weak.lock()) {
struct udev_device* device = udev_monitor_receive_device(self->_udev_monitor);
std::string action = udev_device_get_action(device);
std::string dev_node = udev_device_get_devnode(device);

if (action == "add")
run_task([self_weak, dev_node]() {
if (auto self = self_weak.lock())
self->_addHandler(dev_node);
});
else if (action == "remove")
run_task([self_weak, dev_node]() {
if (auto self = self_weak.lock())
self->_removeHandler(dev_node);
});

udev_device_unref(device);
}
},
[]() {
throw std::runtime_error("udev hangup");
Expand Down
150 changes: 63 additions & 87 deletions src/logid/backend/raw/IOMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/
#include <backend/raw/IOMonitor.h>
#include <cassert>
#include <util/log.h>
#include <optional>

extern "C"
Expand All @@ -36,55 +36,6 @@ IOHandler::IOHandler(std::function<void()> r,
error(std::move(err)) {
}

class IOMonitor::io_lock {
std::optional<std::unique_lock<std::mutex>> _lock;
IOMonitor* _io_monitor;
const uint64_t counter = 1;

public:
explicit io_lock(IOMonitor* io_monitor) : _io_monitor(io_monitor) {
_io_monitor->_interrupting = true;
[[maybe_unused]] ssize_t ret = ::write(_io_monitor->_event_fd, &counter, sizeof(counter));
assert(ret == sizeof(counter));
_lock.emplace(_io_monitor->_run_mutex);
}

io_lock(const io_lock&) = delete;

io_lock& operator=(const io_lock&) = delete;

io_lock(io_lock&& o) noexcept: _lock(std::move(o._lock)), _io_monitor(o._io_monitor) {
o._lock.reset();
o._io_monitor = nullptr;
}

io_lock& operator=(io_lock&& o) noexcept {
if (this != &o) {
_lock = std::move(o._lock);
_io_monitor = o._io_monitor;
o._lock.reset();
o._io_monitor = nullptr;
}

return *this;
}

~io_lock() noexcept {
if (_lock && _io_monitor) {
uint64_t buf{};
[[maybe_unused]] const ssize_t ret = ::read(
_io_monitor->_event_fd, &buf, sizeof(counter));

assert(ret != -1);

if (buf == counter) {
_io_monitor->_interrupting = false;
_io_monitor->_interrupt_cv.notify_one();
}
}
}
};

IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)),
_event_fd(eventfd(0, EFD_NONBLOCK)) {
if (_epoll_fd < 0) {
Expand All @@ -106,12 +57,7 @@ IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)),
throw std::system_error(errno, std::generic_category());
}

_fds.emplace(std::piecewise_construct, std::forward_as_tuple(_event_fd),
std::forward_as_tuple([]() {}, []() {
throw std::runtime_error("event_fd hangup");
}, []() {
throw std::runtime_error("event_fd error");
}));
_fds.emplace(_event_fd, nullptr);

_io_thread = std::make_unique<std::thread>([this]() {
_listen();
Expand All @@ -122,70 +68,100 @@ IOMonitor::~IOMonitor() noexcept {
_stop();

if (_event_fd >= 0)
close(_event_fd);
::close(_event_fd);

if (_epoll_fd >= 0)
close(_epoll_fd);
::close(_epoll_fd);
}

void IOMonitor::_listen() {
std::unique_lock lock(_run_mutex);
std::vector<struct epoll_event> events;

if (_is_running)
throw std::runtime_error("IOMonitor double run");

_is_running = true;

while (_is_running) {
if (_interrupting) {
_interrupt_cv.wait(lock, [this]() {
return !_interrupting;
});

if (!_is_running)
break;
}

if (events.size() != _fds.size())
events.resize(_fds.size());

int ev_count = ::epoll_wait(_epoll_fd, events.data(), (int) events.size(), -1);
for (int i = 0; i < ev_count; ++i) {
const auto& handler = _fds.at(events[i].data.fd);
if (events[i].events & EPOLLIN)
handler.read();
if (events[i].events & EPOLLHUP)
handler.hangup();
if (events[i].events & EPOLLERR)
handler.error();
std::shared_ptr<IOHandler> handler;

if (events[i].data.fd == _event_fd) {
if (events[i].events & EPOLLIN) {
lock.unlock();
/* Wait until done yielding */
const std::lock_guard yield_lock(_yield_mutex);
uint64_t event;
while (-1 != ::eventfd_read(_event_fd, &event)) { }
lock.lock();
}
} else {
try {
handler = _fds.at(events[i].data.fd);
} catch (std::out_of_range& e) {
continue;
}

lock.unlock();
try {
if (events[i].events & EPOLLIN)
handler->read();
if (events[i].events & EPOLLHUP)
handler->hangup();
if (events[i].events & EPOLLERR)
handler->error();
} catch (std::exception& e) {
logPrintf(ERROR, "Unhandled I/O handler error: %s", e.what());
}
lock.lock();

}
}
}
}

void IOMonitor::_stop() noexcept {
{
[[maybe_unused]] const io_lock lock(this);
_is_running = false;
}
_is_running = false;
_yield();
_io_thread->join();
}

std::unique_lock<std::mutex> IOMonitor::_yield() noexcept {
/* Prevent listener thread from grabbing lock during yielding */
std::unique_lock yield_lock(_yield_mutex);

std::unique_lock run_lock(_run_mutex, std::try_to_lock);
if (!run_lock.owns_lock()) {
::eventfd_write(_event_fd, 1);
run_lock = std::unique_lock<std::mutex>(_run_mutex);
}

return run_lock;
}

void IOMonitor::add(int fd, IOHandler handler) {
[[maybe_unused]] const io_lock lock(this);
const auto lock = _yield();

struct epoll_event event{};
event.events = EPOLLIN | EPOLLHUP | EPOLLERR;
event.data.fd = fd;

// TODO: EPOLL_CTL_MOD
if (_fds.contains(fd))
if (!_fds.contains(fd)) {
if (::epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &event))
throw std::system_error(errno, std::generic_category());
_fds.emplace(fd, std::make_shared<IOHandler>(std::move(handler)));
} else {
throw std::runtime_error("duplicate io fd");

if (::epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &event))
throw std::system_error(errno, std::generic_category());
_fds.emplace(fd, std::move(handler));
}
}

void IOMonitor::remove(int fd) noexcept {
[[maybe_unused]] const io_lock lock(this);

const auto lock = _yield();
::epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
_fds.erase(fd);
}
16 changes: 7 additions & 9 deletions src/logid/backend/raw/IOMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <thread>

namespace logid::backend::raw {
struct IOHandler {
Expand Down Expand Up @@ -53,24 +54,21 @@ namespace logid::backend::raw {
void add(int fd, IOHandler handler);

void remove(int fd) noexcept;

private:
void _listen(); // This is a blocking call
void _stop() noexcept;
std::unique_lock<std::mutex> _yield() noexcept;

std::unique_ptr<std::thread> _io_thread;

std::map<int, IOHandler> _fds;
mutable std::mutex _run_mutex;
std::atomic_bool _is_running;
std::mutex _run_mutex;
std::mutex _yield_mutex;

std::atomic_bool _interrupting;
std::condition_variable _interrupt_cv;
std::map<int, std::shared_ptr<IOHandler>> _fds;
std::atomic_bool _is_running;

const int _epoll_fd;
const int _event_fd;

class io_lock;
};
}

Expand Down
17 changes: 14 additions & 3 deletions src/logid/backend/raw/RawDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,22 @@ RawDevice::RawDevice(std::string path, const std::shared_ptr<DeviceMonitor>& mon
auto phys = get_phys(_fd);
_sub_device = std::regex_match(phys, virtual_path_regex);
}
}

void RawDevice::_ready() {
_io_monitor->add(_fd, {
[this]() { _readReports(); },
[this]() { _valid = false; },
[this]() { _valid = false; }
[self_weak = _self]() {
if (auto self = self_weak.lock())
self->_readReports();
},
[self_weak = _self]() {
if (auto self = self_weak.lock())
self->_valid = false;
},
[self_weak = _self]() {
if (auto self = self_weak.lock())
self->_valid = false;
}
});
}

Expand Down
24 changes: 23 additions & 1 deletion src/logid/backend/raw/RawDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ namespace logid::backend::raw {

class IOMonitor;

template <typename T>
class RawDeviceWrapper : public T {
public:
template <typename... Args>
RawDeviceWrapper(Args... args) : T(std::forward<Args>(args)...) { }
};

class RawDevice {
template <typename>
friend class RawDeviceWrapper;
public:
static constexpr int max_data_length = 32;
typedef RawEventHandler EventHandler;
Expand All @@ -51,7 +60,14 @@ namespace logid::backend::raw {
BusType bus_type;
};

RawDevice(std::string path, const std::shared_ptr<DeviceMonitor>& monitor);
template <typename... Args>
static std::shared_ptr<RawDevice> make(Args... args) {
auto raw_dev = std::make_shared<RawDeviceWrapper<RawDevice>>(
std::forward<Args>(args)...);
raw_dev->_self = raw_dev;
raw_dev->_ready();
return raw_dev;
}

~RawDevice() noexcept;

Expand Down Expand Up @@ -79,6 +95,10 @@ namespace logid::backend::raw {
[[nodiscard]] EventHandlerLock<RawDevice> addEventHandler(RawEventHandler handler);

private:
RawDevice(std::string path, const std::shared_ptr<DeviceMonitor>& monitor);

void _ready();

void _readReports();

std::atomic_bool _valid;
Expand All @@ -91,6 +111,8 @@ namespace logid::backend::raw {

std::shared_ptr<IOMonitor> _io_monitor;

std::weak_ptr<RawDevice> _self;

bool _sub_device = false;

std::shared_ptr<EventHandlerList<RawDevice>> _event_handlers;
Expand Down