Skip to content

Commit

Permalink
daemon/defer: limit deferred queries by memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Nov 20, 2024
1 parent f492f0b commit 6c246b8
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
30 changes: 26 additions & 4 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@
// Tuning constants for processing of deferred requests.
#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
#define PHASE_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases
#define PHASE_NON_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); after timeout or emptying queue
#define MAX_WAITING_REQS 10000 // if exceeded, process a single deferred request immediately in poll phase
// TODO measure memory usage instead
#define MAX_WAITING_REQS_SIZE (64 * 1024 * 1024) // bytes; if exceeded, some deferred requests are processed in poll phase
// note: a single TCP session alone allocates a wire buffer of more than 64 KiB
// TODO check whether all important allocations are counted;
// various things are not counted: tasks and subsessions (not deferred once created), uv handles, queues overhead, ...;
// payload is counted either as part of session wire buffer (for stream) or as part of iter ctx (for datagrams)

#define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)

Expand All @@ -81,6 +84,7 @@ static void defer_queues_idle(uv_idle_t *handle);

// Global state of the defer queues.
protolayer_iter_ctx_queue_t queues[QUEUES_CNT]; // NOTE(review): presumably one queue per priority level -- confirm against push_query
int waiting_requests = 0; // NOTE(review): presumably the number of contexts held in `queues` -- confirm against push_query/pop
ptrdiff_t waiting_requests_size = 0; // sum of size estimates of deferred sessions and iterations, in bytes; signed for non-negativeness asserts
int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue )

enum phase {
Expand Down Expand Up @@ -113,11 +117,13 @@ struct pl_defer_sess_data {
struct protolayer_data h;
protolayer_iter_ctx_queue_t queue; // properly ordered sequence of deferred packets, for stream only
// the first ctx in the queue is also in a defer queue
size_t size;
};

// Per-iteration data of the defer layer.
struct pl_defer_iter_data {
struct protolayer_data h; // generic layer-data header
uint64_t req_stamp; // time when request was received, uses get_stamp()
size_t size; // bytes accounted for this iteration in waiting_requests_size; subtracted again when the query is processed or broken
};

/// Return whether we're using optimized variant right now.
Expand Down Expand Up @@ -281,17 +287,23 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
{
// Breaks the deferred query `ctx` with error `err` and removes its size from the accounting.
// For stream sessions, the session is force-closed (unless already closing)
// and every context queued on the session is broken as well.
if (ctx->session->stream) {
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
waiting_requests_size -= sdata->size; // release the size accounted for the session itself
if (!ctx->session->closing) {
session2_force_close(ctx->session); // session is not freed here as iter contexts exist
}
queue_pop(sdata->queue); // NOTE(review): presumably removes ctx itself, the head of the session queue -- confirm
// While another context is still queued, break the current one and advance to the next;
// the last context stays in `ctx` and is handled by the common tail below.
while (queue_len(sdata->queue) > 0) {
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
waiting_requests_size -= idata->size;
protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist
ctx = queue_head(sdata->queue);
queue_pop(sdata->queue);
}
}
// Common tail: the only context (non-stream) or the last queued one (stream).
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
waiting_requests_size -= idata->size;
protolayer_break(ctx, kr_error(err));
// Accounting invariant: the total size is positive while requests are waiting and exactly zero otherwise.
kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0);
}

/// Process a single deferred query (or defer again) if there is any.
Expand Down Expand Up @@ -344,9 +356,14 @@ static inline void process_single_deferred(void)
if (queue_len(sdata->queue) > 0) {
VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority);
push_query(queue_head(sdata->queue), priority, true);
} else {
waiting_requests_size -= sdata->size;
}
}

waiting_requests_size -= idata->size;
kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0);

if (eof) {
// Keep session alive even if it is somehow force-closed during continuation.
// TODO Is it needed?
Expand Down Expand Up @@ -402,6 +419,8 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(

if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred
queue_push(sdata->queue, ctx);
waiting_requests_size += data->size = protolayer_iter_size_est(ctx, false);
// payload counted in session wire buffer
VERBOSE_LOG(" PUSH as follow-up\n");
return protolayer_async();
}
Expand All @@ -417,11 +436,14 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
VERBOSE_LOG(" PUSH to %d\n", priority);
if (ctx->session->stream) {
queue_push(sdata->queue, ctx);
waiting_requests_size += sdata->size = protolayer_sess_size_est(ctx->session);
}
push_query(ctx, priority, false);
while (waiting_requests > MAX_WAITING_REQS) { // TODO follow-up stream packets are not counted here
waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
// for stream, payload is counted in session wire buffer
while (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
defer_sample_restart();
process_single_deferred(); // possibly defers again without decreasing waiting_requests
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
// defer_sample_stop should be called soon outside
}

Expand Down
14 changes: 14 additions & 0 deletions daemon/session2.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,19 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
return protolayer_iter_data_get(ctx, ctx->layer_ix);
}

/* Rough memory footprint estimate of a session:
 * the recorded size of the session struct plus the size of its wire buffer. */
size_t protolayer_sess_size_est(struct session2 *s)
{
	const size_t struct_bytes = s->session_size;
	const size_t wire_bytes = s->wire_buf.size;
	return struct_bytes + wire_bytes;
}

/* Rough memory footprint estimate of a single iteration context:
 * the per-session iteration context size, optionally increased
 * by the size of the context's payload when `incl_payload` is set. */
size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload)
{
	const size_t ctx_bytes = ctx->session->iter_ctx_size;
	if (!incl_payload) {
		return ctx_bytes;
	}
	return ctx_bytes + protolayer_payload_size(&ctx->payload);
}

static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx)
{
unsigned int last_ix = (ctx->direction == PROTOLAYER_UNWRAP)
Expand Down Expand Up @@ -852,6 +865,7 @@ struct session2 *session2_new(enum session2_transport_type transport_type,

.proto = proto,
.iter_ctx_size = iter_ctx_size,
.session_size = session_size,
};

memcpy(&s->layer_data, offsets, sizeof(offsets));
Expand Down
10 changes: 10 additions & 0 deletions daemon/session2.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,13 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx);
* To be used after returning from its callback for async continuation but before calling protolayer_continue. */
void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx);

/** Gets a rough memory footprint estimate of a session/iteration for use in defer.
 * Various, hopefully minor, allocations are not counted here;
 * tasks and subsessions are also not counted;
 * read the code before using elsewhere.
 *
 * The session estimate is the size of the session struct plus the size of its wire buffer.
 * The iteration estimate is the size of the iteration context,
 * plus the size of its payload when `incl_payload` is true. */
size_t protolayer_sess_size_est(struct session2 *s);
size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload);

/** Layer-specific data - the generic struct. To be added as the first member of
* each specific struct. */
struct protolayer_data {
Expand Down Expand Up @@ -874,6 +881,9 @@ struct session2 {
* (`struct protolayer_iter_ctx`), including layer-specific data. */
size_t iter_ctx_size;

/** The size of this session struct. */
size_t session_size;

/** The following flexible array has basically this structure:
*
* struct {
Expand Down

0 comments on commit 6c246b8

Please sign in to comment.