From bac1fb6eea7e6185d38fa3e383d6a25b1ac771dc Mon Sep 17 00:00:00 2001 From: Christopher Haster Date: Sat, 6 Aug 2016 23:49:07 -0500 Subject: [PATCH 1/2] Added the ability to background an event queue onto a single-shot timer The equeue_background function requires only a user-provided update function, which is called to indicate when the queue should be dispatched. The equeue_background function adds a new level of composability to the event queue library. Allowing equeues to be easily backgrounded on hardware timers or even be combined with existing event loops. --- equeue.c | 43 +++++++++++++++++++++++++++++++++++++++++++ equeue.h | 14 ++++++++++++++ tests/tests.c | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+) diff --git a/equeue.c b/equeue.c index b33043ace90..28c246a54bf 100644 --- a/equeue.c +++ b/equeue.c @@ -39,6 +39,10 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) { q->generation = 0; q->breaks = 0; + q->background.active = false; + q->background.update = 0; + q->background.timer = 0; + int err; err = equeue_sema_create(&q->eventsema); if (err < 0) { @@ -68,6 +72,10 @@ void equeue_destroy(equeue_t *q) { } } + if (q->background.update) { + q->background.update(q->background.timer, -1); + } + equeue_mutex_destroy(&q->memlock); equeue_mutex_destroy(&q->queuelock); equeue_sema_destroy(&q->eventsema); @@ -199,6 +207,11 @@ static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) { *p = e; e->ref = p; + if ((q->background.update && q->background.active) && + (q->queue == e && !e->sibling)) { + q->background.update(q->background.timer, ms); + } + equeue_mutex_unlock(&q->queuelock); return id; @@ -312,6 +325,7 @@ void equeue_break(equeue_t *q) { void equeue_dispatch(equeue_t *q, int ms) { unsigned tick = equeue_tick(); unsigned timeout = tick + ms; + q->background.active = false; while (1) { // collect all the available events and next deadline @@ -344,6 +358,16 @@ void equeue_dispatch(equeue_t *q, int ms) { if (ms >= 0) { deadline = equeue_tickdiff(timeout, tick); if (deadline <= 0) { + // update background timer if necessary + if (q->background.update) { + equeue_mutex_lock(&q->queuelock); + if (q->background.update && q->queue) { + q->background.update(q->background.timer, + equeue_tickdiff(q->queue->target, tick)); + } + q->background.active = true; + equeue_mutex_unlock(&q->queuelock); + } return; } } @@ -441,3 +465,22 @@ int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) { e->data = data; return equeue_post(q, ecallback_dispatch, e); } + +// backgrounding +void equeue_background(equeue_t *q, + void (*update)(void *timer, int ms), void *timer) { + equeue_mutex_lock(&q->queuelock); + if (q->background.update) { + q->background.update(q->background.timer, -1); + } + + q->background.update = update; + q->background.timer = timer; + + if (q->background.update && q->queue) { + q->background.update(q->background.timer, + equeue_tickdiff(q->queue->target, equeue_tick())); + } + q->background.active = true; + equeue_mutex_unlock(&q->queuelock); +} diff --git a/equeue.h b/equeue.h index 13dffa94a09..624e8043c68 100644 --- a/equeue.h +++ b/equeue.h @@ -58,6 +58,12 @@ typedef struct equeue { unsigned char *data; } slab; + struct equeue_background { + bool active; + void (*update)(void *timer, int ms); + void *timer; + } background; + equeue_sema_t eventsema; equeue_mutex_t queuelock; equeue_mutex_t memlock; @@ -136,6 +142,14 @@ int equeue_post(equeue_t *queue, void (*cb)(void *), void *event); // stop a currently executing event void equeue_cancel(equeue_t *queue, int event); +// Background an event queue onto a single-shot timer +// +// The provided update function will be called to indicate when the queue +// should be dispatched. A negative timeout will be passed to the update +// function when the timer is no longer needed. +void equeue_background(equeue_t *queue, + void (*update)(void *timer, int ms), void *timer); + #ifdef __cplusplus } diff --git a/tests/tests.c b/tests/tests.c index 3fb3a4aa722..046179a796b 100644 --- a/tests/tests.c +++ b/tests/tests.c @@ -479,6 +479,37 @@ void multithread_test(void) { equeue_destroy(&q); } +void background_func(void *p, int ms) { + *(unsigned *)p = ms; +} + +void background_test(void) { + equeue_t q; + int err = equeue_create(&q, 2048); + test_assert(!err); + + int id = equeue_call_in(&q, 20, pass_func, 0); + test_assert(id); + + unsigned ms; + equeue_background(&q, background_func, &ms); + test_assert(ms == 20); + + id = equeue_call_in(&q, 10, pass_func, 0); + test_assert(id); + test_assert(ms == 10); + + id = equeue_call(&q, pass_func, 0); + test_assert(id); + test_assert(ms == 0); + + equeue_dispatch(&q, 0); + test_assert(ms == 10); + + equeue_destroy(&q); + test_assert(ms == -1); +} + // Barrage tests void simple_barrage_test(int N) { equeue_t q; @@ -589,6 +620,7 @@ int main() { test_run(period_test); test_run(nested_test); test_run(sloth_test); + test_run(background_test); test_run(multithread_test); test_run(simple_barrage_test, 20); test_run(fragmenting_barrage_test, 20); From 3a55c74ba99f9eb45b7c46a697577531b2dfa0d3 Mon Sep 17 00:00:00 2001 From: Christopher Haster Date: Sun, 7 Aug 2016 00:27:16 -0500 Subject: [PATCH 2/2] Added function for chaining event queues together The equeue_chain function chains one queue onto a target queue. Calling equeue_dispatch on the target queue will also dispatch events on the target, however each queue uses their own buffers and events are handled independently. The equeue_chain adds a very powerful level of composability for equeues freely built on the equeue_background function. This allows multiple modules to own their own event queues while maintaining composability on threadless systems. --- equeue.c | 32 ++++++++++++++++++++++++++++++++ equeue.h | 11 ++++++++++- tests/tests.c | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/equeue.c b/equeue.c index 28c246a54bf..5496ce98060 100644 --- a/equeue.c +++ b/equeue.c @@ -484,3 +484,35 @@ void equeue_background(equeue_t *q, q->background.active = true; equeue_mutex_unlock(&q->queuelock); } + +struct equeue_chain_context { + equeue_t *q; + equeue_t *target; + int id; +}; + +static void equeue_chain_dispatch(void *p) { + equeue_dispatch((equeue_t *)p, 0); +} + +static void equeue_chain_update(void *p, int ms) { + struct equeue_chain_context *c = (struct equeue_chain_context *)p; + equeue_cancel(c->target, c->id); + + if (ms >= 0) { + c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q); + } else { + equeue_dealloc(c->target, c); + } +} + +void equeue_chain(equeue_t *q, equeue_t *target) { + struct equeue_chain_context *c = equeue_alloc(q, + sizeof(struct equeue_chain_context)); + + c->q = q; + c->target = target; + c->id = 0; + + equeue_background(q, equeue_chain_update, c); +} diff --git a/equeue.h b/equeue.h index 624e8043c68..f3aa41bb664 100644 --- a/equeue.h +++ b/equeue.h @@ -146,10 +146,19 @@ void equeue_cancel(equeue_t *queue, int event); // // The provided update function will be called to indicate when the queue // should be dispatched. A negative timeout will be passed to the update -// function when the timer is no longer needed. +// function when the timer is no longer needed. A null update function +// will disable the existing timer. void equeue_background(equeue_t *queue, void (*update)(void *timer, int ms), void *timer); +// Chain an event queue onto another event queue +// +// After chaining a queue to a target, calling equeue_dispatch on the +// target queue will also dispatch events from this queue. The queues +// will use their own buffers and events are handled independently. +// A null queue as the target will unchain this queue. +void equeue_chain(equeue_t *queue, equeue_t *target); + #ifdef __cplusplus } diff --git a/tests/tests.c b/tests/tests.c index 046179a796b..78c83927ca9 100644 --- a/tests/tests.c +++ b/tests/tests.c @@ -510,6 +510,43 @@ void background_test(void) { test_assert(ms == -1); } +void chain_test(void) { + equeue_t q1; + int err = equeue_create(&q1, 2048); + test_assert(!err); + + equeue_t q2; + err = equeue_create(&q2, 2048); + test_assert(!err); + + equeue_chain(&q2, &q1); + + int touched = 0; + + int id1 = equeue_call_in(&q1, 20, simple_func, &touched); + int id2 = equeue_call_in(&q2, 20, simple_func, &touched); + test_assert(id1 && id2); + + id1 = equeue_call(&q1, simple_func, &touched); + id2 = equeue_call(&q2, simple_func, &touched); + test_assert(id1 && id2); + + id1 = equeue_call_in(&q1, 5, simple_func, &touched); + id2 = equeue_call_in(&q2, 5, simple_func, &touched); + test_assert(id1 && id2); + + equeue_cancel(&q1, id1); + equeue_cancel(&q2, id2); + + id1 = equeue_call_in(&q1, 10, simple_func, &touched); + id2 = equeue_call_in(&q2, 10, simple_func, &touched); + test_assert(id1 && id2); + + equeue_dispatch(&q1, 30); + + test_assert(touched == 6); +} + // Barrage tests void simple_barrage_test(int N) { equeue_t q; @@ -621,6 +658,7 @@ int main() { test_run(nested_test); test_run(sloth_test); test_run(background_test); + test_run(chain_test); test_run(multithread_test); test_run(simple_barrage_test, 20); test_run(fragmenting_barrage_test, 20);