diff --git a/equeue.c b/equeue.c index b33043ace90..5496ce98060 100644 --- a/equeue.c +++ b/equeue.c @@ -39,6 +39,10 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) { q->generation = 0; q->breaks = 0; + q->background.active = false; + q->background.update = 0; + q->background.timer = 0; + int err; err = equeue_sema_create(&q->eventsema); if (err < 0) { @@ -68,6 +72,10 @@ void equeue_destroy(equeue_t *q) { } } + if (q->background.update) { + q->background.update(q->background.timer, -1); + } + equeue_mutex_destroy(&q->memlock); equeue_mutex_destroy(&q->queuelock); equeue_sema_destroy(&q->eventsema); @@ -199,6 +207,11 @@ static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) { *p = e; e->ref = p; + if ((q->background.update && q->background.active) && + (q->queue == e && !e->sibling)) { + q->background.update(q->background.timer, ms); + } + equeue_mutex_unlock(&q->queuelock); return id; @@ -312,6 +325,7 @@ void equeue_break(equeue_t *q) { void equeue_dispatch(equeue_t *q, int ms) { unsigned tick = equeue_tick(); unsigned timeout = tick + ms; + q->background.active = false; while (1) { // collect all the available events and next deadline @@ -344,6 +358,16 @@ void equeue_dispatch(equeue_t *q, int ms) { if (ms >= 0) { deadline = equeue_tickdiff(timeout, tick); if (deadline <= 0) { + // update background timer if necessary + if (q->background.update) { + equeue_mutex_lock(&q->queuelock); + if (q->background.update && q->queue) { + q->background.update(q->background.timer, + equeue_tickdiff(q->queue->target, tick)); + } + q->background.active = true; + equeue_mutex_unlock(&q->queuelock); + } return; } } @@ -441,3 +465,54 @@ int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) { e->data = data; return equeue_post(q, ecallback_dispatch, e); } + +// backgrounding +void equeue_background(equeue_t *q, + void (*update)(void *timer, int ms), void *timer) { + equeue_mutex_lock(&q->queuelock); + if (q->background.update) { + q->background.update(q->background.timer, -1); + } + + q->background.update = update; + q->background.timer = timer; + + if (q->background.update && q->queue) { + q->background.update(q->background.timer, + equeue_tickdiff(q->queue->target, equeue_tick())); + } + q->background.active = true; + equeue_mutex_unlock(&q->queuelock); +} + +struct equeue_chain_context { + equeue_t *q; + equeue_t *target; + int id; +}; + +static void equeue_chain_dispatch(void *p) { + equeue_dispatch((equeue_t *)p, 0); +} + +static void equeue_chain_update(void *p, int ms) { + struct equeue_chain_context *c = (struct equeue_chain_context *)p; + equeue_cancel(c->target, c->id); + + if (ms >= 0) { + c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q); + } else { + equeue_dealloc(c->target, c); + } +} + +void equeue_chain(equeue_t *q, equeue_t *target) { + struct equeue_chain_context *c = equeue_alloc(q, + sizeof(struct equeue_chain_context)); + + c->q = q; + c->target = target; + c->id = 0; + + equeue_background(q, equeue_chain_update, c); +} diff --git a/equeue.h b/equeue.h index 13dffa94a09..f3aa41bb664 100644 --- a/equeue.h +++ b/equeue.h @@ -58,6 +58,12 @@ typedef struct equeue { unsigned char *data; } slab; + struct equeue_background { + bool active; + void (*update)(void *timer, int ms); + void *timer; + } background; + equeue_sema_t eventsema; equeue_mutex_t queuelock; equeue_mutex_t memlock; @@ -136,6 +142,23 @@ int equeue_post(equeue_t *queue, void (*cb)(void *), void *event); // stop a currently executing event void equeue_cancel(equeue_t *queue, int event); +// Background an event queue onto a single-shot timer +// +// The provided update function will be called to indicate when the queue +// should be dispatched. A negative timeout will be passed to the update +// function when the timer is no longer needed. A null update function +// will disable the existing timer. +void equeue_background(equeue_t *queue, + void (*update)(void *timer, int ms), void *timer); + +// Chain an event queue onto another event queue +// +// After chaining a queue to a target, calling equeue_dispatch on the +// target queue will also dispatch events from this queue. The queues +// will use their own buffers and events are handled independently. +// A null queue as the target will unchain this queue. +void equeue_chain(equeue_t *queue, equeue_t *target); + #ifdef __cplusplus } diff --git a/tests/tests.c b/tests/tests.c index 3fb3a4aa722..78c83927ca9 100644 --- a/tests/tests.c +++ b/tests/tests.c @@ -479,6 +479,74 @@ void multithread_test(void) { equeue_destroy(&q); } +void background_func(void *p, int ms) { + *(unsigned *)p = ms; +} + +void background_test(void) { + equeue_t q; + int err = equeue_create(&q, 2048); + test_assert(!err); + + int id = equeue_call_in(&q, 20, pass_func, 0); + test_assert(id); + + unsigned ms; + equeue_background(&q, background_func, &ms); + test_assert(ms == 20); + + id = equeue_call_in(&q, 10, pass_func, 0); + test_assert(id); + test_assert(ms == 10); + + id = equeue_call(&q, pass_func, 0); + test_assert(id); + test_assert(ms == 0); + + equeue_dispatch(&q, 0); + test_assert(ms == 10); + + equeue_destroy(&q); + test_assert(ms == -1); +} + +void chain_test(void) { + equeue_t q1; + int err = equeue_create(&q1, 2048); + test_assert(!err); + + equeue_t q2; + err = equeue_create(&q2, 2048); + test_assert(!err); + + equeue_chain(&q2, &q1); + + int touched = 0; + + int id1 = equeue_call_in(&q1, 20, simple_func, &touched); + int id2 = equeue_call_in(&q2, 20, simple_func, &touched); + test_assert(id1 && id2); + + id1 = equeue_call(&q1, simple_func, &touched); + id2 = equeue_call(&q2, simple_func, &touched); + test_assert(id1 && id2); + + id1 = equeue_call_in(&q1, 5, simple_func, &touched); + id2 = equeue_call_in(&q2, 5, simple_func, &touched); + test_assert(id1 && id2); + + equeue_cancel(&q1, id1); + equeue_cancel(&q2, id2); + + id1 = equeue_call_in(&q1, 10, simple_func, &touched); + id2 = equeue_call_in(&q2, 10, simple_func, &touched); + test_assert(id1 && id2); + + equeue_dispatch(&q1, 30); + + test_assert(touched == 6); +} + // Barrage tests void simple_barrage_test(int N) { equeue_t q; @@ -589,6 +657,8 @@ int main() { test_run(period_test); test_run(nested_test); test_run(sloth_test); + test_run(background_test); + test_run(chain_test); test_run(multithread_test); test_run(simple_barrage_test, 20); test_run(fragmenting_barrage_test, 20);