-
Notifications
You must be signed in to change notification settings - Fork 0
/
msg_ring.cpp
149 lines (128 loc) · 4.01 KB
/
msg_ring.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//
// Created by th on 3/29/24.
// Copyright (c) 2024 Exasol AG. All rights reserved.
//
#include "helper.hpp"
#include <liburing.h>
#include <vector>
#include <thread>
std::size_t num_messages{1'000'000};
std::size_t num_dispatcher{1};
std::atomic<bool> event_loop_running{false};
std::atomic<bool> stop{false};
::io_uring event_ring;
void handle_events(::io_uring* ring)
{
::io_uring_cqe* cqe{nullptr};
unsigned head;
unsigned i = 0;
io_uring_for_each_cqe(ring, head, cqe)
{
auto [type, op] = decode_userdata(cqe->user_data);
if (op != nullptr)
{
op->handle_completion(ring, type, cqe);
}
i++;
}
if (i == 0)
{
return;
}
io_uring_cq_advance(ring, i);
}
void event_loop()
{
int rc{0};
int flags = IORING_SETUP_SINGLE_ISSUER;
flags |= IORING_SETUP_DEFER_TASKRUN;
rc = ::io_uring_queue_init(1024, &event_ring, flags);
throw_error_code_if(rc < 0, -rc);
::io_uring_register_ring_fd(&event_ring);
event_loop_running = true;
while (!stop)
{
rc = ::io_uring_submit_and_wait(&event_ring, 1);
throw_error_code_if(rc < 0, -rc);
handle_events(&event_ring);
}
::io_uring_queue_exit(&event_ring);
}
// This is responsible for handling the completion of the msg we sent to the
// event ring. Once this message completed, we immediately respond with the reply
// to the source ring.
struct message_dispatch : base_operation
{
void handle_completion(io_uring* ring, operation_type type,
io_uring_cqe* cqe) final
{
throw_error_code_if(cqe->res < 0, -cqe->res);
// cqe->res contains the file descriptor for the source ring
int ring_fd = cqe->res;
auto* sqe = get_sqe(ring);
auto msg_data = encode_userdata(operation_type::ring_msg, nullptr);
::io_uring_prep_msg_ring(sqe, ring_fd, 0, msg_data, 0);
sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
}
};
// The "main" loop of this example:
// We repeat the message "ping-pong" for a couple of times and then exit.
void dispatch()
{
::io_uring dispatch_ring;
int rc{0};
int flags = IORING_SETUP_SINGLE_ISSUER;
//flags |= IORING_SETUP_DEFER_TASKRUN;
rc = ::io_uring_queue_init(1024, &dispatch_ring, flags);
throw_error_code_if(rc < 0, -rc);
::io_uring_register_ring_fd(&dispatch_ring);
message_dispatch msg;
for (std::size_t i = 0; i != num_messages; ++i)
{
auto* sqe = get_sqe(&dispatch_ring);
auto msg_data = encode_userdata(operation_type::ring_msg, &msg);
::io_uring_prep_msg_ring(sqe, event_ring.ring_fd, dispatch_ring.ring_fd, msg_data, 0);
sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
::io_uring_submit_and_wait(&dispatch_ring, 1);
handle_events(&dispatch_ring);
}
::io_uring_queue_exit(&dispatch_ring);
}
int main(int argc, char** argv)
{
for (int i = 1; i != argc; ++i) {
if (argv[i] == std::string_view{"num_requests"}) {
if (i + 1 < argc) {
num_messages = std::atoi(argv[i + 1]);
}
}
if (argv[i] == std::string_view{"num_dispatcher"}) {
if (i + 1 < argc) {
num_dispatcher = std::atoi(argv[i + 1]);
}
}
}
std::jthread event_loop_thread{&event_loop};
while (event_loop_running == false)
{
std::this_thread::yield();
}
std::vector<std::jthread> dispatcher;
dispatcher.reserve(num_dispatcher-1);
auto begin = std::chrono::steady_clock::now();
for (std::size_t i = 1; i != num_dispatcher; ++i)
{
dispatcher.emplace_back(dispatch);
}
dispatch();
// Reset the dispatcher...
dispatcher.clear();
auto end = std::chrono::steady_clock::now();
// Convert the runtime into seconds, casting elapsed time to a floating
// point number
double duration = std::chrono::duration<double>(end - begin).count();
printf("Requests/s: %f\n", num_messages / duration);
// Send stop signal...
stop = true;
request_stop(&event_ring);
}