Skip to content

Commit

Permalink
async: add work mutex handling (#857)
Browse files Browse the repository at this point in the history
* async: add work mutex

* fix work allocation
  • Loading branch information
sreimers authored Jun 19, 2023
1 parent 5e51615 commit 7e8bec6
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions src/async/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

struct async_work {
struct le le;
mtx_t *mtx;
re_async_work_h *workh;
re_async_h *cb;
void *arg;
Expand Down Expand Up @@ -66,8 +67,10 @@ static int worker_thread(void *arg)
mtx_unlock(&a->mtx);

work = le->data;
mtx_lock(work->mtx);
if (work->workh)
work->err = work->workh(work->arg);
mtx_unlock(work->mtx);

mtx_lock(&a->mtx);
mqueue_push(a->mqueue, 0, work);
Expand Down Expand Up @@ -127,15 +130,49 @@ static void queueh(int id, void *data, void *arg)
struct re_async *async = arg;
(void)id;

mtx_lock(work->mtx);
if (work->cb)
work->cb(work->err, work->arg);
mtx_unlock(work->mtx);

mtx_lock(&async->mtx);
list_move(&work->le, &async->freel);
mtx_unlock(&async->mtx);
}


static void work_destruct(void *arg)
{
struct async_work *work = arg;
mem_deref(work->mtx);
}


static int work_alloc(struct async_work **workp)
{
int err;
struct async_work *work;

work = mem_zalloc(sizeof(struct async_work), NULL);
if (!work) {
err = ENOMEM;
return err;
}

err = mutex_alloc(&work->mtx);
if (err) {
mem_deref(work);
return err;
}

mem_destructor(work, work_destruct);

*workp = work;

return 0;
}


/**
* Allocate a new async object
*
Expand All @@ -148,7 +185,7 @@ int re_async_alloc(struct re_async **asyncp, uint16_t workers)
{
int err;
struct re_async *async;
struct async_work *async_work;
struct async_work *work;

if (!asyncp || !workers)
return EINVAL;
Expand Down Expand Up @@ -186,13 +223,11 @@ int re_async_alloc(struct re_async **asyncp, uint16_t workers)
async->workers++;

/* preallocate */
async_work = mem_zalloc(sizeof(struct async_work), NULL);
if (!async_work) {
err = ENOMEM;
err = work_alloc(&work);
if (err)
goto err;
}

list_append(&async->freel, &async_work->le, async_work);
list_append(&async->freel, &work->le, work);
}

tmr_start(&async->tmr, 10, worker_check, async);
Expand Down Expand Up @@ -222,30 +257,29 @@ int re_async(struct re_async *async, intptr_t id, re_async_work_h *workh,
re_async_h *cb, void *arg)
{
int err = 0;
struct async_work *async_work;
struct async_work *work;

if (unlikely(!async))
return EINVAL;

mtx_lock(&async->mtx);
if (unlikely(list_isempty(&async->freel))) {
async_work = mem_zalloc(sizeof(struct async_work), NULL);
if (!async_work) {
err = ENOMEM;

err = work_alloc(&work);
if (err)
goto out;
}
}
else {
async_work = list_head(&async->freel)->data;
list_unlink(&async_work->le);
work = list_head(&async->freel)->data;
list_unlink(&work->le);
}

async_work->workh = workh;
async_work->cb = cb;
async_work->arg = arg;
async_work->id = id;
work->workh = workh;
work->cb = cb;
work->arg = arg;
work->id = id;

list_append(&async->workl, &async_work->le, async_work);
list_append(&async->workl, &work->le, work);
cnd_signal(&async->wait);

out:
Expand Down Expand Up @@ -279,11 +313,13 @@ void re_async_cancel(struct re_async *async, intptr_t id)
if (w->id != id)
continue;

mtx_lock(w->mtx);
w->id = 0;
w->workh = NULL;
w->cb = NULL;
w->arg = mem_deref(w->arg);
list_move(&w->le, &async->freel);
mtx_unlock(w->mtx);
}

le = list_head(&async->curl);
Expand All @@ -295,11 +331,13 @@ void re_async_cancel(struct re_async *async, intptr_t id)
if (w->id != id)
continue;

mtx_lock(w->mtx);
w->id = 0;
w->workh = NULL;
w->cb = NULL;
w->arg = mem_deref(w->arg);
list_move(&w->le, &async->freel);
mtx_unlock(w->mtx);
}

mtx_unlock(&async->mtx);
Expand Down

0 comments on commit 7e8bec6

Please sign in to comment.