Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change processor signature to expose buf_sock #152

Merged
merged 4 commits into from
Apr 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/core/data/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ static struct context *ctx = &context;
static channel_handler_st handlers;
static channel_handler_st *hdl = &handlers;

struct post_processor *processor;
struct processor *post_processor;

static inline rstatus_i
_worker_write(struct buf_sock *s)
Expand Down Expand Up @@ -72,7 +72,7 @@ _worker_event_write(struct buf_sock *s)
c->state = CHANNEL_TERM;
}

if (processor->post_write(&s->rbuf, &s->wbuf, &s->data) < 0) {
if (post_processor->write(s) < 0) {
log_debug("handler signals channel termination");
s->ch->state = CHANNEL_TERM;
return CC_ERROR;
Expand All @@ -99,7 +99,7 @@ worker_close(struct buf_sock *s)
{
log_info("worker core close on buf_sock %p", s);

processor->post_error(&s->rbuf, &s->wbuf, &s->data);
post_processor->error(s);
event_del(ctx->evb, hdl->rid(s->ch));
hdl->term(s->ch);
buf_sock_return(&s);
Expand All @@ -112,7 +112,7 @@ _worker_event_read(struct buf_sock *s)
ASSERT(s != NULL);

_worker_read(s);
if (processor->post_read(&s->rbuf, &s->wbuf, &s->data) < 0) {
if (post_processor->read(s) < 0) {
log_debug("handler signals channel termination");
s->ch->state = CHANNEL_TERM;
return;
Expand Down Expand Up @@ -292,7 +292,7 @@ _worker_evwait(void)
void *
core_worker_evloop(void *arg)
{
processor = arg;
post_processor = arg;

for(;;) {
if (_worker_evwait() != CC_OK) {
Expand Down
16 changes: 8 additions & 8 deletions src/core/data/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ extern worker_metrics_st *worker_metrics;
/*
* To allow the use application-specific logic in the handling of read/write
* events, each application is expected to implement their own versions of
* post_processing functions called after the channel-level read/write is done.
* (post) processing functions called after the channel-level read/write is done.
*
* Applications should set and pass their instance of post_processor as argument
* Applications should set and pass their instance of processor as argument
* to core_worker_evloop().
*/
struct buf;
typedef int (*post_process_fn)(struct buf **, struct buf **, void **);
struct post_processor {
post_process_fn post_read;
post_process_fn post_write;
post_process_fn post_error;
struct buf_sock;
typedef int (*process_fn)(struct buf_sock *);
struct processor {
process_fn read;
process_fn write;
process_fn error;
};

void core_worker_setup(worker_options_st *options, worker_metrics_st *metrics);
Expand Down
16 changes: 8 additions & 8 deletions src/server/pingserver/data/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@
#include <cc_debug.h>

int
pingserver_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
pingserver_process_read(struct buf_sock *s)
{
parse_rstatus_t status;

log_verb("post-read processing");

/* keep parse-process-compose until running out of data in rbuf */
while (buf_rsize(*rbuf) > 0) {
log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf));
while (buf_rsize(s->rbuf) > 0) {
log_verb("%"PRIu32" bytes left", buf_rsize(s->rbuf));

status = parse_req(*rbuf);
status = parse_req(s->rbuf);
if (status == PARSE_EUNFIN) {
return 0;
}
if (status != PARSE_OK) {
return -1;
}

if (compose_rsp(wbuf) != COMPOSE_OK) {
if (compose_rsp(&s->wbuf) != COMPOSE_OK) {
return -1;
}
}
Expand All @@ -33,12 +33,12 @@ pingserver_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
}

int
pingserver_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
pingserver_process_write(struct buf_sock *s)
{
log_verb("post-write processing");

dbuf_shrink(rbuf);
dbuf_shrink(wbuf);
dbuf_shrink(&s->rbuf);
dbuf_shrink(&s->wbuf);

return 0;
}
6 changes: 3 additions & 3 deletions src/server/pingserver/data/process.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <buffer/cc_buf.h>
#include <stream/cc_sockio.h>

int pingserver_process_read(struct buf **rbuf, struct buf **wbuf, void **data);
int pingserver_process_write(struct buf **rbuf, struct buf **wbuf, void **data);
int pingserver_process_read(struct buf_sock *s);
int pingserver_process_write(struct buf_sock *s);
2 changes: 1 addition & 1 deletion src/server/pingserver/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <sys/socket.h>
#include <sysexits.h>

struct post_processor worker_processor = {
struct processor worker_processor = {
pingserver_process_read,
pingserver_process_write
};
Expand Down
16 changes: 8 additions & 8 deletions src/server/slimcache/data/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ _cleanup(struct request **req, struct response **rsp)
}

int
slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
slimcache_process_read(struct buf_sock *s)
{
parse_rstatus_t status;
struct request *req;
Expand All @@ -464,14 +464,14 @@ slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
}

/* keep parse-process-compose until running out of data in rbuf */
while (buf_rsize(*rbuf) > 0) {
while (buf_rsize(s->rbuf) > 0) {
struct response *nr;
int i, card;

/* stage 1: parsing */
log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf));
log_verb("%"PRIu32" bytes left", buf_rsize(s->rbuf));

status = parse_req(req, *rbuf);
status = parse_req(req, s->rbuf);
if (status == PARSE_EUNFIN) {
goto done;
}
Expand Down Expand Up @@ -528,7 +528,7 @@ slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
card = req->nfound + 1;
} /* no need to update for other req types- card remains 1 */
for (i = 0; i < card; nr = STAILQ_NEXT(nr, next), ++i) {
if (compose_rsp(wbuf, nr) < 0) {
if (compose_rsp(&s->wbuf, nr) < 0) {
log_error("composing rsp erred");
INCR(process_metrics, process_ex);
goto error;
Expand All @@ -546,12 +546,12 @@ slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
}

int
slimcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
slimcache_process_write(struct buf_sock *s)
{
log_verb("post-write processing");

dbuf_shrink(rbuf);
dbuf_shrink(wbuf);
dbuf_shrink(&s->rbuf);
dbuf_shrink(&s->wbuf);

return 0;
}
5 changes: 3 additions & 2 deletions src/server/slimcache/data/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <buffer/cc_buf.h>
#include <cc_metric.h>
#include <cc_option.h>
#include <stream/cc_sockio.h>

#define ALLOW_FLUSH false

Expand Down Expand Up @@ -64,5 +65,5 @@ typedef struct {
void process_setup(process_options_st *options, process_metrics_st *metrics);
void process_teardown(void);

int slimcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data);
int slimcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data);
int slimcache_process_read(struct buf_sock *s);
int slimcache_process_write(struct buf_sock *s);
2 changes: 1 addition & 1 deletion src/server/slimcache/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <sys/socket.h>
#include <sysexits.h>

struct post_processor worker_processor = {
struct processor worker_processor = {
slimcache_process_read,
slimcache_process_write
};
Expand Down
44 changes: 22 additions & 22 deletions src/server/twemcache/data/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ _data_create(void)
}

int
twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
twemcache_process_read(struct buf_sock *s)
{
parse_rstatus_t status;
struct data *state;
Expand All @@ -662,30 +662,30 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
log_verb("post-read processing");

/* deal with the stateful part: request and response */
if (*data == NULL) {
if ((*data = _data_create()) == NULL) {
if (s->data == NULL) {
if ((s->data = _data_create()) == NULL) {
/* TODO(yao): simply return for now, better to respond with OOM */
log_error("cannot process request: OOM");
INCR(process_metrics, process_ex);

return -1;
}
}
state = (struct data *)*data;
state = (struct data *)s->data;
req = state->req;
rsp = state->rsp;

/* keep parse-process-compose until running out of data in rbuf */
while (buf_rsize(*rbuf) > 0) {
while (buf_rsize(s->rbuf) > 0) {
struct response *nr;
int i, card;

/* stage 1: parsing */
log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf));
log_verb("%"PRIu32" bytes left", buf_rsize(s->rbuf));

status = parse_req(req, *rbuf);
status = parse_req(req, s->rbuf);
if (status == PARSE_EUNFIN) {
buf_lshift(*rbuf);
buf_lshift(s->rbuf);
return 0;
}
if (status != PARSE_OK) {
Expand Down Expand Up @@ -732,7 +732,7 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
process_request(rsp, req);
if (req->partial) { /* implies end of rbuf w/o complete processing */
/* in this case, do not attempt to log or write response */
buf_lshift(*rbuf);
buf_lshift(s->rbuf);
return 0;
}

Expand All @@ -747,7 +747,7 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
card = req->nfound + 1;
}
for (i = 0; i < card; nr = STAILQ_NEXT(nr, next), ++i) {
if (compose_rsp(wbuf, nr) < 0) {
if (compose_rsp(&s->wbuf, nr) < 0) {
log_error("composing rsp erred");
INCR(process_metrics, process_ex);
_cleanup(req, rsp);
Expand All @@ -766,36 +766,36 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)


int
twemcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
twemcache_process_write(struct buf_sock *s)
{
log_verb("post-write processing");

buf_lshift(*rbuf);
buf_lshift(*wbuf);
dbuf_shrink(rbuf);
dbuf_shrink(wbuf);
buf_lshift(s->rbuf);
dbuf_shrink(&s->rbuf);
buf_lshift(s->wbuf);
dbuf_shrink(&s->wbuf);

return 0;
}


int
twemcache_process_error(struct buf **rbuf, struct buf **wbuf, void **data)
twemcache_process_error(struct buf_sock *s)
{
struct data *state = (struct data *)*data;
struct data *state = (struct data *)s->data;
struct request *req;
struct response *rsp;

log_verb("post-error processing");

/* normalize buffer size */
buf_reset(*rbuf);
dbuf_shrink(rbuf);
buf_reset(*wbuf);
dbuf_shrink(wbuf);
buf_reset(s->rbuf);
dbuf_shrink(&s->rbuf);
buf_reset(s->wbuf);
dbuf_shrink(&s->wbuf);

/* release request data & associated reserved data */
if (*data != NULL) {
if (state != NULL) {
req = state->req;
rsp = state->rsp;
if (req->reserved != NULL) {
Expand Down
7 changes: 4 additions & 3 deletions src/server/twemcache/data/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <buffer/cc_buf.h>
#include <cc_metric.h>
#include <cc_option.h>
#include <stream/cc_sockio.h>

#define ALLOW_FLUSH false

Expand Down Expand Up @@ -73,6 +74,6 @@ typedef struct {
void process_setup(process_options_st *options, process_metrics_st *metrics);
void process_teardown(void);

int twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data);
int twemcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data);
int twemcache_process_error(struct buf **rbuf, struct buf **wbuf, void **data);
int twemcache_process_read(struct buf_sock *s);
int twemcache_process_write(struct buf_sock *s);
int twemcache_process_error(struct buf_sock *s);
2 changes: 1 addition & 1 deletion src/server/twemcache/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <sys/socket.h>
#include <sysexits.h>

struct post_processor worker_processor = {
struct processor worker_processor = {
twemcache_process_read,
twemcache_process_write,
twemcache_process_error,
Expand Down