diff --git a/src/core/data/worker.c b/src/core/data/worker.c index 149f7f1e6..cee307e7d 100644 --- a/src/core/data/worker.c +++ b/src/core/data/worker.c @@ -29,7 +29,7 @@ static struct context *ctx = &context; static channel_handler_st handlers; static channel_handler_st *hdl = &handlers; -struct post_processor *processor; +struct processor *post_processor; static inline rstatus_i _worker_write(struct buf_sock *s) @@ -72,7 +72,7 @@ _worker_event_write(struct buf_sock *s) c->state = CHANNEL_TERM; } - if (processor->post_write(&s->rbuf, &s->wbuf, &s->data) < 0) { + if (post_processor->write(s) < 0) { log_debug("handler signals channel termination"); s->ch->state = CHANNEL_TERM; return CC_ERROR; @@ -99,7 +99,7 @@ worker_close(struct buf_sock *s) { log_info("worker core close on buf_sock %p", s); - processor->post_error(&s->rbuf, &s->wbuf, &s->data); + post_processor->error(s); event_del(ctx->evb, hdl->rid(s->ch)); hdl->term(s->ch); buf_sock_return(&s); @@ -112,7 +112,7 @@ _worker_event_read(struct buf_sock *s) ASSERT(s != NULL); _worker_read(s); - if (processor->post_read(&s->rbuf, &s->wbuf, &s->data) < 0) { + if (post_processor->read(s) < 0) { log_debug("handler signals channel termination"); s->ch->state = CHANNEL_TERM; return; @@ -292,7 +292,7 @@ _worker_evwait(void) void * core_worker_evloop(void *arg) { - processor = arg; + post_processor = arg; for(;;) { if (_worker_evwait() != CC_OK) { diff --git a/src/core/data/worker.h b/src/core/data/worker.h index 337c3a7d3..d1d904761 100644 --- a/src/core/data/worker.h +++ b/src/core/data/worker.h @@ -33,17 +33,17 @@ extern worker_metrics_st *worker_metrics; /* * To allow the use application-specific logic in the handling of read/write * events, each application is expected to implement their own versions of - * post_processing functions called after the channel-level read/write is done. + * (post) processing functions called after the channel-level read/write is done. * - * Applications should set and pass their instance of post_processor as argument + * Applications should set and pass their instance of processor as argument * to core_worker_evloop(). */ -struct buf; -typedef int (*post_process_fn)(struct buf **, struct buf **, void **); -struct post_processor { - post_process_fn post_read; - post_process_fn post_write; - post_process_fn post_error; +struct buf_sock; +typedef int (*process_fn)(struct buf_sock *); +struct processor { + process_fn read; + process_fn write; + process_fn error; }; void core_worker_setup(worker_options_st *options, worker_metrics_st *metrics); diff --git a/src/server/pingserver/data/process.c b/src/server/pingserver/data/process.c index 02ac5f473..0ced268d4 100644 --- a/src/server/pingserver/data/process.c +++ b/src/server/pingserver/data/process.c @@ -6,17 +6,17 @@ #include int -pingserver_process_read(struct buf **rbuf, struct buf **wbuf, void **data) +pingserver_process_read(struct buf_sock *s) { parse_rstatus_t status; log_verb("post-read processing"); /* keep parse-process-compose until running out of data in rbuf */ - while (buf_rsize(*rbuf) > 0) { - log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf)); + while (buf_rsize(s->rbuf) > 0) { + log_verb("%"PRIu32" bytes left", buf_rsize(s->rbuf)); - status = parse_req(*rbuf); + status = parse_req(s->rbuf); if (status == PARSE_EUNFIN) { return 0; } @@ -24,7 +24,7 @@ pingserver_process_read(struct buf **rbuf, struct buf **wbuf, void **data) return -1; } - if (compose_rsp(wbuf) != COMPOSE_OK) { + if (compose_rsp(&s->wbuf) != COMPOSE_OK) { return -1; } } @@ -33,12 +33,12 @@ pingserver_process_read(struct buf **rbuf, struct buf **wbuf, void **data) } int -pingserver_process_write(struct buf **rbuf, struct buf **wbuf, void **data) +pingserver_process_write(struct buf_sock *s) { log_verb("post-write processing"); - dbuf_shrink(rbuf); - dbuf_shrink(wbuf); + dbuf_shrink(&s->rbuf); + dbuf_shrink(&s->wbuf); return 0; } diff --git a/src/server/pingserver/data/process.h b/src/server/pingserver/data/process.h index b40d431ed..16a5ccfae 100644 --- a/src/server/pingserver/data/process.h +++ b/src/server/pingserver/data/process.h @@ -1,6 +1,6 @@ #pragma once -#include +#include -int pingserver_process_read(struct buf **rbuf, struct buf **wbuf, void **data); -int pingserver_process_write(struct buf **rbuf, struct buf **wbuf, void **data); +int pingserver_process_read(struct buf_sock *s); +int pingserver_process_write(struct buf_sock *s); diff --git a/src/server/pingserver/main.c b/src/server/pingserver/main.c index cde8b254b..8f9c0f846 100644 --- a/src/server/pingserver/main.c +++ b/src/server/pingserver/main.c @@ -13,7 +13,7 @@ #include #include -struct post_processor worker_processor = { +struct processor worker_processor = { pingserver_process_read, pingserver_process_write }; diff --git a/src/server/slimcache/data/process.c b/src/server/slimcache/data/process.c index 9e8a3a07f..cc1ba23d5 100644 --- a/src/server/slimcache/data/process.c +++ b/src/server/slimcache/data/process.c @@ -446,7 +446,7 @@ _cleanup(struct request **req, struct response **rsp) } int -slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) +slimcache_process_read(struct buf_sock *s) { parse_rstatus_t status; struct request *req; @@ -464,14 +464,14 @@ slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) } /* keep parse-process-compose until running out of data in rbuf */ - while (buf_rsize(*rbuf) > 0) { + while (buf_rsize(s->rbuf) > 0) { struct response *nr; int i, card; /* stage 1: parsing */ - log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf)); + log_verb("%"PRIu32" bytes left", buf_rsize(s->rbuf)); - status = parse_req(req, *rbuf); + status = parse_req(req, s->rbuf); if (status == PARSE_EUNFIN) { goto done; } @@ -528,7 +528,7 @@ slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) card = req->nfound + 1; } /* no need to update for other req types- card remains 1 */ for (i = 0; i < card; nr = STAILQ_NEXT(nr, next), ++i) { - if (compose_rsp(wbuf, nr) < 0) { + if (compose_rsp(&s->wbuf, nr) < 0) { log_error("composing rsp erred"); INCR(process_metrics, process_ex); goto error; @@ -546,12 +546,12 @@ slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) } int -slimcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data) +slimcache_process_write(struct buf_sock *s) { log_verb("post-write processing"); - dbuf_shrink(rbuf); - dbuf_shrink(wbuf); + dbuf_shrink(&s->rbuf); + dbuf_shrink(&s->wbuf); return 0; } diff --git a/src/server/slimcache/data/process.h b/src/server/slimcache/data/process.h index 740af2fa5..59cc3be1c 100644 --- a/src/server/slimcache/data/process.h +++ b/src/server/slimcache/data/process.h @@ -3,6 +3,7 @@ #include #include #include +#include #define ALLOW_FLUSH false @@ -64,5 +65,5 @@ typedef struct { void process_setup(process_options_st *options, process_metrics_st *metrics); void process_teardown(void); -int slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data); -int slimcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data); +int slimcache_process_read(struct buf_sock *s); +int slimcache_process_write(struct buf_sock *s); diff --git a/src/server/slimcache/main.c b/src/server/slimcache/main.c index 3c400b8b5..87d91ac4e 100644 --- a/src/server/slimcache/main.c +++ b/src/server/slimcache/main.c @@ -13,7 +13,7 @@ #include #include -struct post_processor worker_processor = { +struct processor worker_processor = { slimcache_process_read, slimcache_process_write }; diff --git a/src/server/twemcache/data/process.c b/src/server/twemcache/data/process.c index 6c77fce1d..a0df7c32d 100644 --- a/src/server/twemcache/data/process.c +++ b/src/server/twemcache/data/process.c @@ -652,7 +652,7 @@ _data_create(void) } int -twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) +twemcache_process_read(struct buf_sock *s) { parse_rstatus_t status; struct data *state; @@ -662,8 +662,8 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) log_verb("post-read processing"); /* deal with the stateful part: request and response */ - if (*data == NULL) { - if ((*data = _data_create()) == NULL) { + if (s->data == NULL) { + if ((s->data = _data_create()) == NULL) { /* TODO(yao): simply return for now, better to respond with OOM */ log_error("cannot process request: OOM"); INCR(process_metrics, process_ex); @@ -671,21 +671,21 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) return -1; } } - state = (struct data *)*data; + state = (struct data *)s->data; req = state->req; rsp = state->rsp; /* keep parse-process-compose until running out of data in rbuf */ - while (buf_rsize(*rbuf) > 0) { + while (buf_rsize(s->rbuf) > 0) { struct response *nr; int i, card; /* stage 1: parsing */ - log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf)); + log_verb("%"PRIu32" bytes left", buf_rsize(s->rbuf)); - status = parse_req(req, *rbuf); + status = parse_req(req, s->rbuf); if (status == PARSE_EUNFIN) { - buf_lshift(*rbuf); + buf_lshift(s->rbuf); return 0; } if (status != PARSE_OK) { @@ -732,7 +732,7 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) process_request(rsp, req); if (req->partial) { /* implies end of rbuf w/o complete processing */ /* in this case, do not attempt to log or write response */ - buf_lshift(*rbuf); + buf_lshift(s->rbuf); return 0; } @@ -747,7 +747,7 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) card = req->nfound + 1; } for (i = 0; i < card; nr = STAILQ_NEXT(nr, next), ++i) { - if (compose_rsp(wbuf, nr) < 0) { + if (compose_rsp(&s->wbuf, nr) < 0) { log_error("composing rsp erred"); INCR(process_metrics, process_ex); _cleanup(req, rsp); @@ -766,36 +766,36 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data) int -twemcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data) +twemcache_process_write(struct buf_sock *s) { log_verb("post-write processing"); - buf_lshift(*rbuf); - buf_lshift(*wbuf); - dbuf_shrink(rbuf); - dbuf_shrink(wbuf); + buf_lshift(s->rbuf); + dbuf_shrink(&s->rbuf); + buf_lshift(s->wbuf); + dbuf_shrink(&s->wbuf); return 0; } int -twemcache_process_error(struct buf **rbuf, struct buf **wbuf, void **data) +twemcache_process_error(struct buf_sock *s) { - struct data *state = (struct data *)*data; + struct data *state = (struct data *)s->data; struct request *req; struct response *rsp; log_verb("post-error processing"); /* normalize buffer size */ - buf_reset(*rbuf); - dbuf_shrink(rbuf); - buf_reset(*wbuf); - dbuf_shrink(wbuf); + buf_reset(s->rbuf); + dbuf_shrink(&s->rbuf); + buf_reset(s->wbuf); + dbuf_shrink(&s->wbuf); /* release request data & associated reserved data */ - if (*data != NULL) { + if (state != NULL) { req = state->req; rsp = state->rsp; if (req->reserved != NULL) { diff --git a/src/server/twemcache/data/process.h b/src/server/twemcache/data/process.h index 4ca8ffd91..2ca10bea5 100644 --- a/src/server/twemcache/data/process.h +++ b/src/server/twemcache/data/process.h @@ -3,6 +3,7 @@ #include #include #include +#include #define ALLOW_FLUSH false @@ -73,6 +74,6 @@ typedef struct { void process_setup(process_options_st *options, process_metrics_st *metrics); void process_teardown(void); -int twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data); -int twemcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data); -int twemcache_process_error(struct buf **rbuf, struct buf **wbuf, void **data); +int twemcache_process_read(struct buf_sock *s); +int twemcache_process_write(struct buf_sock *s); +int twemcache_process_error(struct buf_sock *s); diff --git a/src/server/twemcache/main.c b/src/server/twemcache/main.c index a52dea38e..371d873e6 100644 --- a/src/server/twemcache/main.c +++ b/src/server/twemcache/main.c @@ -13,7 +13,7 @@ #include #include -struct post_processor worker_processor = { +struct processor worker_processor = { twemcache_process_read, twemcache_process_write, twemcache_process_error,