Skip to content
This repository has been archived by the owner on Jun 30, 2021. It is now read-only.

Commit

Permalink
evthr initial shared pipe prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Ellzey committed Mar 20, 2015
1 parent 2d4c22f commit 72f01f5
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 8 deletions.
68 changes: 61 additions & 7 deletions evthr.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ struct evthr_cmd {
TAILQ_HEAD(evthr_pool_slist, evthr);

struct evthr_pool {
#ifdef EVTHR_SHARED_PIPE
int rdr;
int wdr;
#endif
int nthreads;
evthr_pool_slist_t threads;
};
Expand All @@ -51,11 +55,17 @@ struct evthr {
void * arg;
void * aux;

#ifdef EVTHR_SHARED_PIPE
int pool_rdr;
struct event * shared_pool_ev;
#endif
TAILQ_ENTRY(evthr) next;
};

static inline int
_evthr_read(evthr_t * thr, evthr_cmd_t * cmd, evutil_socket_t sock) {
ssize_t r;

if (recv(sock, cmd, sizeof(evthr_cmd_t), 0) != sizeof(evthr_cmd_t)) {
return 0;
}
Expand All @@ -73,7 +83,7 @@ _evthr_read_cmd(evutil_socket_t sock, short which, void * args) {
return;
}

pthread_mutex_lock(&thread->rlock);
//pthread_mutex_lock(&thread->rlock);

stopped = 0;

Expand All @@ -88,7 +98,7 @@ _evthr_read_cmd(evutil_socket_t sock, short which, void * args) {
}
}

pthread_mutex_unlock(&thread->rlock);
//pthread_mutex_unlock(&thread->rlock);

if (stopped == 1) {
event_base_loopbreak(thread->evbase);
Expand All @@ -115,6 +125,14 @@ _evthr_loop(void * args) {

event_add(thread->event, NULL);

#ifdef EVTHR_SHARED_PIPE
if (thread->pool_rdr > 0) {
thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
event_add(thread->shared_pool_ev, NULL);
}
#endif

pthread_mutex_lock(&thread->lock);

if (thread->init_cb != NULL) {
Expand All @@ -130,7 +148,7 @@ _evthr_loop(void * args) {
}

pthread_exit(NULL);
}
} /* _evthr_loop */

evthr_res
evthr_defer(evthr_t * thread, evthr_cb cb, void * arg) {
Expand All @@ -141,14 +159,14 @@ evthr_defer(evthr_t * thread, evthr_cb cb, void * arg) {
cmd.args = arg;
cmd.stop = 0;

pthread_mutex_lock(&thread->rlock);
//pthread_mutex_lock(&thread->rlock);

if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
pthread_mutex_unlock(&thread->rlock);
//pthread_mutex_unlock(&thread->rlock);
return EVTHR_RES_RETRY;
}

pthread_mutex_unlock(&thread->rlock);
//pthread_mutex_unlock(&thread->rlock);

return EVTHR_RES_OK;
}
Expand Down Expand Up @@ -302,6 +320,19 @@ evthr_pool_stop(evthr_pool_t * pool) {

evthr_res
evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) {
#ifdef EVTHR_SHARED_PIPE
evthr_cmd_t cmd = {
.cb = cb,
.args = arg,
.stop = 0
};

if (send(pool->wdr, &cmd, sizeof(cmd), 0) <= 0) {
return EVTHR_RES_RETRY;
}

return EVTHR_RES_OK;
#else
evthr_t * thr = NULL;

if (pool == NULL) {
Expand All @@ -319,13 +350,18 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) {


return evthr_defer(thr, cb, arg);
#endif
} /* evthr_pool_defer */

evthr_pool_t *
evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) {
evthr_pool_t * pool;
int i;

#ifdef EVTHR_SHARED_PIPE
int fds[2];
#endif

if (nthreads == 0) {
return NULL;
}
Expand All @@ -337,6 +373,19 @@ evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) {
pool->nthreads = nthreads;
TAILQ_INIT(&pool->threads);

#ifdef EVTHR_SHARED_PIPE
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
return NULL;
}

evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);

pool->rdr = fds[0];
pool->wdr = fds[1];
#endif


for (i = 0; i < nthreads; i++) {
evthr_t * thread;

Expand All @@ -345,11 +394,15 @@ evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) {
return NULL;
}

#ifdef EVTHR_SHARED_PIPE
thread->pool_rdr = fds[0];
#endif

TAILQ_INSERT_TAIL(&pool->threads, thread, next);
}

return pool;
}
} /* evthr_pool_new */

int
evthr_pool_start(evthr_pool_t * pool) {
Expand All @@ -369,3 +422,4 @@ evthr_pool_start(evthr_pool_t * pool) {

return 0;
}

2 changes: 1 addition & 1 deletion examples/test_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ struct test base_tests[] = {
{ NULL, NULL },
} },
{ "hexa%z=some", 0, {
{ "hexa%z", "some" },
{ "hexa%z", "some" },
{ NULL, NULL },
} },
{ "aaa=some\%az", 1 },
Expand Down

0 comments on commit 72f01f5

Please sign in to comment.