diff --git a/src/async/async.c b/src/async/async.c index d78375478..0c17504e4 100644 --- a/src/async/async.c +++ b/src/async/async.c @@ -18,6 +18,7 @@ struct async_work { struct le le; + mtx_t *mtx; re_async_work_h *workh; re_async_h *cb; void *arg; @@ -66,8 +67,10 @@ static int worker_thread(void *arg) mtx_unlock(&a->mtx); work = le->data; + mtx_lock(work->mtx); if (work->workh) work->err = work->workh(work->arg); + mtx_unlock(work->mtx); mtx_lock(&a->mtx); mqueue_push(a->mqueue, 0, work); @@ -127,8 +130,10 @@ static void queueh(int id, void *data, void *arg) struct re_async *async = arg; (void)id; + mtx_lock(work->mtx); if (work->cb) work->cb(work->err, work->arg); + mtx_unlock(work->mtx); mtx_lock(&async->mtx); list_move(&work->le, &async->freel); @@ -136,6 +141,38 @@ static void queueh(int id, void *data, void *arg) } +static void work_destruct(void *arg) +{ + struct async_work *work = arg; + mem_deref(work->mtx); +} + + +static int work_alloc(struct async_work **workp) +{ + int err; + struct async_work *work; + + work = mem_zalloc(sizeof(struct async_work), NULL); + if (!work) { + err = ENOMEM; + return err; + } + + err = mutex_alloc(&work->mtx); + if (err) { + mem_deref(work); + return err; + } + + mem_destructor(work, work_destruct); + + *workp = work; + + return 0; +} + + /** * Allocate a new async object * @@ -148,7 +185,7 @@ int re_async_alloc(struct re_async **asyncp, uint16_t workers) { int err; struct re_async *async; - struct async_work *async_work; + struct async_work *work; if (!asyncp || !workers) return EINVAL; @@ -186,13 +223,11 @@ int re_async_alloc(struct re_async **asyncp, uint16_t workers) async->workers++; /* preallocate */ - async_work = mem_zalloc(sizeof(struct async_work), NULL); - if (!async_work) { - err = ENOMEM; + err = work_alloc(&work); + if (err) goto err; - } - list_append(&async->freel, &async_work->le, async_work); + list_append(&async->freel, &work->le, work); } tmr_start(&async->tmr, 10, worker_check, async); @@ -222,30 +257,29 @@ int re_async(struct re_async *async, intptr_t id, re_async_work_h *workh, re_async_h *cb, void *arg) { int err = 0; - struct async_work *async_work; + struct async_work *work; if (unlikely(!async)) return EINVAL; mtx_lock(&async->mtx); if (unlikely(list_isempty(&async->freel))) { - async_work = mem_zalloc(sizeof(struct async_work), NULL); - if (!async_work) { - err = ENOMEM; + + err = work_alloc(&work); + if (err) goto out; - } } else { - async_work = list_head(&async->freel)->data; - list_unlink(&async_work->le); + work = list_head(&async->freel)->data; + list_unlink(&work->le); } - async_work->workh = workh; - async_work->cb = cb; - async_work->arg = arg; - async_work->id = id; + work->workh = workh; + work->cb = cb; + work->arg = arg; + work->id = id; - list_append(&async->workl, &async_work->le, async_work); + list_append(&async->workl, &work->le, work); cnd_signal(&async->wait); out: @@ -279,11 +313,13 @@ void re_async_cancel(struct re_async *async, intptr_t id) if (w->id != id) continue; + mtx_lock(w->mtx); w->id = 0; w->workh = NULL; w->cb = NULL; w->arg = mem_deref(w->arg); list_move(&w->le, &async->freel); + mtx_unlock(w->mtx); } le = list_head(&async->curl); @@ -295,11 +331,13 @@ void re_async_cancel(struct re_async *async, intptr_t id) if (w->id != id) continue; + mtx_lock(w->mtx); w->id = 0; w->workh = NULL; w->cb = NULL; w->arg = mem_deref(w->arg); list_move(&w->le, &async->freel); + mtx_unlock(w->mtx); } mtx_unlock(&async->mtx);