Skip to content

Commit

Permalink
Add support for graceful shutdown for twemcache/slimcache
Browse files Browse the repository at this point in the history
- add new admin command "shutdown" to support this operation
- add support for SIGTERM which should allow calling properly teardown
methods

Co-authored-by: Piotr Balcer <piotr.balcer@intel.com>
  • Loading branch information
michalbiesek and pbalcer committed Aug 19, 2019
1 parent 4c43da9 commit 2050214
Show file tree
Hide file tree
Showing 17 changed files with 160 additions and 24 deletions.
1 change: 1 addition & 0 deletions deps/ccommon/include/channel/cc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ enum {
CHANNEL_OPEN, /* opening */
CHANNEL_ESTABLISHED,
CHANNEL_TERM, /* to be closed, don't need a closing state yet */
CHANNEL_TERM_RESET_DB,
CHANNEL_ERROR, /* unrecoverable error occurred */

CHANNEL_SENTINEL
Expand Down
18 changes: 18 additions & 0 deletions src/core/admin/admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ _admin_post_read(struct buf_sock *s)
goto done;
}

if (req.type == REQ_SHUTDOWN) {
log_info("peer called shutdown");
s->ch->state = CHANNEL_TERM_RESET_DB;
goto done;
}

admin_response_reset(&rsp);

admin_process_request(&rsp, &req);
Expand Down Expand Up @@ -193,6 +199,12 @@ _admin_event(void *arg, uint32_t events)
NOT_REACHED();
}

if (s->ch->state == CHANNEL_TERM_RESET_DB) {
_admin_close(s);
raise(SIGTERM);
return;
}

if (s->ch->state == CHANNEL_TERM || s->ch->state == CHANNEL_ERROR) {
_admin_close(s);
}
Expand Down Expand Up @@ -310,6 +322,12 @@ core_admin_register(uint64_t intvl_ms, timeout_cb_fn cb, void *arg)
return timing_wheel_insert(tw, &delay, true, cb, arg);
}

void
core_admin_unregister(struct timeout_event *tev)
{
timing_wheel_remove(tw, &tev);
}

static rstatus_i
_admin_evwait(void)
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/admin/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ void core_admin_teardown(void);
/* add a periodic action to be executed on the admin thread, which uses timing wheel */
struct timeout_event *
core_admin_register(uint64_t intvl_ms, timeout_cb_fn cb, void *arg);

void core_admin_unregister(struct timeout_event *tev);
void core_admin_evloop(void);
28 changes: 25 additions & 3 deletions src/core/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
#include <string.h>
#include <sysexits.h>

pthread_t worker, server;
void
core_run(void *arg_worker)
core_run(void *arg_worker, void *arg_server)
{
pthread_t worker, server;
int ret;

if (!admin_init || !server_init || !worker_init) {
Expand All @@ -26,7 +26,7 @@ core_run(void *arg_worker)
goto error;
}

ret = pthread_create(&server, NULL, core_server_evloop, NULL);
ret = pthread_create(&server, NULL, core_server_evloop, arg_server);
if (ret != 0) {
log_crit("pthread create failed for server thread: %s", strerror(ret));
goto error;
Expand All @@ -37,3 +37,25 @@ core_run(void *arg_worker)
error:
exit(EX_OSERR);
}

void core_destroy(void)
{
int ret;

if (!server_init || !worker_init) {
log_crit("cannot run: server/worker have to be initialized");
return;
}

ret = pthread_join(server, NULL);
if (ret != 0) {
log_crit("pthread join failed for worker thread: %s", strerror(ret));
exit(EX_OSERR);
}

ret = pthread_join(worker, NULL);
if (ret != 0) {
log_crit("pthread join failed for server thread: %s", strerror(ret));
exit(EX_OSERR);
}
}
3 changes: 2 additions & 1 deletion src/core/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
#include "data/server.h"
#include "data/worker.h"

void core_run(void *arg_worker);
void core_run(void *arg_worker, void *arg_server);
void core_destroy(void);
8 changes: 5 additions & 3 deletions src/core/data/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,14 @@ _server_evwait(void)
void *
core_server_evloop(void *arg)
{
for(;;) {
bool *running = arg;

while (__atomic_load_n(running, __ATOMIC_ACQUIRE)) {
if (_server_evwait() != CC_OK) {
log_crit("server core event loop exited due to failure");
break;
exit(1);
}
}

exit(1);
return NULL;
}
6 changes: 3 additions & 3 deletions src/core/data/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,12 @@ core_worker_evloop(void *arg)
{
processor = arg;

for(;;) {
while (__atomic_load_n(&processor->running, __ATOMIC_ACQUIRE)) {
if (_worker_evwait() != CC_OK) {
log_crit("worker core event loop exited due to failure");
break;
exit(1);
}
}

exit(1);
return NULL;
}
1 change: 1 addition & 0 deletions src/core/data/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct data_processor {
data_fn read;
data_fn write;
data_fn error;
bool running;
};

void core_worker_setup(worker_options_st *options, worker_metrics_st *metrics);
Expand Down
8 changes: 8 additions & 0 deletions src/protocol/admin/parse.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ _get_req_type(struct request *req, struct bstring *type)
break;
}

break;

case 8:
if (str8cmp(type->data, 's', 'h', 'u', 't', 'd', 'o', 'w', 'n')) {
req->type = REQ_SHUTDOWN;
break;
}

break;
}

Expand Down
3 changes: 2 additions & 1 deletion src/protocol/admin/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
ACTION( REQ_UNKNOWN, "" )\
ACTION( REQ_STATS, "stats" )\
ACTION( REQ_VERSION, "version" )\
ACTION( REQ_QUIT, "quit" )
ACTION( REQ_QUIT, "quit" )\
ACTION( REQ_SHUTDOWN, "shutdown" )

#define GET_TYPE(_name, _str) _name,
typedef enum request_type {
Expand Down
3 changes: 2 additions & 1 deletion src/server/cdb/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct data_processor worker_processor = {
cdb_process_read,
cdb_process_write,
cdb_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -239,7 +240,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
3 changes: 2 additions & 1 deletion src/server/pingserver/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct data_processor worker_processor = {
pingserver_process_read,
pingserver_process_write,
pingserver_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -195,7 +196,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
3 changes: 2 additions & 1 deletion src/server/rds/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct data_processor worker_processor = {
rds_process_read,
rds_process_write,
rds_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -197,7 +198,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
34 changes: 30 additions & 4 deletions src/server/slimcache/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@
#include <sys/socket.h>
#include <sysexits.h>

enum slimcache_timeout_event_type {
DLOG_TIMEOUT_EV,
KLOG_TIMEOUT_EV,
MAX_TIMEOUT_EV
};

static struct timeout_event *slimcache_tev[MAX_TIMEOUT_EV];

struct data_processor worker_processor = {
slimcache_process_read,
slimcache_process_write,
slimcache_process_error
slimcache_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -80,6 +89,18 @@ teardown(void)
log_teardown();
}

static void
_shutdown(int signo)
{
log_stderr("_shutdown received signal %d", signo);
__atomic_store_n(&worker_processor.running, false, __ATOMIC_RELEASE);
core_destroy();
for (int i = DLOG_TIMEOUT_EV; i < MAX_TIMEOUT_EV; ++i) {
core_admin_unregister(slimcache_tev[i]);
}
exit(EX_OK);
}

static void
setup(void)
{
Expand All @@ -91,6 +112,11 @@ setup(void)
exit(EX_OSERR); /* only failure comes from NOMEM */
}

if (signal_override(SIGTERM, "perform shutdown", 0, 0, _shutdown) < 0) {
log_stderr("cannot override signal");
exit(EX_OSERR);
}

/* Setup logging first */
log_setup(&stats.log);
if (debug_setup(&setting.debug) < 0) {
Expand Down Expand Up @@ -134,13 +160,13 @@ setup(void)

/* adding recurring events to maintenance/admin thread */
intvl = option_uint(&setting.slimcache.dlog_intvl);
if (core_admin_register(intvl, debug_log_flush, NULL) == NULL) {
if ((slimcache_tev[DLOG_TIMEOUT_EV] = core_admin_register(intvl, debug_log_flush, NULL)) == NULL) {
log_stderr("Could not register timed event to flush debug log");
goto error;
}

intvl = option_uint(&setting.slimcache.klog_intvl);
if (core_admin_register(intvl, klog_flush, NULL) == NULL) {
if ((slimcache_tev[KLOG_TIMEOUT_EV] = core_admin_register(intvl, klog_flush, NULL)) == NULL) {
log_error("Could not register timed event to flush command log");
goto error;
}
Expand Down Expand Up @@ -213,7 +239,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
3 changes: 2 additions & 1 deletion src/server/slimrds/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct data_processor worker_processor = {
slimrds_process_read,
slimrds_process_write,
slimrds_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -197,7 +198,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
Loading

0 comments on commit 2050214

Please sign in to comment.