From 4c43da9cc0f38c30e994fe5df9c6b0bffaf54df9 Mon Sep 17 00:00:00 2001 From: Yao Yue Date: Sat, 10 Aug 2019 02:04:52 +0000 Subject: [PATCH] allow multiple values in both sarray_insert/remove (#241) * allow multiple values in both sarray_insert/remove * support multi-element get, fix bug --- src/protocol/data/resp/cmd_sarray.h | 2 +- src/server/rds/data/cmd_sarray.c | 208 +++++++++++++++++----------- src/server/rds/data/cmd_sarray.h | 2 +- src/server/rds/data/shared.h | 10 ++ 4 files changed, 139 insertions(+), 83 deletions(-) diff --git a/src/protocol/data/resp/cmd_sarray.h b/src/protocol/data/resp/cmd_sarray.h index 80453d8a2..e45572673 100644 --- a/src/protocol/data/resp/cmd_sarray.h +++ b/src/protocol/data/resp/cmd_sarray.h @@ -36,7 +36,7 @@ ACTION( REQ_SARRAY_FIND, "SArray.find", 3, 0 )\ ACTION( REQ_SARRAY_GET, "SArray.get", 2, 2 )\ ACTION( REQ_SARRAY_INSERT, "SArray.insert", 3, -1 )\ - ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, 0 )\ + ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, -1 )\ ACTION( REQ_SARRAY_TRUNCATE, "SArray.truncate", 3, 0 ) typedef enum sarray_elem { diff --git a/src/server/rds/data/cmd_sarray.c b/src/server/rds/data/cmd_sarray.c index e12d04dd3..c0eceec6b 100644 --- a/src/server/rds/data/cmd_sarray.c +++ b/src/server/rds/data/cmd_sarray.c @@ -10,6 +10,10 @@ #include #include +/* TODO(yao): make MAX_NVAL configurable */ +#define MAX_NVAL 255 /* max no. of values to insert/remove in one request */ +static uint64_t vals[MAX_NVAL]; + static inline struct item * _add_key(struct response *rsp, struct bstring *key) @@ -180,11 +184,7 @@ cmd_sarray_len(struct response *rsp, const struct request *req, } nentry = sarray_nentry((sarray_p)item_data(it)); - - rsp->type = reply->type = ELEM_INT; - reply->num = (int64_t)nentry; - log_verb("command '%.*s' '%.*s' succeeded, sarray length %u", - cmd->bstr.len, cmd->bstr.data, key->len, key->data, nentry); + compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)nentry); } void @@ -257,11 +257,14 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct struct element *reply = (struct element *)array_push(rsp->token); struct bstring *key; struct item *it; - int64_t idx; + int64_t idx = 0, cnt = 1; uint64_t val; + uint32_t narg, nentry, nreturned = 0; + int32_t incr; sarray_rstatus_e status; - ASSERT(array_nelem(req->token) == cmd->narg); + narg = array_nelem(req->token); + ASSERT(narg >= cmd->narg); INCR(process_metrics, sarray_get); @@ -271,12 +274,6 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct return; } - if (!req_get_int(&idx, req, SARRAY_IDX)) { - compose_rsp_client_err(rsp, reply, cmd, key); - INCR(process_metrics, sarray_get_ex); - - return; - } it = item_get(key); if (it == NULL) { @@ -286,26 +283,55 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct return; } - status = sarray_value(&val, (sarray_p)item_data(it), (uint32_t)idx); - switch (status) { - case SARRAY_OK: - rsp->type = reply->type = ELEM_INT; - reply->num = (int64_t)val; - log_verb("command '%.*s' '%.*s' succeeded, index %"PRIu32" has value " - PRIu64, cmd->bstr.len, cmd->bstr.data, key->len, key->data, idx, - val); - INCR(process_metrics, sarray_get_ok); + nentry = sarray_nentry((sarray_p)item_data(it)); - break; - case SARRAY_EOOB: - compose_rsp_oob(rsp, reply, cmd, key, idx); - INCR(process_metrics, sarray_get_oob); + if (narg > cmd->narg && !req_get_int(&idx, req, SARRAY_IDX)) { + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_get_ex); - break; - default: - compose_rsp_server_err(rsp, reply, cmd, key); - NOT_REACHED(); + return; + } + + if (idx < 0) { + idx += nentry; + if (idx < 0) { + idx = 0; + } + } else { + if (idx > nentry) { + idx = nentry; + } + } + + if (narg > cmd->narg + 1 && !req_get_int(&cnt, req, SARRAY_ICNT)) { + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_get_ex); + + return; + } + /* cnt < 0 means return in reverse order */ + if (cnt > 0) { + incr = 1; + nreturned = MIN(nentry - idx, cnt); + } else { + incr = -1; + nreturned = MIN(idx + 1, -cnt); } + + /* write the array header */ + rsp->type = ELEM_ARRAY; + for (; nreturned > 0; nreturned--, idx += incr) { + status = sarray_value(&val, (sarray_p)item_data(it), (uint32_t)idx); + ASSERT(status == SARRAY_OK); + reply->type = ELEM_INT; + reply->num = val; + reply = (struct element *)array_push(rsp->token); + } + array_pop(rsp->token); + + INCR(process_metrics, sarray_get_ok); + log_verb("command '%.*s' '%.*s' succeeded, returning %"PRIu32" elements", + cmd->bstr.len, cmd->bstr.data, key->len, key->data, array_nelem(rsp->token)); } void @@ -315,12 +341,12 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct struct element *reply = (struct element *)array_push(rsp->token); struct bstring *key; struct item *it; - uint32_t delta = 0; + uint32_t nval = 0, ninserted = 0, delta; int64_t val; sarray_p sa; sarray_rstatus_e status; - ASSERT(array_nelem(req->token) == cmd->narg); + ASSERT(array_nelem(req->token) >= cmd->narg); INCR(process_metrics, sarray_insert); @@ -330,12 +356,6 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct return; } - if (!req_get_int(&val, req, SARRAY_VAL)) { - compose_rsp_client_err(rsp, reply, cmd, key); - INCR(process_metrics, sarray_insert_ex); - - return; - } it = item_get(key); if (it == NULL) { @@ -345,7 +365,19 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct return; } - delta = sarray_esize((sarray_p)item_data(it)); + /* parse and store all values to be inserted in array vals */ + for (uint32_t i = SARRAY_VAL; i < array_nelem(req->token); ++i, ++nval) { + if (!req_get_int(&val, req, i)) { + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_insert_ex); + + return; + } else { + vals[nval] = (uint64_t)val; + } + } + + delta = sarray_esize((sarray_p)item_data(it)) * nval; if (_realloc_key(&it, key, delta) != ITEM_OK) { compose_rsp_storage_err(rsp, reply, cmd, key); INCR(process_metrics, sarray_insert_ex); @@ -354,23 +386,24 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct } sa = (sarray_p)item_data(it); - status = sarray_insert(sa, (uint64_t)val); - - if (status == SARRAY_EINVALID) { - compose_rsp_client_err(rsp, reply, cmd, key); - INCR(process_metrics, sarray_insert_ex); - - return; - } - if (status == SARRAY_EDUP) { - compose_rsp_noop(rsp, reply, cmd, key); - INCR(process_metrics, sarray_insert_noop); + for (uint32_t i = 0; i < nval; ++i) { + status = sarray_insert(sa, vals[i]); + if (status == SARRAY_EINVALID) { + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_insert_ex); + return; + } - return; + if (status == SARRAY_EDUP) { + compose_rsp_noop(rsp, reply, cmd, key); + INCR(process_metrics, sarray_insert_noop); + } else { + INCR(process_metrics, sarray_insert_ok); + ninserted++; + } } - compose_rsp_ok(rsp, reply, cmd, key); - INCR(process_metrics, sarray_insert_ok); + compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)ninserted); } void @@ -380,6 +413,7 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct struct element *reply = (struct element *)array_push(rsp->token); struct bstring *key; struct item *it; + uint32_t nval = 0, nremoved = 0; int64_t val; sarray_p sa; sarray_rstatus_e status; @@ -394,12 +428,6 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct return; } - if (!req_get_int(&val, req, SARRAY_VAL)) { - compose_rsp_client_err(rsp, reply, cmd, key); - INCR(process_metrics, sarray_remove_ex); - - return; - } it = item_get(key); if (it == NULL) { @@ -409,31 +437,49 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct return; } - sa = (sarray_p)item_data(it); - status = sarray_remove(sa, val); + /* parse and store all values to be inserted in array vals */ + for (uint32_t i = SARRAY_VAL; i < array_nelem(req->token); ++i) { + if (!req_get_int(&val, req, i)) { + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_remove_ex); - switch (status) { - case SARRAY_OK: - /* TODO: should we try to "fit" to a smaller item here? */ - compose_rsp_ok(rsp, reply, cmd, key); - INCR(process_metrics, sarray_remove_ok); - - break; - case SARRAY_ENOTFOUND: - compose_rsp_noop(rsp, reply, cmd, key); - INCR(process_metrics, sarray_remove_noop); - - break; - case SARRAY_EINVALID: - /* client error, bad argument */ - compose_rsp_client_err(rsp, reply, cmd, key); - INCR(process_metrics, sarray_remove_ex); + return; + } else { + vals[nval] = (uint64_t)val; + nval++; + } + } + /* TODO: should we try to "fit" to a smaller item here? */ - break; - default: - compose_rsp_server_err(rsp, reply, cmd, key); - NOT_REACHED(); + sa = (sarray_p)item_data(it); + for (uint32_t i = 0; i < nval; ++i) { + status = sarray_remove(sa, vals[i]); + switch (status) { + case SARRAY_OK: + nremoved++; + INCR(process_metrics, sarray_remove_ok); + + break; + case SARRAY_ENOTFOUND: + compose_rsp_noop(rsp, reply, cmd, key); + INCR(process_metrics, sarray_remove_noop); + + break; + case SARRAY_EINVALID: + /* client error, bad argument */ + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_remove_ex); + + return; + default: + compose_rsp_server_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_remove_ex); + NOT_REACHED(); + return; + } } + + compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)nremoved); } void diff --git a/src/server/rds/data/cmd_sarray.h b/src/server/rds/data/cmd_sarray.h index 0d8725b17..3d9b0a49e 100644 --- a/src/server/rds/data/cmd_sarray.h +++ b/src/server/rds/data/cmd_sarray.h @@ -1,7 +1,7 @@ #pragma once /* name type description */ -#define PROCESS_SARRAY_METRIC(ACTION) \ +#define PROCESS_SARRAY_METRIC(ACTION) \ ACTION( sarray_create, METRIC_COUNTER, "# sarray create requests" )\ ACTION( sarray_create_exist, METRIC_COUNTER, "# sarray already exist" )\ ACTION( sarray_create_ok, METRIC_COUNTER, "# sarray stored" )\ diff --git a/src/server/rds/data/shared.h b/src/server/rds/data/shared.h index 8f8095edb..5b28ccda6 100644 --- a/src/server/rds/data/shared.h +++ b/src/server/rds/data/shared.h @@ -52,6 +52,16 @@ compose_rsp_noop(struct response *rsp, struct element *reply, cmd->bstr.len, cmd->bstr.data, key->len, key->data); } +static inline void +compose_rsp_numeric(struct response *rsp, struct element *reply, + const struct command *cmd, const struct bstring *key, int64_t num) +{ + rsp->type = reply->type = ELEM_INT; + reply->num = num; + log_verb("command '%.*s' '%.*s' succeeded, response value is %"PRIi64, + cmd->bstr.len, cmd->bstr.data, key->len, key->data, num); +} + static inline void compose_rsp_client_err(struct response *rsp, struct element *reply, const struct command *cmd, const struct bstring *key)