Skip to content

Commit

Permalink
daemon/defer: allow recursive time accounting, fix subreq accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Nov 25, 2024
1 parent d183df0 commit 939b74b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 67 deletions.
29 changes: 17 additions & 12 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ static inline void phase_set(enum phase p)
phase = p;
}
}
static inline void phase_account(uint64_t nsec)
static inline void phase_charge(uint64_t nsec)
{
kr_assert(phase != PHASE_ANY);
phase_elapsed += nsec;
Expand Down Expand Up @@ -135,10 +135,10 @@ static bool using_avx2(void)
}

/// Increment KRU counters by given time.
void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
{
if (phase_accounting) {
phase_account(nsec);
phase_charge(nsec);
phase_accounting = false;
}

Expand Down Expand Up @@ -321,7 +321,7 @@ static inline void process_single_deferred(void)
if (kr_fails_assert(ctx)) return;

defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
phase_accounting = true;
phase_accounting = true; // TODO check there are no suspensions of sampling

struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
Expand Down Expand Up @@ -451,10 +451,15 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
push_query(ctx, priority, false);
waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
// for stream, payload is counted in session wire buffer
while (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
defer_sample_restart();
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
// defer_sample_stop should be called soon outside

if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
defer_sample_state_t prev_sample_state;
defer_sample_start(&prev_sample_state);
do {
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
defer_sample_restart();
} while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
defer_sample_stop(&prev_sample_state, true);
}

return protolayer_async();
Expand Down Expand Up @@ -492,14 +497,14 @@ static void defer_queues_idle(uv_idle_t *handle)
kr_assert(waiting_requests > 0);
VERBOSE_LOG("IDLE\n");
VERBOSE_LOG(" %d waiting\n", waiting_requests);
defer_sample_start();
defer_sample_start(NULL);
uint64_t idle_stamp = defer_sample_state.stamp;
while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) {
do {
process_single_deferred();
defer_sample_restart();
}
} while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) {
defer_sample_stop(NULL, true);
cleanup_queues();
defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
udp_queue_send_all();

if (waiting_requests > 0) {
Expand Down
91 changes: 57 additions & 34 deletions daemon/defer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ int defer_init_idle(uv_loop_t *loop);
void defer_deinit(void);

/// Increment KRU counters by the given time.
void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream);
void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream);

typedef struct {
int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
bool is_accounting; /// whether currently accounting the time to someone
union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
bool stream;
uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
Expand All @@ -36,23 +36,13 @@ extern bool defer_initialized; /// defer_init was called, possibly keeping defer
// TODO: reconsider `static inline` cases below

#include <time.h>
static inline uint64_t get_stamp(void)
static inline uint64_t defer_get_stamp(void)
{
struct timespec now_ts = {0};
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts);
return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
}

/// Start accounting work, if not doing it already.
static inline void defer_sample_start(void)
{
if (!defer) return;
kr_assert(!defer_sample_state.is_accounting);
++defer_sample_state.is_accounting;
defer_sample_state.stamp = get_stamp();
defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
}

/// Annotate the work currently being accounted by an IP address.
static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
{
Expand Down Expand Up @@ -83,41 +73,74 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
defer_sample_state.stream = stream;
}

/// Stop accounting work - and change the source if applicable.
static inline void defer_sample_stop(void)
/// Internal; start accounting work at specified timestamp.
static inline void defer_sample_start_stamp(uint64_t stamp)
{
if (!defer) return;
kr_assert(!defer_sample_state.is_accounting);
defer_sample_state.is_accounting = true;
defer_sample_state.stamp = stamp;
defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
}

/// Internal; stop accounting work at specified timestamp and charge the source if applicable.
static inline void defer_sample_stop_stamp(uint64_t stamp)
{
if (!defer) return;
kr_assert(defer_sample_state.is_accounting == 1);
kr_assert(defer_sample_state.is_accounting);
defer_sample_state.is_accounting = false;

if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
if (--defer_sample_state.is_accounting) return;
if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;

const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;

// we accounted something
const uint64_t elapsed = stamp - defer_sample_state.stamp;
if (elapsed == 0) return;

// TODO: some queries of internal origin have suspicioiusly high numbers.
// We won't be really accounting those, but it might suggest some other issue.

defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
}

/// Stop accounting if active, then start again. Uses just one stamp.
static inline void defer_sample_restart(void)
{
/// Start accounting work; optionally save state of current accounting.
/// Current state can be saved only after having an address assigned.
static inline void defer_sample_start(defer_sample_state_t *prev_state_out) {
if (!defer) return;
uint64_t stamp = defer_get_stamp();

kr_assert(false); // XXX temporarily unreachable
// suspend
if (prev_state_out) {
*prev_state_out = defer_sample_state; // TODO stamp is not needed
if (defer_sample_state.is_accounting)
defer_sample_stop_stamp(stamp);
}

uint64_t stamp = get_stamp();
// start
defer_sample_start_stamp(stamp);
}

if (defer_sample_state.is_accounting > 0) {
const uint64_t elapsed = stamp - defer_sample_state.stamp;
defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
}
/// Stop accounting and start it again.
static inline void defer_sample_restart(void) {
if (!defer) return;
uint64_t stamp = defer_get_stamp();

defer_sample_state.stamp = stamp;
defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
defer_sample_state.is_accounting = 1;
// stop
defer_sample_stop_stamp(stamp);

// start
defer_sample_start_stamp(stamp);
}

/// Stop accounting and charge the source if applicable; optionally resume previous accounting.
static inline void defer_sample_stop(defer_sample_state_t *prev_state, bool reuse_last_stamp) {
if (!defer) return;
uint64_t stamp = reuse_last_stamp ? defer_sample_state.stamp : defer_get_stamp();

// stop
defer_sample_stop_stamp(stamp);

// resume
if (prev_state) {
defer_sample_state = *prev_state;
defer_sample_state.stamp = stamp;
}
}
8 changes: 4 additions & 4 deletions daemon/session2.c
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ static int session2_submit(
// Note two cases: incoming session (new request)
// vs. outgoing session (resuming work on some request)
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
defer_sample_start();
defer_sample_start(NULL);

struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
kr_require(ctx);
Expand Down Expand Up @@ -692,7 +692,7 @@ static int session2_submit(

int ret = protolayer_step(ctx);
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
defer_sample_stop();
defer_sample_stop(NULL, false);
return ret;
}

Expand Down Expand Up @@ -980,10 +980,10 @@ uv_handle_t *session2_get_handle(struct session2 *s)

static void session2_on_timeout(uv_timer_t *timer)
{
defer_sample_start();
defer_sample_start(NULL);
struct session2 *s = timer->data;
session2_event(s, s->timer_event, NULL);
defer_sample_stop();
defer_sample_stop(NULL, false);
}

int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat)
Expand Down
40 changes: 23 additions & 17 deletions daemon/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -879,26 +879,32 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
kr_assert(ret == KNOT_EOK && val_deleted == task);
}
/* Notify waiting tasks. */
struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
for (size_t i = task->waiting.len; i > 0; i--) {
struct qr_task *follower = task->waiting.at[i - 1];
/* Reuse MSGID and 0x20 secret */
if (follower->ctx->req.rplan.pending.len > 0) {
struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
qry->id = leader_qry->id;
qry->secret = leader_qry->secret;

// Note that this transport may not be present in `leader_qry`'s server selection
follower->transport = task->transport;
if(follower->transport) {
follower->transport->deduplicated = true;
if (task->waiting.len > 0) {
struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
defer_sample_state_t defer_prev_sample_state;
defer_sample_start(&defer_prev_sample_state);
for (size_t i = task->waiting.len; i > 0; i--) {
struct qr_task *follower = task->waiting.at[i - 1];
/* Reuse MSGID and 0x20 secret */
if (follower->ctx->req.rplan.pending.len > 0) {
struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
qry->id = leader_qry->id;
qry->secret = leader_qry->secret;

// Note that this transport may not be present in `leader_qry`'s server selection
follower->transport = task->transport;
if(follower->transport) {
follower->transport->deduplicated = true;
}
leader_qry->secret = 0; /* Next will be already decoded */
}
leader_qry->secret = 0; /* Next will be already decoded */
qr_task_step(follower, packet_source, pkt);
qr_task_unref(follower);
defer_sample_restart();
}
qr_task_step(follower, packet_source, pkt);
qr_task_unref(follower);
defer_sample_stop(&defer_prev_sample_state, true);
task->waiting.len = 0;
}
task->waiting.len = 0;
task->leading = false;
}

Expand Down

0 comments on commit 939b74b

Please sign in to comment.