Skip to content

Commit

Permalink
Support combine with other event loops
Browse files Browse the repository at this point in the history
  • Loading branch information
shuai132 committed Jun 20, 2024
1 parent 12e0ce9 commit af0128d
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 2 deletions.
18 changes: 16 additions & 2 deletions asio/include/asio/detail/impl/kqueue_reactor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,18 @@ void kqueue_reactor::run(long usec, op_queue<operation>& ops)
lock.unlock();

// Block on the kqueue descriptor.
struct kevent events[128];
int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
struct kevent *events = event_loop_field_.events;
int &num_events = event_loop_field_.num_events;

if (!event_loop_field_.has_event) {
num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
if (event_loop_field_.is_wait) {
event_loop_field_.has_event = true;
return;
}
} else {
event_loop_field_.has_event = false;
}

#if defined(ASIO_ENABLE_HANDLER_TRACKING)
// Trace the waiting events.
Expand Down Expand Up @@ -542,6 +552,10 @@ void kqueue_reactor::run(long usec, op_queue<operation>& ops)
timer_queues_.get_ready_timers(ops);
}

void kqueue_reactor::mask_wait_only() {
event_loop_field_.is_wait = true;
}

void kqueue_reactor::interrupt()
{
interrupter_.interrupt();
Expand Down
6 changes: 6 additions & 0 deletions asio/include/asio/detail/impl/scheduler.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ std::size_t scheduler::run_one(asio::error_code& ec)
return do_run_one(lock, this_thread, ec);
}

std::size_t scheduler::wait_event(long usec, asio::error_code &ec)
{
task_->mask_wait_only();
return wait_one(usec, ec);
}

std::size_t scheduler::wait_one(long usec, asio::error_code& ec)
{
ec = asio::error_code();
Expand Down
10 changes: 10 additions & 0 deletions asio/include/asio/detail/kqueue_reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ class kqueue_reactor
// Run the kqueue loop.
ASIO_DECL void run(long usec, op_queue<operation>& ops);

ASIO_DECL void mask_wait_only();

// Interrupt the kqueue loop.
ASIO_DECL void interrupt();

Expand Down Expand Up @@ -254,6 +256,14 @@ class kqueue_reactor

// Keep track of all registered descriptors.
object_pool<descriptor_state> registered_descriptors_;

// Save fired events
struct {
struct kevent events[128];
int num_events;
std::atomic_bool has_event;
std::atomic_bool is_wait;
} event_loop_field_ {};
};

} // namespace detail
Expand Down
3 changes: 3 additions & 0 deletions asio/include/asio/detail/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class scheduler
// Poll for one operation without blocking.
ASIO_DECL std::size_t poll_one(asio::error_code& ec);

// Wait until timeout, interrupted, or one operation event.
ASIO_DECL std::size_t wait_event(long usec, asio::error_code& ec);

// Interrupt the event processing loop.
ASIO_DECL void stop();

Expand Down
5 changes: 5 additions & 0 deletions asio/include/asio/detail/scheduler_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ class scheduler_task
// Run the task once until interrupted or events are ready to be dispatched.
virtual void run(long usec, op_queue<scheduler_operation>& ops) = 0;

// Mask just wait event for this time call run
virtual void mask_wait_only() {
throw_error(std::make_error_code(std::errc::operation_not_supported));
};

// Interrupt the task.
virtual void interrupt() = 0;

Expand Down
56 changes: 56 additions & 0 deletions asio/src/examples/cpp11/other_eventloop/with_libuv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include <asio.hpp>
#include <iostream>
#include <uv.h>

//#define USE_LOG
#ifdef USE_LOG
#include "log.h"
#else
#define LOG printf
#endif

int main() {
uv_loop_t *loop = uv_default_loop();
static asio::io_context io_context;
asio::io_context::work work(io_context);

uv_timer_t timer_req;
uv_timer_init(loop, &timer_req);
uv_timer_start(
&timer_req, [](uv_timer_t *handle) { LOG("libuv Timer fired!\n"); },
10000, 10000);

asio::steady_timer asio_timer(io_context);
std::function<void()> start_asio_timer;
start_asio_timer = [&] {
asio_timer.expires_after(asio::chrono::milliseconds(1000));
asio_timer.async_wait([&](const asio::error_code &ec) {
LOG("asio Timer fired!\n");
start_asio_timer();
});
};
start_asio_timer();

static std::condition_variable async_done;
std::mutex async_done_lock;
uv_async_t async;
uv_async_init(loop, &async, [](uv_async_t *handle) {
io_context.poll_one();
async_done.notify_one();
});

std::thread([&] {
for (;;) {
LOG("wait_event...");
asio::error_code ec;
auto &service = asio::use_service<asio::detail::scheduler>(io_context);
service.wait_event(INTMAX_MAX, ec);
uv_async_send(&async);
std::unique_lock<std::mutex> lock(async_done_lock);
async_done.wait(lock);
}
}).detach();

uv_run(loop, UV_RUN_DEFAULT);
return 0;
}

0 comments on commit af0128d

Please sign in to comment.