Skip to content

Commit

Permalink
add worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
sreimers committed Jul 30, 2022
1 parent 2d88241 commit c57ed0f
Showing 1 changed file with 26 additions and 10 deletions.
36 changes: 26 additions & 10 deletions src/async/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ struct async_work {
};

struct re_async {
thrd_t thr;
thrd_t *thrd;
uint16_t nthrds;
RE_ATOMIC bool run;
cnd_t wait;
mtx_t mtx;
struct list workl;
struct list curl;
struct tmr tmr;
struct mqueue *mqueue;
};
Expand All @@ -56,9 +57,9 @@ static int worker_thread(void *arg)
mtx_unlock(&async->mtx);
continue;
}

le = list_head(&async->workl);
list_unlink(le);
list_append(&async->curl, le, le->data);
mtx_unlock(&async->mtx);

work = le->data;
Expand All @@ -79,12 +80,17 @@ static void async_destructor(void *data)
mtx_lock(&async->mtx);
cnd_broadcast(&async->wait);
mtx_unlock(&async->mtx);
thrd_join(async->thr, NULL);

for (int i = 0; i < async->nthrds; i++) {
thrd_join(async->thrd[i], NULL);
}

list_flush(&async->workl);
list_flush(&async->curl);
cnd_destroy(&async->wait);
mtx_destroy(&async->mtx);
mem_deref(async->mqueue);
mem_deref(async->thrd);
}


Expand All @@ -102,14 +108,15 @@ static void job_check(void *arg)
}


/* called by re main fd_poll */
/* called by re main event loop */
static void queueh(int id, void *data, void *arg)
{
struct async_work *work = data;
(void)arg;
(void)id;

work->cb(work->err, work->arg);
list_unlink(&work->le);
mem_deref(work);
}

Expand Down Expand Up @@ -137,6 +144,13 @@ int re_async_alloc(struct re_async **asyncp, uint16_t nthrds)
return err;
}

async->thrd = mem_zalloc(sizeof(thrd_t) * nthrds, NULL);
if (!async->thrd) {
mem_deref(async->mqueue);
mem_deref(async);
return ENOMEM;
}

mtx_init(&async->mtx, mtx_plain);
cnd_init(&async->wait);
tmr_init(&async->tmr);
Expand All @@ -146,10 +160,13 @@ int re_async_alloc(struct re_async **asyncp, uint16_t nthrds)
async->nthrds = nthrds;
re_atomic_rlx_set(&async->run, true);

err = thread_create_name(&async->thr, "async worker thread",
worker_thread, async);
if (err)
return err;
for (int i = 0; i < async->nthrds; i++) {
err = thread_create_name(&async->thrd[i],
"async worker thread", worker_thread,
async);
if (err)
return err;
}

tmr_start(&async->tmr, 10, job_check, async);

Expand Down Expand Up @@ -183,9 +200,8 @@ int re_async(struct re_async *async, re_async_work *work, re_async_h *cb,
async_work->cb = cb;
async_work->arg = arg;

list_append(&async->workl, &async_work->le, async_work);

mtx_lock(&async->mtx);
list_append(&async->workl, &async_work->le, async_work);
cnd_signal(&async->wait);
mtx_unlock(&async->mtx);

Expand Down

0 comments on commit c57ed0f

Please sign in to comment.