Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iv_work: add support for submitting work_items from within worker threads #25

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libivykis.posix.ver
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@ IVYKIS_0.40 {
# iv_timer
__iv_now_location_valid;
} IVYKIS_0.33;

IVYKIS_0.42 {
iv_work_pool_submit_continuation;
} IVYKIS_0.40;
20 changes: 19 additions & 1 deletion man3/iv_work.3
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
.\" of the modification is added to the header.
.TH iv_work 3 2010-09-14 "ivykis" "ivykis programmer's manual"
.SH NAME
IV_WORK_POOL_INIT, iv_work_pool_create, iv_work_pool_put, IV_WORK_ITEM_INIT, iv_work_pool_submit_work \- ivykis
IV_WORK_POOL_INIT, iv_work_pool_create, iv_work_pool_put, IV_WORK_ITEM_INIT, iv_work_pool_submit_work, iv_work_pool_submit_continuation \- ivykis
worker thread management
.SH SYNOPSIS
.B #include <iv_work.h>
Expand Down Expand Up @@ -35,6 +35,8 @@ struct iv_work_item {
.br
.BI "int iv_work_pool_submit_work(struct iv_work_pool *" this ", struct iv_work_item *" work ");"
.br
.BI "int iv_work_pool_submit_continuation(struct iv_work_pool *" this ", struct iv_work_item *" work ");"
.br
.SH DESCRIPTION
Calling
.B iv_work_pool_create
Expand Down Expand Up @@ -81,6 +83,20 @@ as its sole argument, in the thread that
.B iv_work_pool_create
was called in for this pool object.
.PP
Calling
.B iv_work_pool_submit_continuation
from a worker thread allows submitting a work item similarly to
.B iv_work_pool_submit_work.
But while
.B iv_work_pool_submit_work
can only be called from the thread owning
.B iv_work,
.B iv_work_pool_submit_continuation
can be called from any of the worker threads. The
.B ->completion
callback of these jobs will be executed from the thread owning
.B iv_work.
.PP
As a special case, calling
.B iv_work_pool_submit_work
with a
Expand Down Expand Up @@ -117,6 +133,8 @@ are also not explicitly serialised.
can only be called from the thread that
.B iv_work_pool_create
for this pool object was called in.
.B iv_work_pool_submit_continuation
can called from any of the worker threads.
.PP
There is no way to cancel submitted work items.
.PP
Expand Down
2 changes: 2 additions & 0 deletions src/include/iv_work.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ int iv_work_pool_create(struct iv_work_pool *this);
void iv_work_pool_put(struct iv_work_pool *this);
void iv_work_pool_submit_work(struct iv_work_pool *this,
struct iv_work_item *work);
void iv_work_pool_submit_continuation(struct iv_work_pool *this,
struct iv_work_item *work);

#ifdef __cplusplus
}
Expand Down
51 changes: 48 additions & 3 deletions src/iv_work.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
struct work_pool_priv {
___mutex_t lock;
struct iv_event ev;
struct iv_event thread_needed;
int shutting_down;
int max_threads;
int started_threads;
struct iv_list_head idle_threads;
void *cookie;
Expand All @@ -44,6 +46,7 @@ struct work_pool_priv {
uint32_t seq_tail;
struct iv_list_head work_items;
struct iv_list_head work_done;
unsigned long tid;
};

struct work_pool_thread {
Expand Down Expand Up @@ -213,13 +216,31 @@ static void iv_work_event(void *_pool)
___mutex_unlock(&pool->lock);
___mutex_destroy(&pool->lock);
iv_event_unregister(&pool->ev);
iv_event_unregister(&pool->thread_needed);
free(pool);
return;
}
___mutex_unlock(&pool->lock);
}
}

static int iv_work_start_thread(struct work_pool_priv *pool);

static void iv_work_thread_needed(void *_pool)
{
struct work_pool_priv *pool = _pool;

___mutex_lock(&pool->lock);

if (iv_list_empty(&pool->idle_threads) &&
pool->started_threads < pool->max_threads) {
iv_work_start_thread(pool);
}

___mutex_unlock(&pool->lock);

}

int iv_work_pool_create(struct iv_work_pool *this)
{
struct work_pool_priv *pool;
Expand All @@ -240,6 +261,12 @@ int iv_work_pool_create(struct iv_work_pool *this)
pool->ev.handler = iv_work_event;
iv_event_register(&pool->ev);

IV_EVENT_INIT(&pool->thread_needed);
pool->thread_needed.cookie = pool;
pool->thread_needed.handler = iv_work_thread_needed;
iv_event_register(&pool->thread_needed);

pool->max_threads = this->max_threads;
pool->shutting_down = 0;
pool->started_threads = 0;
INIT_IV_LIST_HEAD(&pool->idle_threads);
Expand All @@ -250,6 +277,8 @@ int iv_work_pool_create(struct iv_work_pool *this)
pool->seq_tail = 0;
INIT_IV_LIST_HEAD(&pool->work_items);
INIT_IV_LIST_HEAD(&pool->work_done);

pool->tid = iv_get_thread_id();

this->priv = pool;

Expand Down Expand Up @@ -308,9 +337,13 @@ static int iv_work_start_thread(struct work_pool_priv *pool)
}

static void
iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work)
iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work, int continuation)
{
struct work_pool_priv *pool = this->priv;
int called_from_owner_thread = (pool->tid == iv_get_thread_id());

if (!continuation && !called_from_owner_thread)
iv_fatal("iv_work_submit_pool: work items can only be submitted from the owning thread");

___mutex_lock(&pool->lock);

Expand All @@ -325,7 +358,10 @@ iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work)
thr->kicked = 1;
iv_event_post(&thr->kick);
} else if (pool->started_threads < this->max_threads) {
iv_work_start_thread(pool);
if (called_from_owner_thread)
iv_work_start_thread(pool);
else
iv_event_post(&pool->thread_needed);
}

___mutex_unlock(&pool->lock);
Expand Down Expand Up @@ -391,7 +427,16 @@ void
iv_work_pool_submit_work(struct iv_work_pool *this, struct iv_work_item *work)
{
if (this != NULL)
iv_work_submit_pool(this, work);
iv_work_submit_pool(this, work, 0);
else
iv_work_submit_local(work);
}

void
iv_work_pool_submit_continuation(struct iv_work_pool *this, struct iv_work_item *work)
{
if (this != NULL)
iv_work_submit_pool(this, work, 1);
else
iv_work_submit_local(work);
}