diff --git a/libivykis.posix.ver b/libivykis.posix.ver index 2962cb60..9bb7891f 100644 --- a/libivykis.posix.ver +++ b/libivykis.posix.ver @@ -111,3 +111,7 @@ IVYKIS_0.40 { # iv_timer __iv_now_location_valid; } IVYKIS_0.33; + +IVYKIS_0.42 { + iv_work_pool_submit_continuation; +} IVYKIS_0.40; diff --git a/man3/iv_work.3 b/man3/iv_work.3 index 011fcf9a..4d7f5b8a 100644 --- a/man3/iv_work.3 +++ b/man3/iv_work.3 @@ -5,7 +5,7 @@ .\" of the modification is added to the header. .TH iv_work 3 2010-09-14 "ivykis" "ivykis programmer's manual" .SH NAME -IV_WORK_POOL_INIT, iv_work_pool_create, iv_work_pool_put, IV_WORK_ITEM_INIT, iv_work_pool_submit_work \- ivykis +IV_WORK_POOL_INIT, iv_work_pool_create, iv_work_pool_put, IV_WORK_ITEM_INIT, iv_work_pool_submit_work, iv_work_pool_submit_continuation \- ivykis worker thread management .SH SYNOPSIS .B #include @@ -35,6 +35,8 @@ struct iv_work_item { .br .BI "int iv_work_pool_submit_work(struct iv_work_pool *" this ", struct iv_work_item *" work ");" .br +.BI "int iv_work_pool_submit_continuation(struct iv_work_pool *" this ", struct iv_work_item *" work ");" +.br .SH DESCRIPTION Calling .B iv_work_pool_create @@ -81,6 +83,20 @@ as its sole argument, in the thread that .B iv_work_pool_create was called in for this pool object. .PP +Calling +.B iv_work_pool_submit_continuation +from a worker thread allows submitting a work item similarly to +.B iv_work_pool_submit_work. +But while +.B iv_work_pool_submit_work +can only be called from the thread owning +.B iv_work, +.B iv_work_pool_submit_continuation +can be called from any of the worker threads. The +.B ->completion +callback of these jobs will be executed from the thread owning +.B iv_work. +.PP As a special case, calling .B iv_work_pool_submit_work with a @@ -117,6 +133,8 @@ are also not explicitly serialised. can only be called from the thread that .B iv_work_pool_create for this pool object was called in. +.B iv_work_pool_submit_continuation +can called from any of the worker threads. .PP There is no way to cancel submitted work items. .PP diff --git a/src/include/iv_work.h b/src/include/iv_work.h index 5e90fc7e..840ddfa1 100644 --- a/src/include/iv_work.h +++ b/src/include/iv_work.h @@ -59,6 +59,8 @@ int iv_work_pool_create(struct iv_work_pool *this); void iv_work_pool_put(struct iv_work_pool *this); void iv_work_pool_submit_work(struct iv_work_pool *this, struct iv_work_item *work); +void iv_work_pool_submit_continuation(struct iv_work_pool *this, + struct iv_work_item *work); #ifdef __cplusplus } diff --git a/src/iv_work.c b/src/iv_work.c index 20af69d2..9ee74cf2 100644 --- a/src/iv_work.c +++ b/src/iv_work.c @@ -34,7 +34,9 @@ struct work_pool_priv { ___mutex_t lock; struct iv_event ev; + struct iv_event thread_needed; int shutting_down; + int max_threads; int started_threads; struct iv_list_head idle_threads; void *cookie; @@ -44,6 +46,7 @@ struct work_pool_priv { uint32_t seq_tail; struct iv_list_head work_items; struct iv_list_head work_done; + unsigned long tid; }; struct work_pool_thread { @@ -213,6 +216,7 @@ static void iv_work_event(void *_pool) ___mutex_unlock(&pool->lock); ___mutex_destroy(&pool->lock); iv_event_unregister(&pool->ev); + iv_event_unregister(&pool->thread_needed); free(pool); return; } @@ -220,6 +224,23 @@ static void iv_work_event(void *_pool) } } +static int iv_work_start_thread(struct work_pool_priv *pool); + +static void iv_work_thread_needed(void *_pool) +{ + struct work_pool_priv *pool = _pool; + + ___mutex_lock(&pool->lock); + + if (iv_list_empty(&pool->idle_threads) && + pool->started_threads < pool->max_threads) { + iv_work_start_thread(pool); + } + + ___mutex_unlock(&pool->lock); + +} + int iv_work_pool_create(struct iv_work_pool *this) { struct work_pool_priv *pool; @@ -240,6 +261,12 @@ int iv_work_pool_create(struct iv_work_pool *this) pool->ev.handler = iv_work_event; iv_event_register(&pool->ev); + IV_EVENT_INIT(&pool->thread_needed); + pool->thread_needed.cookie = pool; + pool->thread_needed.handler = iv_work_thread_needed; + iv_event_register(&pool->thread_needed); + + pool->max_threads = this->max_threads; pool->shutting_down = 0; pool->started_threads = 0; INIT_IV_LIST_HEAD(&pool->idle_threads); @@ -250,6 +277,8 @@ int iv_work_pool_create(struct iv_work_pool *this) pool->seq_tail = 0; INIT_IV_LIST_HEAD(&pool->work_items); INIT_IV_LIST_HEAD(&pool->work_done); + + pool->tid = iv_get_thread_id(); this->priv = pool; @@ -308,9 +337,13 @@ static int iv_work_start_thread(struct work_pool_priv *pool) } static void -iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work) +iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work, int continuation) { struct work_pool_priv *pool = this->priv; + int called_from_owner_thread = (pool->tid == iv_get_thread_id()); + + if (!continuation && !called_from_owner_thread) + iv_fatal("iv_work_submit_pool: work items can only be submitted from the owning thread"); ___mutex_lock(&pool->lock); @@ -325,7 +358,10 @@ iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work) thr->kicked = 1; iv_event_post(&thr->kick); } else if (pool->started_threads < this->max_threads) { - iv_work_start_thread(pool); + if (called_from_owner_thread) + iv_work_start_thread(pool); + else + iv_event_post(&pool->thread_needed); } ___mutex_unlock(&pool->lock); @@ -391,7 +427,16 @@ void iv_work_pool_submit_work(struct iv_work_pool *this, struct iv_work_item *work) { if (this != NULL) - iv_work_submit_pool(this, work); + iv_work_submit_pool(this, work, 0); + else + iv_work_submit_local(work); +} + +void +iv_work_pool_submit_continuation(struct iv_work_pool *this, struct iv_work_item *work) +{ + if (this != NULL) + iv_work_submit_pool(this, work, 1); else iv_work_submit_local(work); }