diff --git a/src/async/async.c b/src/async/async.c index 4d9b980a3..56c3ecf77 100644 --- a/src/async/async.c +++ b/src/async/async.c @@ -26,12 +26,13 @@ struct async_work { }; struct re_async { - thrd_t thr; + thrd_t *thrd; uint16_t nthrds; RE_ATOMIC bool run; cnd_t wait; mtx_t mtx; struct list workl; + struct list curl; struct tmr tmr; struct mqueue *mqueue; }; @@ -56,9 +57,9 @@ static int worker_thread(void *arg) mtx_unlock(&async->mtx); continue; } - le = list_head(&async->workl); list_unlink(le); + list_append(&async->curl, le, le->data); mtx_unlock(&async->mtx); work = le->data; @@ -79,12 +80,17 @@ static void async_destructor(void *data) mtx_lock(&async->mtx); cnd_broadcast(&async->wait); mtx_unlock(&async->mtx); - thrd_join(async->thr, NULL); + + for (int i = 0; i < async->nthrds; i++) { + thrd_join(async->thrd[i], NULL); + } list_flush(&async->workl); + list_flush(&async->curl); cnd_destroy(&async->wait); mtx_destroy(&async->mtx); mem_deref(async->mqueue); + mem_deref(async->thrd); } @@ -102,7 +108,7 @@ static void job_check(void *arg) } -/* called by re main fd_poll */ +/* called by re main event loop */ static void queueh(int id, void *data, void *arg) { struct async_work *work = data; @@ -110,6 +116,7 @@ static void queueh(int id, void *data, void *arg) (void)id; work->cb(work->err, work->arg); + list_unlink(&work->le); mem_deref(work); } @@ -137,6 +144,13 @@ int re_async_alloc(struct re_async **asyncp, uint16_t nthrds) return err; } + async->thrd = mem_zalloc(sizeof(thrd_t) * nthrds, NULL); + if (!async->thrd) { + mem_deref(async->mqueue); + mem_deref(async); + return ENOMEM; + } + mtx_init(&async->mtx, mtx_plain); cnd_init(&async->wait); tmr_init(&async->tmr); @@ -146,10 +160,13 @@ int re_async_alloc(struct re_async **asyncp, uint16_t nthrds) async->nthrds = nthrds; re_atomic_rlx_set(&async->run, true); - err = thread_create_name(&async->thr, "async worker thread", - worker_thread, async); - if (err) - return err; + for (int i = 0; i < async->nthrds; i++) { + err = thread_create_name(&async->thrd[i], + "async worker thread", worker_thread, + async); + if (err) + return err; + } tmr_start(&async->tmr, 10, job_check, async); @@ -183,9 +200,8 @@ int re_async(struct re_async *async, re_async_work *work, re_async_h *cb, async_work->cb = cb; async_work->arg = arg; - list_append(&async->workl, &async_work->le, async_work); - mtx_lock(&async->mtx); + list_append(&async->workl, &async_work->le, async_work); cnd_signal(&async->wait); mtx_unlock(&async->mtx);