Skip to content

Commit

Permalink
allow multiple values in both sarray_insert/remove (#241)
Browse files Browse the repository at this point in the history
* allow multiple values in both sarray_insert/remove

* support multi-element get, fix bug
  • Loading branch information
Yao Yue authored Aug 10, 2019
1 parent ea7cc00 commit 4c43da9
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 83 deletions.
2 changes: 1 addition & 1 deletion src/protocol/data/resp/cmd_sarray.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
ACTION( REQ_SARRAY_FIND, "SArray.find", 3, 0 )\
ACTION( REQ_SARRAY_GET, "SArray.get", 2, 2 )\
ACTION( REQ_SARRAY_INSERT, "SArray.insert", 3, -1 )\
ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, 0 )\
ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, -1 )\
ACTION( REQ_SARRAY_TRUNCATE, "SArray.truncate", 3, 0 )

typedef enum sarray_elem {
Expand Down
208 changes: 127 additions & 81 deletions src/server/rds/data/cmd_sarray.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
#include <cc_debug.h>
#include <cc_mm.h>

/* TODO(yao): make MAX_NVAL configurable */
#define MAX_NVAL 255 /* max no. of values to insert/remove in one request */
static uint64_t vals[MAX_NVAL];


static inline struct item *
_add_key(struct response *rsp, struct bstring *key)
Expand Down Expand Up @@ -180,11 +184,7 @@ cmd_sarray_len(struct response *rsp, const struct request *req,
}

nentry = sarray_nentry((sarray_p)item_data(it));

rsp->type = reply->type = ELEM_INT;
reply->num = (int64_t)nentry;
log_verb("command '%.*s' '%.*s' succeeded, sarray length %u",
cmd->bstr.len, cmd->bstr.data, key->len, key->data, nentry);
compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)nentry);
}

void
Expand Down Expand Up @@ -257,11 +257,14 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct item *it;
int64_t idx;
int64_t idx = 0, cnt = 1;
uint64_t val;
uint32_t narg, nentry, nreturned = 0;
int32_t incr;
sarray_rstatus_e status;

ASSERT(array_nelem(req->token) == cmd->narg);
narg = array_nelem(req->token);
ASSERT(narg >= cmd->narg);

INCR(process_metrics, sarray_get);

Expand All @@ -271,12 +274,6 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct

return;
}
if (!req_get_int(&idx, req, SARRAY_IDX)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_get_ex);

return;
}

it = item_get(key);
if (it == NULL) {
Expand All @@ -286,26 +283,55 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct
return;
}

status = sarray_value(&val, (sarray_p)item_data(it), (uint32_t)idx);
switch (status) {
case SARRAY_OK:
rsp->type = reply->type = ELEM_INT;
reply->num = (int64_t)val;
log_verb("command '%.*s' '%.*s' succeeded, index %"PRIu32" has value "
PRIu64, cmd->bstr.len, cmd->bstr.data, key->len, key->data, idx,
val);
INCR(process_metrics, sarray_get_ok);
nentry = sarray_nentry((sarray_p)item_data(it));

break;
case SARRAY_EOOB:
compose_rsp_oob(rsp, reply, cmd, key, idx);
INCR(process_metrics, sarray_get_oob);
if (narg > cmd->narg && !req_get_int(&idx, req, SARRAY_IDX)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_get_ex);

break;
default:
compose_rsp_server_err(rsp, reply, cmd, key);
NOT_REACHED();
return;
}

if (idx < 0) {
idx += nentry;
if (idx < 0) {
idx = 0;
}
} else {
if (idx > nentry) {
idx = nentry;
}
}

if (narg > cmd->narg + 1 && !req_get_int(&cnt, req, SARRAY_ICNT)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_get_ex);

return;
}
/* cnt < 0 means return in reverse order */
if (cnt > 0) {
incr = 1;
nreturned = MIN(nentry - idx, cnt);
} else {
incr = -1;
nreturned = MIN(idx + 1, -cnt);
}

/* write the array header */
rsp->type = ELEM_ARRAY;
for (; nreturned > 0; nreturned--, idx += incr) {
status = sarray_value(&val, (sarray_p)item_data(it), (uint32_t)idx);
ASSERT(status == SARRAY_OK);
reply->type = ELEM_INT;
reply->num = val;
reply = (struct element *)array_push(rsp->token);
}
array_pop(rsp->token);

INCR(process_metrics, sarray_get_ok);
log_verb("command '%.*s' '%.*s' succeeded, returning %"PRIu32" elements",
cmd->bstr.len, cmd->bstr.data, key->len, key->data, array_nelem(rsp->token));
}

void
Expand All @@ -315,12 +341,12 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct item *it;
uint32_t delta = 0;
uint32_t nval = 0, ninserted = 0, delta;
int64_t val;
sarray_p sa;
sarray_rstatus_e status;

ASSERT(array_nelem(req->token) == cmd->narg);
ASSERT(array_nelem(req->token) >= cmd->narg);

INCR(process_metrics, sarray_insert);

Expand All @@ -330,12 +356,6 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct

return;
}
if (!req_get_int(&val, req, SARRAY_VAL)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);

return;
}

it = item_get(key);
if (it == NULL) {
Expand All @@ -345,7 +365,19 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
return;
}

delta = sarray_esize((sarray_p)item_data(it));
/* parse and store all values to be inserted in array vals */
for (uint32_t i = SARRAY_VAL; i < array_nelem(req->token); ++i, ++nval) {
if (!req_get_int(&val, req, i)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);

return;
} else {
vals[nval] = (uint64_t)val;
}
}

delta = sarray_esize((sarray_p)item_data(it)) * nval;
if (_realloc_key(&it, key, delta) != ITEM_OK) {
compose_rsp_storage_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);
Expand All @@ -354,23 +386,24 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
}

sa = (sarray_p)item_data(it);
status = sarray_insert(sa, (uint64_t)val);

if (status == SARRAY_EINVALID) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);

return;
}
if (status == SARRAY_EDUP) {
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_noop);
for (uint32_t i = 0; i < nval; ++i) {
status = sarray_insert(sa, vals[i]);
if (status == SARRAY_EINVALID) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);
return;
}

return;
if (status == SARRAY_EDUP) {
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_noop);
} else {
INCR(process_metrics, sarray_insert_ok);
ninserted++;
}
}

compose_rsp_ok(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ok);
compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)ninserted);
}

void
Expand All @@ -380,6 +413,7 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct item *it;
uint32_t nval = 0, nremoved = 0;
int64_t val;
sarray_p sa;
sarray_rstatus_e status;
Expand All @@ -394,12 +428,6 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct

return;
}
if (!req_get_int(&val, req, SARRAY_VAL)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);

return;
}

it = item_get(key);
if (it == NULL) {
Expand All @@ -409,31 +437,49 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct
return;
}

sa = (sarray_p)item_data(it);
status = sarray_remove(sa, val);
/* parse and store all values to be inserted in array vals */
for (uint32_t i = SARRAY_VAL; i < array_nelem(req->token); ++i) {
if (!req_get_int(&val, req, i)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);

switch (status) {
case SARRAY_OK:
/* TODO: should we try to "fit" to a smaller item here? */
compose_rsp_ok(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ok);

break;
case SARRAY_ENOTFOUND:
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_noop);

break;
case SARRAY_EINVALID:
/* client error, bad argument */
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);
return;
} else {
vals[nval] = (uint64_t)val;
nval++;
}
}
/* TODO: should we try to "fit" to a smaller item here? */

break;
default:
compose_rsp_server_err(rsp, reply, cmd, key);
NOT_REACHED();
sa = (sarray_p)item_data(it);
for (uint32_t i = 0; i < nval; ++i) {
status = sarray_remove(sa, vals[i]);
switch (status) {
case SARRAY_OK:
nremoved++;
INCR(process_metrics, sarray_remove_ok);

break;
case SARRAY_ENOTFOUND:
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_noop);

break;
case SARRAY_EINVALID:
/* client error, bad argument */
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);

return;
default:
compose_rsp_server_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);
NOT_REACHED();
return;
}
}

compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)nremoved);
}

void
Expand Down
2 changes: 1 addition & 1 deletion src/server/rds/data/cmd_sarray.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

/* name type description */
#define PROCESS_SARRAY_METRIC(ACTION) \
#define PROCESS_SARRAY_METRIC(ACTION) \
ACTION( sarray_create, METRIC_COUNTER, "# sarray create requests" )\
ACTION( sarray_create_exist, METRIC_COUNTER, "# sarray already exist" )\
ACTION( sarray_create_ok, METRIC_COUNTER, "# sarray stored" )\
Expand Down
10 changes: 10 additions & 0 deletions src/server/rds/data/shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ compose_rsp_noop(struct response *rsp, struct element *reply,
cmd->bstr.len, cmd->bstr.data, key->len, key->data);
}

static inline void
compose_rsp_numeric(struct response *rsp, struct element *reply,
const struct command *cmd, const struct bstring *key, int64_t num)
{
rsp->type = reply->type = ELEM_INT;
reply->num = num;
log_verb("command '%.*s' '%.*s' succeeded, response value is %"PRIi64,
cmd->bstr.len, cmd->bstr.data, key->len, key->data, num);
}

static inline void
compose_rsp_client_err(struct response *rsp, struct element *reply,
const struct command *cmd, const struct bstring *key)
Expand Down

0 comments on commit 4c43da9

Please sign in to comment.