Skip to content

Commit

Permalink
Merge pull request ARMmbed#8 from geky/backgrounding
Browse files Browse the repository at this point in the history
Add set of event queue composition functions
  • Loading branch information
geky authored Aug 8, 2016
2 parents 1b1ed81 + 3a55c74 commit 7e51cd2
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
75 changes: 75 additions & 0 deletions equeue.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
q->generation = 0;
q->breaks = 0;

q->background.active = false;
q->background.update = 0;
q->background.timer = 0;

int err;
err = equeue_sema_create(&q->eventsema);
if (err < 0) {
Expand Down Expand Up @@ -68,6 +72,10 @@ void equeue_destroy(equeue_t *q) {
}
}

if (q->background.update) {
q->background.update(q->background.timer, -1);
}

equeue_mutex_destroy(&q->memlock);
equeue_mutex_destroy(&q->queuelock);
equeue_sema_destroy(&q->eventsema);
Expand Down Expand Up @@ -199,6 +207,11 @@ static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
*p = e;
e->ref = p;

if ((q->background.update && q->background.active) &&
(q->queue == e && !e->sibling)) {
q->background.update(q->background.timer, ms);
}

equeue_mutex_unlock(&q->queuelock);

return id;
Expand Down Expand Up @@ -312,6 +325,7 @@ void equeue_break(equeue_t *q) {
void equeue_dispatch(equeue_t *q, int ms) {
unsigned tick = equeue_tick();
unsigned timeout = tick + ms;
q->background.active = false;

while (1) {
// collect all the available events and next deadline
Expand Down Expand Up @@ -344,6 +358,16 @@ void equeue_dispatch(equeue_t *q, int ms) {
if (ms >= 0) {
deadline = equeue_tickdiff(timeout, tick);
if (deadline <= 0) {
// update background timer if necessary
if (q->background.update) {
equeue_mutex_lock(&q->queuelock);
if (q->background.update && q->queue) {
q->background.update(q->background.timer,
equeue_tickdiff(q->queue->target, tick));
}
q->background.active = true;
equeue_mutex_unlock(&q->queuelock);
}
return;
}
}
Expand Down Expand Up @@ -441,3 +465,54 @@ int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) {
e->data = data;
return equeue_post(q, ecallback_dispatch, e);
}

// backgrounding
void equeue_background(equeue_t *q,
void (*update)(void *timer, int ms), void *timer) {
equeue_mutex_lock(&q->queuelock);
if (q->background.update) {
q->background.update(q->background.timer, -1);
}

q->background.update = update;
q->background.timer = timer;

if (q->background.update && q->queue) {
q->background.update(q->background.timer,
equeue_tickdiff(q->queue->target, equeue_tick()));
}
q->background.active = true;
equeue_mutex_unlock(&q->queuelock);
}

struct equeue_chain_context {
equeue_t *q;
equeue_t *target;
int id;
};

static void equeue_chain_dispatch(void *p) {
equeue_dispatch((equeue_t *)p, 0);
}

static void equeue_chain_update(void *p, int ms) {
struct equeue_chain_context *c = (struct equeue_chain_context *)p;
equeue_cancel(c->target, c->id);

if (ms >= 0) {
c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q);
} else {
equeue_dealloc(c->target, c);
}
}

void equeue_chain(equeue_t *q, equeue_t *target) {
struct equeue_chain_context *c = equeue_alloc(q,
sizeof(struct equeue_chain_context));

c->q = q;
c->target = target;
c->id = 0;

equeue_background(q, equeue_chain_update, c);
}
23 changes: 23 additions & 0 deletions equeue.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ typedef struct equeue {
unsigned char *data;
} slab;

struct equeue_background {
bool active;
void (*update)(void *timer, int ms);
void *timer;
} background;

equeue_sema_t eventsema;
equeue_mutex_t queuelock;
equeue_mutex_t memlock;
Expand Down Expand Up @@ -136,6 +142,23 @@ int equeue_post(equeue_t *queue, void (*cb)(void *), void *event);
// stop a currently executing event
void equeue_cancel(equeue_t *queue, int event);

// Background an event queue onto a single-shot timer
//
// The provided update function will be called to indicate when the queue
// should be dispatched. A negative timeout will be passed to the update
// function when the timer is no longer needed. A null update function
// will disable the existing timer.
void equeue_background(equeue_t *queue,
void (*update)(void *timer, int ms), void *timer);

// Chain an event queue onto another event queue
//
// After chaining a queue to a target, calling equeue_dispatch on the
// target queue will also dispatch events from this queue. The queues
// will use their own buffers and events are handled independently.
// A null queue as the target will unchain this queue.
void equeue_chain(equeue_t *queue, equeue_t *target);


#ifdef __cplusplus
}
Expand Down
70 changes: 70 additions & 0 deletions tests/tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,74 @@ void multithread_test(void) {
equeue_destroy(&q);
}

void background_func(void *p, int ms) {
*(unsigned *)p = ms;
}

void background_test(void) {
equeue_t q;
int err = equeue_create(&q, 2048);
test_assert(!err);

int id = equeue_call_in(&q, 20, pass_func, 0);
test_assert(id);

unsigned ms;
equeue_background(&q, background_func, &ms);
test_assert(ms == 20);

id = equeue_call_in(&q, 10, pass_func, 0);
test_assert(id);
test_assert(ms == 10);

id = equeue_call(&q, pass_func, 0);
test_assert(id);
test_assert(ms == 0);

equeue_dispatch(&q, 0);
test_assert(ms == 10);

equeue_destroy(&q);
test_assert(ms == -1);
}

void chain_test(void) {
equeue_t q1;
int err = equeue_create(&q1, 2048);
test_assert(!err);

equeue_t q2;
err = equeue_create(&q2, 2048);
test_assert(!err);

equeue_chain(&q2, &q1);

int touched = 0;

int id1 = equeue_call_in(&q1, 20, simple_func, &touched);
int id2 = equeue_call_in(&q2, 20, simple_func, &touched);
test_assert(id1 && id2);

id1 = equeue_call(&q1, simple_func, &touched);
id2 = equeue_call(&q2, simple_func, &touched);
test_assert(id1 && id2);

id1 = equeue_call_in(&q1, 5, simple_func, &touched);
id2 = equeue_call_in(&q2, 5, simple_func, &touched);
test_assert(id1 && id2);

equeue_cancel(&q1, id1);
equeue_cancel(&q2, id2);

id1 = equeue_call_in(&q1, 10, simple_func, &touched);
id2 = equeue_call_in(&q2, 10, simple_func, &touched);
test_assert(id1 && id2);

equeue_dispatch(&q1, 30);

test_assert(touched == 6);
}

// Barrage tests
void simple_barrage_test(int N) {
equeue_t q;
Expand Down Expand Up @@ -589,6 +657,8 @@ int main() {
test_run(period_test);
test_run(nested_test);
test_run(sloth_test);
test_run(background_test);
test_run(chain_test);
test_run(multithread_test);
test_run(simple_barrage_test, 20);
test_run(fragmenting_barrage_test, 20);
Expand Down

0 comments on commit 7e51cd2

Please sign in to comment.