Skip to content

Commit

Permalink
add optimization for append in sarray (twitter#253)
Browse files Browse the repository at this point in the history
* add optimization for append in sarray

* fix edge case

* optimize for empty entry as well

* fix event handling loop to avoid fd leak when multiple events are received concurrently
  • Loading branch information
Yao Yue authored and michalbiesek committed Sep 10, 2019
1 parent 07ba1a1 commit 7d209e3
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
11 changes: 6 additions & 5 deletions src/core/data/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,13 @@ _server_event(void *arg, uint32_t events)
log_verb("processing server read event on pipe");
INCR(server_metrics, server_event_read);
_server_pipe_read();
} else if (events & EVENT_WRITE) { /* retrying worker notification */
}
if (events & EVENT_WRITE) { /* retrying worker notification */
log_verb("processing server write event on pipe");
INCR(server_metrics, server_event_write);
_server_pipe_write();
} else { /* EVENT_ERR */
}
if (events & EVENT_ERR) {
log_debug("processing server error event on pipe");
INCR(server_metrics, server_event_error);
}
Expand All @@ -204,12 +206,11 @@ _server_event(void *arg, uint32_t events)
log_verb("processing server read event on buf_sock %p", s);
INCR(server_metrics, server_event_read);
_server_event_read(s);
} else if (events & EVENT_ERR) { /* effectively refusing new conn */
}
if (events & EVENT_ERR) { /* effectively refusing new conn */
/* TODO: shall we retry bind and listen ? */
log_debug("processing server error event on listening socket");
_server_close(s);
} else {
NOT_REACHED();
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions src/core/data/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,12 @@ _worker_event(void *arg, uint32_t events)
if (events & EVENT_READ) { /* new connection from server */
INCR(worker_metrics, worker_event_read);
worker_add_stream();
} else if (events & EVENT_WRITE) { /* retry return notification */
}
if (events & EVENT_WRITE) { /* retry return notification */
INCR(worker_metrics, worker_event_write);
_worker_pipe_write();
} else { /* EVENT_ERR */
}
if (events & EVENT_ERR) {
INCR(worker_metrics, worker_event_error);
log_error("error event received on pipe");
}
Expand All @@ -196,7 +198,8 @@ _worker_event(void *arg, uint32_t events)
log_verb("processing worker read event on buf_sock %p", s);
INCR(worker_metrics, worker_event_read);
_worker_event_read(s);
} else if (events & EVENT_WRITE) {
}
if (events & EVENT_WRITE) {
/* got here only when a previous write was incompleted/retried */
log_verb("processing worker write event on buf_sock %p", s);
INCR(worker_metrics, worker_event_write);
Expand All @@ -205,11 +208,10 @@ _worker_event(void *arg, uint32_t events)
event_del(ctx->evb, hdl->wid(s->ch));
event_add_read(ctx->evb, hdl->rid(s->ch), s);
}
} else if (events & EVENT_ERR) {
}
if (events & EVENT_ERR) {
s->ch->state = CHANNEL_TERM;
INCR(worker_metrics, worker_event_error);
} else {
NOT_REACHED();
}

/* TODO(yao): come up with a robust policy about channel connection
Expand Down
7 changes: 7 additions & 0 deletions src/data_structure/sarray/sarray.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ _binary_search(uint32_t *idx, uint8_t *body, uint32_t nentry, uint32_t esize, ui
static inline bool
_locate(uint32_t *idx, uint8_t *body, uint32_t nentry, uint32_t esize, uint64_t val)
{
/* optimize for inserting at the end, which is dominant in many use cases */
if (nentry == 0 || _get_value(body + esize * (nentry - 1), esize) < val) {
*idx = nentry;

return false;
}

if (_should_scan(nentry, esize)) { /* linear scan */
return _linear_search(idx, body, nentry, esize, val);
} else { /* otherwise, binary search */
Expand Down

0 comments on commit 7d209e3

Please sign in to comment.