-
Notifications
You must be signed in to change notification settings - Fork 270
/
Copy pathloop.c
396 lines (327 loc) · 16 KB
/
loop.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/*
* Authored by Alex Hultman, 2018-2021.
* Intellectual property of third-party.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBUS_USE_IO_URING
#include "libusockets.h"
#include "internal/internal.h"
#include <stdlib.h>
/* The loop has 2 fallthrough polls */
void us_internal_loop_data_init(struct us_loop_t *loop, void (*wakeup_cb)(struct us_loop_t *loop),
void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop)) {
loop->data.sweep_timer = us_create_timer(loop, 1, 0);
loop->data.recv_buf = malloc(LIBUS_RECV_BUFFER_LENGTH + LIBUS_RECV_BUFFER_PADDING * 2);
loop->data.ssl_data = 0;
loop->data.head = 0;
loop->data.iterator = 0;
loop->data.closed_head = 0;
loop->data.low_prio_head = 0;
loop->data.low_prio_budget = 0;
loop->data.pre_cb = pre_cb;
loop->data.post_cb = post_cb;
loop->data.iteration_nr = 0;
loop->data.wakeup_async = us_internal_create_async(loop, 1, 0);
us_internal_async_set(loop->data.wakeup_async, (void (*)(struct us_internal_async *)) wakeup_cb);
}
/* Tear down everything us_internal_loop_data_init created */
void us_internal_loop_data_free(struct us_loop_t *loop) {
#ifndef LIBUS_NO_SSL
    us_internal_free_loop_ssl_data(loop);
#endif
    /* Close the two fallthrough polls, then release the shared receive buffer */
    us_timer_close(loop->data.sweep_timer);
    us_internal_async_close(loop->data.wakeup_async);
    free(loop->data.recv_buf);
}
/* Wake the loop (usable from another thread) by signalling its wakeup async,
 * which invokes the wakeup_cb passed to us_internal_loop_data_init. */
void us_wakeup_loop(struct us_loop_t *loop) {
    us_internal_async_wakeup(loop->data.wakeup_async);
}
/* Prepend a socket context to the loop's doubly linked list of contexts */
void us_internal_loop_link(struct us_loop_t *loop, struct us_socket_context_t *context) {
    struct us_socket_context_t *old_head = loop->data.head;

    context->prev = 0;
    context->next = old_head;
    if (old_head) {
        old_head->prev = context;
    }
    loop->data.head = context;
}
/* Unlink is called before free: remove a context from the loop's context list */
void us_internal_loop_unlink(struct us_loop_t *loop, struct us_socket_context_t *context) {
    if (loop->data.head != context) {
        /* Interior/tail node: bridge the predecessor over us */
        context->prev->next = context->next;
        if (context->next) {
            context->next->prev = context->prev;
        }
        return;
    }
    /* We are the head: advance head and clear the new head's back pointer */
    loop->data.head = context->next;
    if (loop->data.head) {
        loop->data.head->prev = 0;
    }
}
/* This function should never run recursively. It walks every socket of every
 * context looking for sockets whose timeout tick has arrived, emitting
 * on_socket_timeout / on_socket_long_timeout. loop_data->iterator and
 * context->iterator are shared cursors so event handlers may unlink or relink
 * entries mid-sweep without invalidating the traversal. */
void us_internal_timer_sweep(struct us_loop_t *loop) {
    struct us_internal_loop_data_t *loop_data = &loop->data;
    /* For all socket contexts in this loop */
    for (loop_data->iterator = loop_data->head; loop_data->iterator; loop_data->iterator = loop_data->iterator->next) {
        struct us_socket_context_t *context = loop_data->iterator;
        /* Update this context's timestamps (this could be moved to loop and done once) */
        context->global_tick++;
        /* Short ticks wrap at 240; long ticks advance 15x slower. Both fit in
         * a byte, which leaves 255 as a value a sweep tick can never equal. */
        unsigned char short_ticks = context->timestamp = context->global_tick % 240;
        unsigned char long_ticks = context->long_timestamp = (context->global_tick / 15) % 240;
        /* Begin at head */
        struct us_socket_t *s = context->head_sockets;
        while (s) {
            /* Seek until end or timeout found (tightest loop) */
            while (1) {
                /* We only read from 1 random cache line here */
                if (short_ticks == s->timeout || long_ticks == s->long_timeout) {
                    break;
                }
                /* Did we reach the end without a find? */
                if ((s = s->next) == 0) {
                    goto next_context;
                }
            }
            /* Here we have a timeout to emit (slow path). Publish s as the
             * context's iterator so handlers can signal chain modification. */
            context->iterator = s;
            if (short_ticks == s->timeout) {
                /* Disarm (255 never matches a tick) before emitting */
                s->timeout = 255;
                context->on_socket_timeout(s);
            }
            /* Only emit the long timeout if the short handler left s in place */
            if (context->iterator == s && long_ticks == s->long_timeout) {
                s->long_timeout = 255;
                context->on_socket_long_timeout(s);
            }
            /* Check for unlink / link (if the event handler did not modify the chain, we step 1) */
            if (s == context->iterator) {
                s = s->next;
            } else {
                /* The iterator was changed by event handler */
                s = context->iterator;
            }
        }
        /* We always store a 0 to context->iterator here since we are no longer iterating this context */
        next_context:
        context->iterator = 0;
    }
}
/* We do not want to block the loop with tons and tons of CPU-intensive work for SSL handshakes.
 * Spread it out during many loop iterations, prioritizing already open connections, they are far
 * easier on CPU */
static const int MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION = 5;

/* Runs once per iteration (from us_internal_loop_pre): resets the per-iteration
 * budget, then pops up to MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION sockets off
 * the low-priority LIFO queue, relinking each with its context and resuming
 * its readable polling. */
void us_internal_handle_low_priority_sockets(struct us_loop_t *loop) {
    struct us_internal_loop_data_t *loop_data = &loop->data;
    struct us_socket_t *s;

    loop_data->low_prio_budget = MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION;

    for (s = loop_data->low_prio_head; s && loop_data->low_prio_budget > 0; s = loop_data->low_prio_head, loop_data->low_prio_budget--) {
        /* Unlink this socket from the low-priority queue */
        loop_data->low_prio_head = s->next;
        if (s->next) s->next->prev = 0;
        s->next = 0;

        /* Rejoin the context's regular socket list and poll for readable again */
        us_internal_socket_context_link_socket(s->context, s);
        us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) | LIBUS_SOCKET_READABLE);
        /* State 2 = "was delayed, process one iteration of data now"
         * (consumed in us_internal_dispatch_ready_poll) */
        s->low_prio_state = 2;
    }
}
/* Note: Properly takes the linked list and timeout sweep into account.
 * Frees every socket queued on the closed list, then empties the list. */
void us_internal_free_closed_sockets(struct us_loop_t *loop) {
    /* Walk oldest-first (maybe it is better to reverse order?) */
    struct us_socket_t *s = loop->data.closed_head;
    while (s) {
        struct us_socket_t *next = s->next;
        us_poll_free((struct us_poll_t *) s, loop);
        s = next;
    }
    loop->data.closed_head = 0;
}
/* Fired by the sweep timer armed in us_loop_integrate; runs one timeout
 * sweep over all socket contexts of the timer's loop. */
void sweep_timer_cb(struct us_internal_callback_t *cb) {
    us_internal_timer_sweep(cb->loop);
}
/* Returns how many times this loop has iterated (incremented in us_internal_loop_pre) */
long long us_loop_iteration_number(struct us_loop_t *loop) {
    return loop->data.iteration_nr;
}
/* These may have somewhat different meaning depending on the underlying event library.
 * Runs before the poll phase of an iteration. */
void us_internal_loop_pre(struct us_loop_t *loop) {
    loop->data.iteration_nr++;
    /* Give delayed (low-priority) sockets their budgeted attention each iteration */
    us_internal_handle_low_priority_sockets(loop);
    loop->data.pre_cb(loop);
}
/* Runs after the poll phase: frees sockets closed during this iteration,
 * then invokes the user's post callback. */
void us_internal_loop_post(struct us_loop_t *loop) {
    us_internal_free_closed_sockets(loop);
    loop->data.post_cb(loop);
}
/* Wrap an already-accepted file descriptor in a us_socket_t owned by the given
 * context, start polling it for readable, and emit on_open. With ssl set this
 * delegates to the SSL variant. Returns the newly adopted socket. */
struct us_socket_t *us_adopt_accepted_socket(int ssl, struct us_socket_context_t *context, LIBUS_SOCKET_DESCRIPTOR accepted_fd,
    unsigned int socket_ext_size, char *addr_ip, int addr_ip_length) {
#ifndef LIBUS_NO_SSL
    if (ssl) {
        return (struct us_socket_t *)us_internal_ssl_adopt_accepted_socket((struct us_internal_ssl_socket_context_t *)context, accepted_fd,
            socket_ext_size, addr_ip, addr_ip_length);
    }
#endif
    /* A socket is a poll followed by socket fields plus the user extension area */
    struct us_poll_t *p = us_create_poll(context->loop, 0, sizeof(struct us_socket_t) - sizeof(struct us_poll_t) + socket_ext_size);
    us_poll_init(p, accepted_fd, POLL_TYPE_SOCKET);
    us_poll_start(p, context->loop, LIBUS_SOCKET_READABLE);

    struct us_socket_t *adopted = (struct us_socket_t *) p;
    adopted->context = context;
    /* 255 never matches a sweep tick, i.e. no timeout armed */
    adopted->timeout = 255;
    adopted->long_timeout = 255;
    adopted->low_prio_state = 0;

    /* We always use nodelay */
    bsd_socket_nodelay(accepted_fd, 1);

    us_internal_socket_context_link_socket(context, adopted);
    context->on_open(adopted, 0, addr_ip, addr_ip_length);
    return adopted;
}
/* Central event dispatcher: the event backend calls this for every poll that
 * became ready. Branches on the poll's type tag: internal callbacks (timers,
 * asyncs), semi-sockets (connecting or listening), and established sockets. */
void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int events) {
    switch (us_internal_poll_type(p)) {
    case POLL_TYPE_CALLBACK: {
            struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
            /* Timers, asyncs should accept (read), while UDP sockets should obviously not */
            if (!cb->leave_poll_ready) {
                /* Let's just have this macro to silence the CodeQL alert regarding empty function when using libuv */
#ifndef LIBUS_USE_LIBUV
                us_internal_accept_poll_event(p);
#endif
            }
            /* The callback receives either the loop or its own poll, depending on cb_expects_the_loop */
            cb->cb(cb->cb_expects_the_loop ? (struct us_internal_callback_t *) cb->loop : (struct us_internal_callback_t *) &cb->p);
        }
        break;
    case POLL_TYPE_SEMI_SOCKET: {
            /* Both connect and listen sockets are semi-sockets
             * but they poll for different events */
            if (us_poll_events(p) == LIBUS_SOCKET_WRITABLE) {
                /* Writable semi-socket: an outgoing connect completed (or failed) */
                struct us_socket_t *s = (struct us_socket_t *) p;

                /* It is perfectly possible to come here with an error */
                if (error) {
                    /* Emit error, close without emitting on_close */
                    s->context->on_connect_error(s, 0);
                    us_socket_close_connecting(0, s);
                } else {
                    /* All sockets poll for readable */
                    us_poll_change(p, s->context->loop, LIBUS_SOCKET_READABLE);

                    /* We always use nodelay */
                    bsd_socket_nodelay(us_poll_fd(p), 1);

                    /* We are now a proper socket */
                    us_internal_poll_set_type(p, POLL_TYPE_SOCKET);

                    /* If we used a connection timeout we have to reset it here */
                    us_socket_timeout(0, s, 0);

                    s->context->on_open(s, 1, 0, 0);
                }
            } else {
                /* Otherwise this is a listen socket with pending accepts */
                struct us_listen_socket_t *listen_socket = (struct us_listen_socket_t *) p;
                struct bsd_addr_t addr;

                LIBUS_SOCKET_DESCRIPTOR client_fd = bsd_accept_socket(us_poll_fd(p), &addr);
                if (client_fd == LIBUS_SOCKET_ERROR) {
                    /* Todo: start timer here */
                } else {
                    /* Todo: stop timer if any */
                    /* Drain the accept queue until bsd_accept_socket reports error */
                    do {
                        struct us_socket_context_t *context = us_socket_context(0, &listen_socket->s);
                        /* See if we want to export the FD or keep it here (this event can be unset) */
                        if (context->on_pre_open == 0 || context->on_pre_open(context, client_fd) == client_fd) {
                            /* Adopt the newly accepted socket */
                            us_adopt_accepted_socket(0, context,
                                client_fd, listen_socket->socket_ext_size, bsd_addr_get_ip(&addr), bsd_addr_get_ip_length(&addr));
                            /* Exit accept loop if listen socket was closed in on_open handler */
                            if (us_socket_is_closed(0, &listen_socket->s)) {
                                break;
                            }
                        }
                    } while ((client_fd = bsd_accept_socket(us_poll_fd(p), &addr)) != LIBUS_SOCKET_ERROR);
                }
            }
        }
        break;
    case POLL_TYPE_SOCKET_SHUT_DOWN:
    case POLL_TYPE_SOCKET: {
            /* We should only use s, no p after this point */
            struct us_socket_t *s = (struct us_socket_t *) p;

            /* Such as epollerr epollhup */
            if (error) {
                /* Todo: decide what code we give here */
                s = us_socket_close(0, s, 0, NULL);
                return;
            }

            if (events & LIBUS_SOCKET_WRITABLE) {
                /* Note: if we failed a write as a socket of one loop then adopted
                 * to another loop, this will be wrong. Absurd case though */
                s->context->loop->data.last_write_failed = 0;

                /* on_writable may return a replacement socket or close it */
                s = s->context->on_writable(s);

                if (us_socket_is_closed(0, s)) {
                    return;
                }

                /* If we have no failed write or if we shut down, then stop polling for more writable */
                if (!s->context->loop->data.last_write_failed || us_socket_is_shut_down(0, s)) {
                    us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_READABLE);
                }
            }

            if (events & LIBUS_SOCKET_READABLE) {
                /* Contexts may prioritize down sockets that are currently readable, e.g. when SSL handshake has to be done.
                 * SSL handshakes are CPU intensive, so we limit the number of handshakes per loop iteration, and move the rest
                 * to the low-priority queue */
                if (s->context->is_low_prio(s)) {
                    if (s->low_prio_state == 2) {
                        s->low_prio_state = 0; /* Socket has been delayed and now it's time to process incoming data for one iteration */
                    } else if (s->context->loop->data.low_prio_budget > 0) {
                        s->context->loop->data.low_prio_budget--; /* Still having budget for this iteration - do normal processing */
                    } else {
                        /* Out of budget: stop polling for readable and park the socket on the low-priority queue */
                        us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
                        us_internal_socket_context_unlink_socket(s->context, s);

                        /* Link this socket to the low-priority queue - we use a LIFO queue, to prioritize newer clients that are
                         * maybe not already timeouted - sounds unfair, but works better in real-life with smaller client-timeouts
                         * under high load */
                        s->prev = 0;
                        s->next = s->context->loop->data.low_prio_head;
                        if (s->next) s->next->prev = s;
                        s->context->loop->data.low_prio_head = s;

                        s->low_prio_state = 1;

                        /* Note: this break exits the whole switch, skipping the read below */
                        break;
                    }
                }

                int length;
            read_more:
                length = bsd_recv(us_poll_fd(&s->p), s->context->loop->data.recv_buf + LIBUS_RECV_BUFFER_PADDING, LIBUS_RECV_BUFFER_LENGTH, 0);
                if (length > 0) {
                    s = s->context->on_data(s, s->context->loop->data.recv_buf + LIBUS_RECV_BUFFER_PADDING, length);

                    /* If we filled the entire recv buffer, we need to immediately read again since otherwise a
                     * pending hangup event in the same event loop iteration can close the socket before we get
                     * the chance to read again next iteration */
                    if (length == LIBUS_RECV_BUFFER_LENGTH && s && !us_socket_is_closed(0, s)) {
                        goto read_more;
                    }
                } else if (!length) {
                    /* A zero-length read means the peer sent FIN */
                    if (us_socket_is_shut_down(0, s)) {
                        /* We got FIN back after sending it */
                        /* Todo: We should give "CLEAN SHUTDOWN" as reason here */
                        s = us_socket_close(0, s, 0, NULL);
                    } else {
                        /* We got FIN, so stop polling for readable */
                        us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
                        s = s->context->on_end(s);
                    }
                } else if (length == LIBUS_SOCKET_ERROR && !bsd_would_block()) {
                    /* Todo: decide also here what kind of reason we should give */
                    s = us_socket_close(0, s, 0, NULL);
                }
            }
        }
        break;
    }
}
/* Integration only requires the timer to be set up: arm the sweep timer to
 * fire sweep_timer_cb every LIBUS_TIMEOUT_GRANULARITY seconds (delay and repeat). */
void us_loop_integrate(struct us_loop_t *loop) {
    us_timer_set(loop->data.sweep_timer, (void (*)(struct us_timer_t *)) sweep_timer_cb, LIBUS_TIMEOUT_GRANULARITY * 1000, LIBUS_TIMEOUT_GRANULARITY * 1000);
}
/* Returns the user extension area, laid out immediately after the loop struct */
void *us_loop_ext(struct us_loop_t *loop) {
    return loop + 1;
}
#endif