Skip to content

Commit

Permalink
Merge pull request #730 from Netflix/dev
Browse files Browse the repository at this point in the history
Merge dev into v0.6
  • Loading branch information
smukil authored Oct 20, 2019
2 parents 35722e2 + 46c73f7 commit 1da0457
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 38 deletions.
12 changes: 11 additions & 1 deletion src/dyn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ struct msg *req_recv_next(struct context *ctx, struct conn *conn, bool alloc) {
if (is_read_repairs_enabled()) {
req->timestamp = current_timestamp_in_millis();
}

return req;
}

Expand All @@ -364,9 +365,18 @@ static bool req_filter(struct context *ctx, struct conn *conn,
if (req->quit) {
ASSERT(conn->rmsg == NULL);
log_debug(LOG_VERB, "%s filter quit %s", print_obj(conn), print_obj(req));

// The client expects to receive an "+OK\r\n" response, so make sure
// to do that.
IGNORE_RET_VAL(simulate_ok_rsp(ctx, conn, req));

conn->eof = 1;
conn->recv_ready = 0;
req_put(req);
return true;
}

// If this is a Dynomite configuration message, don't forward it.
if (is_msg_type_dyno_config(req->type)) {
return true;
}

Expand Down
6 changes: 2 additions & 4 deletions src/dyn_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,15 @@ inline void conn_set_read_consistency(struct conn *conn, consistency_t cons) {
}

inline consistency_t conn_get_read_consistency(struct conn *conn) {
// return conn->read_consistency;
return g_read_consistency;
return conn->read_consistency;
}

inline void conn_set_write_consistency(struct conn *conn, consistency_t cons) {
conn->write_consistency = cons;
}

inline consistency_t conn_get_write_consistency(struct conn *conn) {
// return conn->write_consistency;
return g_write_consistency;
return conn->write_consistency;
}

rstatus_t conn_event_del_conn(struct conn *conn) {
Expand Down
8 changes: 3 additions & 5 deletions src/dyn_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ static rstatus_t core_event_base_create(struct context *ctx) {
}

/**
*
* NOTE: DEPRECATED and not currently used in the codebase.
*
* Initialize anti-entropy.
* @param[in,out] ctx Context.
* @return rstatus_t Return status code.
Expand Down Expand Up @@ -269,11 +272,6 @@ rstatus_t core_start(struct instance *nci) {
goto error;
}

status = core_entropy_init(ctx);
if (status != DN_OK) {
goto error;
}

status = core_event_base_create(ctx);
if (status != DN_OK) {
goto error;
Expand Down
8 changes: 4 additions & 4 deletions src/dyn_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,12 +840,12 @@ static void *gossip_loop(void *arg) {
}

rstatus_t gossip_start(struct server_pool *sp) {
rstatus_t status;
pthread_t tid;

status = pthread_create(&tid, NULL, gossip_loop, sp);
if (status < 0) {
log_error("gossip service create failed: %s", strerror(status));
int pthread_status;
pthread_status = pthread_create(&tid, NULL, gossip_loop, sp);
if (pthread_status < 0) {
log_error("gossip service create failed: %s", strerror(pthread_status));
return DN_ERROR;
}

Expand Down
108 changes: 103 additions & 5 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,93 @@ static rstatus_t msg_repair(struct context *ctx, struct conn *conn,
return DN_OK;
}

/*
* Crafts a success response message for the respective datastore.
*
* TODO: This currently does only Redis. The Redis specific code should
* be moved out of this file.
*
* Returns a 'msg' with the expected success response.
*/
static struct msg *craft_ok_rsp(struct context *ctx, struct conn *conn,
struct msg *req) {

ASSERT(req->is_request);

rstatus_t ret_status = DN_OK;
const char *QUIT_FMT_STRING = "+OK\r\n";

struct msg *rsp = msg_get(conn, false, __FUNCTION__);
if (rsp == NULL) {
conn->err = errno;
return NULL;
}

rstatus_t append_status = msg_append(rsp, QUIT_FMT_STRING, strlen(QUIT_FMT_STRING));
if (append_status != DN_OK) {
rsp_put(rsp);
return NULL;
}

rsp->peer = req;
rsp->is_request = 0;

req->done = 1;

return rsp;
}

rstatus_t simulate_ok_rsp(struct context *ctx, struct conn *conn,
struct msg *msg) {
// Create an OK response.
struct msg *ok_rsp = craft_ok_rsp(ctx, conn, msg);

// Add it to the outstanding messages dictionary, so that 'conn_handle_response'
// can process it appropriately.
dictAdd(conn->outstanding_msgs_dict, &msg->id, msg);

// Enqueue the message in the outbound queue so that the code on the response
// path can find it.
conn_enqueue_outq(ctx, conn, msg);

THROW_STATUS(conn_handle_response(ctx, conn,
msg->parent_id ? msg->parent_id : msg->id, ok_rsp));

return DN_OK;
}

/*
* If the command sent to Dynomite was a special Dynomite configuration
* command, we process and apply the configuration here.
*
* Returns: DN_OK on successful application, DN_ERROR otherwise.
*/
static rstatus_t msg_apply_config(struct context *ctx, struct conn *conn,
struct msg *msg) {

// We only support one type of configuration now.
// TODO: If we support more, convert this to a switch case.
ASSERT(msg->type == MSG_HACK_SETTING_CONN_CONSISTENCY);

struct argpos *consistency_string = (struct argpos*) array_get(msg->args, 0);

// We must have a consistency string, else we wouldn't have reached here.
ASSERT(consistency_string != NULL);

consistency_t cons = get_consistency_enum_from_string(consistency_string->start);
if (cons == -1) return DN_ERROR;

conn_set_read_consistency(conn, cons);
conn_set_write_consistency(conn, cons);

// Set the consistency to DC_ONE, since this is just a configuration setting.
msg->consistency = DC_ONE;

THROW_STATUS(simulate_ok_rsp(ctx, conn, msg));

return DN_OK;
}

static rstatus_t msg_parse(struct context *ctx, struct conn *conn,
struct msg *msg) {
rstatus_t status;
Expand All @@ -929,19 +1016,24 @@ static rstatus_t msg_parse(struct context *ctx, struct conn *conn,

switch (msg->result) {
case MSG_PARSE_OK:
// log_debug(LOG_VVERB, "MSG_PARSE_OK");
status = msg_parsed(ctx, conn, msg);
break;

case MSG_PARSE_REPAIR:
// log_debug(LOG_VVERB, "MSG_PARSE_REPAIR");
status = msg_repair(ctx, conn, msg);
break;

case MSG_PARSE_AGAIN:
// log_debug(LOG_VVERB, "MSG_PARSE_AGAIN");
status = DN_OK;
break;
case MSG_PARSE_DYNO_CONFIG:
status = msg_apply_config(ctx, conn, msg);

// No more data to parse.
conn_recv_done(ctx, conn, msg, NULL);
break;

case MSG_PARSE_NOOP:
status = DN_NOOPS;
break;

default:
/*
Expand Down Expand Up @@ -1611,3 +1703,9 @@ rstatus_t msg_append_format(struct msg *msg, const char *fmt, int num_args, ...)

return DN_OK;
}

bool is_msg_type_dyno_config(msg_type_t msg_type) {
// TODO: Convert to a switch case if we support more.
if (msg_type == MSG_HACK_SETTING_CONN_CONSISTENCY) return true;
return false;
}
48 changes: 43 additions & 5 deletions src/dyn_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@
ACTION(RSP_REDIS_ERROR_EXECABORT) \
ACTION(RSP_REDIS_ERROR_MASTERDOWN) \
ACTION(RSP_REDIS_ERROR_NOREPLICAS) \
ACTION(HACK_SETTING_CONN_CONSISTENCY) \
ACTION(SENTINEL) \
ACTION(END_IDX) \
/* ACTION( REQ_REDIS_AUTH) */ \
Expand Down Expand Up @@ -260,11 +261,21 @@ extern func_msg_repair_t g_make_repair_query; /* Create a repair msg. */
void set_datastore_ops(void);

typedef enum msg_parse_result {
MSG_PARSE_OK, /* parsing ok */
MSG_PARSE_ERROR, /* parsing error */
MSG_PARSE_REPAIR, /* more to parse -> repair parsed & unparsed data */
MSG_PARSE_FRAGMENT, /* multi-vector request -> fragment */
MSG_PARSE_AGAIN, /* incomplete -> parse again */
// Parsing OK
MSG_PARSE_OK,
// Parsing error
MSG_PARSE_ERROR,
// More to parse -> Repair parsed & unparsed data
MSG_PARSE_REPAIR,
// Multi-vector request -> fragment
MSG_PARSE_FRAGMENT,
// Incomplete, parse again.
MSG_PARSE_AGAIN,
// Parsing done, but do nothing after
MSG_PARSE_NOOP,
// Parsing done, command was a dynomite configuration
MSG_PARSE_DYNO_CONFIG,
// OOM error during parsing (TODO: consider removing)
MSG_OOM_ERROR
} msg_parse_result_t;

Expand Down Expand Up @@ -350,6 +361,19 @@ static inline char *get_consistency_string(consistency_t cons) {
return "INVALID CONSISTENCY";
}

static inline consistency_t get_consistency_enum_from_string(char *cons) {
if (dn_strcasecmp(cons, "DC_ONE") == 0) {
return DC_ONE;
} else if (dn_strcasecmp(cons, "DC_QUORUM") == 0) {
return DC_QUORUM;
} else if (dn_strcasecmp(cons, "DC_SAFE_QUORUM") == 0) {
return DC_SAFE_QUORUM;
} else if (dn_strcasecmp(cons, "DC_EACH_SAFE_QUORUM") == 0) {
return DC_EACH_SAFE_QUORUM;
}
return -1;
}

#define DEFAULT_READ_CONSISTENCY DC_ONE
#define DEFAULT_WRITE_CONSISTENCY DC_ONE
extern consistency_t g_write_consistency;
Expand Down Expand Up @@ -616,4 +640,18 @@ rstatus_t dnode_peer_req_forward(struct context *ctx, struct conn *c_conn,
// string *data);
void dnode_peer_gossip_forward(struct context *ctx, struct conn *conn,
struct mbuf *data);

/*
* Simulates a successful response as though the datastore sent it.
* Also, does the necessary to make sure that the response path is
* able to send this response back to the client.
*
* Returns DN_OK on success and an appropriate error otherwise.
*/
rstatus_t simulate_ok_rsp(struct context *ctx, struct conn *conn,
struct msg *msg);

// Returns 'true' if 'msg_type' is a Dynomite configuration command.
bool is_msg_type_dyno_config(msg_type_t msg_type);

#endif
2 changes: 1 addition & 1 deletion src/dyn_response_mgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ static void rspmgr_incr_non_quorum_responses_stats(
*/
bool perform_repairs_if_necessary(struct context *ctx, struct response_mgr *rspmgr) {

struct msg* repair_msg;
struct msg* repair_msg = NULL;
rstatus_t repair_create_status = g_make_repair_query(ctx, rspmgr, &repair_msg);

if (repair_create_status == DN_OK && repair_msg != NULL) {
Expand Down
5 changes: 3 additions & 2 deletions src/dyn_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -886,10 +886,11 @@ void req_send_done(struct context *ctx, struct conn *conn, struct msg *req) {
* enqueue message (request) in server outq, if response is expected.
* Otherwise, free the request
*/
if (req->expect_datastore_reply || (conn->type == CONN_SERVER))
if (req->expect_datastore_reply || (conn->type == CONN_SERVER)) {
conn_enqueue_outq(ctx, conn, req);
else
} else {
req_put(req);
}
}

static void req_server_enqueue_imsgq(struct context *ctx, struct conn *conn,
Expand Down
9 changes: 4 additions & 5 deletions src/dyn_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -1346,17 +1346,16 @@ static rstatus_t stats_listen(struct stats *st) {
}

static rstatus_t stats_start_aggregator(struct stats *st) {
rstatus_t status;

if (!stats_enabled) {
return DN_OK;
}

THROW_STATUS(stats_listen(st));

status = pthread_create(&st->tid, NULL, stats_loop, st);
if (status < 0) {
log_error("stats aggregator create failed: %s", strerror(status));
int pthread_status;
pthread_status = pthread_create(&st->tid, NULL, stats_loop, st);
if (pthread_status < 0) {
log_error("stats aggregator create failed: %s", strerror(pthread_status));
return DN_ERROR;
}

Expand Down
12 changes: 7 additions & 5 deletions src/entropy/dyn_entropy_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ rstatus_t entropy_key_iv_load(struct context *ctx) {
}

/*
*
* NOTE: DEPRECATED and not currently used in the codebase.
*
* Function: entropy_snd_init
* --------------------
* Initiates the data for the connection towards another cluster for
Expand Down Expand Up @@ -627,14 +630,13 @@ void *entropy_loop(void *arg) {
*/

rstatus_t entropy_conn_start(struct entropy *cn) {
rstatus_t status;

THROW_STATUS(entropy_listen(cn));

status = pthread_create(&cn->tid, NULL, entropy_loop, cn);
if (status < 0) {
int pthread_status;
pthread_status = pthread_create(&cn->tid, NULL, entropy_loop, cn);
if (pthread_status < 0) {
log_error("reconciliation thread for socket create failed: %s",
strerror(status));
strerror(pthread_status));
return DN_ERROR;
}

Expand Down
Loading

0 comments on commit 1da0457

Please sign in to comment.