Skip to content

Commit

Permalink
refactor slab module to generalize dataflag into metadata storage (tw…
Browse files Browse the repository at this point in the history
…itter#168)

* refactor slab module to generalize dataflag into metadata storage

* remembered how metadata as a word is misused/over-used...
  • Loading branch information
thinkingfish authored May 20, 2018
1 parent 2e8c7b6 commit 0508bf0
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 80 deletions.
2 changes: 2 additions & 0 deletions src/protocol/data/memcache/constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
#define MAX_KEY_LEN 250
#define MAX_TOKEN_LEN 256
#define MAX_BATCH_SIZE 100

#define DATAFLAG_SIZE 4
25 changes: 20 additions & 5 deletions src/server/twemcache/data/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#define CMD_ERR_MSG "command not supported"
#define OTHER_ERR_MSG "unknown server error"


typedef enum put_rstatus {
PUT_OK,
PUT_PARTIAL,
Expand Down Expand Up @@ -57,6 +58,17 @@ process_teardown(void)
process_init = false;
}

static inline uint32_t
_get_dataflag(struct item *it)
{
return *((uint32_t *)item_optional(it));
}

static inline void
_set_dataflag(struct item *it, uint32_t flag)
{
*((uint32_t *)item_optional(it)) = flag;
}

static bool
_get_key(struct response *rsp, struct bstring *key)
Expand All @@ -67,7 +79,7 @@ _get_key(struct response *rsp, struct bstring *key)
if (it != NULL) {
rsp->type = RSP_VALUE;
rsp->key = *key;
rsp->flag = item_flag(it);
rsp->flag = _get_dataflag(it);
rsp->vcas = item_get_cas(it);
rsp->vstr.len = it->vlen;
rsp->vstr.data = item_data(it);
Expand Down Expand Up @@ -200,7 +212,7 @@ _put(item_rstatus_t *istatus, struct request *req)
*istatus = ITEM_OK;
if (req->first) { /* self-contained req */
struct bstring *key = array_first(req->keys);
*istatus = item_reserve(&it, key, &req->vstr, req->vlen, req->flag,
*istatus = item_reserve(&it, key, &req->vstr, req->vlen, DATAFLAG_SIZE,
time_reltime(req->expiry));
req->first = false;
req->reserved = it;
Expand All @@ -217,6 +229,8 @@ _put(item_rstatus_t *istatus, struct request *req)
if (status == PUT_ERROR) {
req->swallow = true;
req->serror = true;
} else {
_set_dataflag(it, req->flag);
}

return status;
Expand Down Expand Up @@ -403,15 +417,16 @@ _process_delta(struct response *rsp, struct item *it, struct request *req,
rsp->vint = vint;
nval.len = cc_print_uint64_unsafe(buf, vint);
nval.data = buf;
if (item_slabid(it->klen, nval.len) == it->id) {
if (item_slabid(it->klen, nval.len, it->olen) == it->id) {
item_update(it, &nval);
return ITEM_OK;
}

dataflag = it->dataflag;
status = item_reserve(&it, key, &nval, nval.len, dataflag,
dataflag = _get_dataflag(it);
status = item_reserve(&it, key, &nval, nval.len, DATAFLAG_SIZE,
it->expire_at);
if (status == ITEM_OK) {
_set_dataflag(it, dataflag);
item_insert(it, key);
}

Expand Down
37 changes: 20 additions & 17 deletions src/storage/slab/item.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ _item_expired(struct item *it)
static inline void
_copy_key_item(struct item *nit, struct item *oit)
{
nit->olen = oit->olen;
cc_memcpy(item_key(nit), item_key(oit), oit->klen);
nit->klen = oit->klen;
}
Expand All @@ -41,8 +42,8 @@ _item_reset(struct item *it)
it->in_freeq = 0;
it->is_raligned = 0;
it->vlen = 0;
it->dataflag = 0;
it->klen = 0;
it->olen = 0;
it->expire_at = 0;
it->create_at = 0;
}
Expand All @@ -54,9 +55,9 @@ _item_reset(struct item *it)
* On success we return the pointer to the allocated item.
*/
static item_rstatus_t
_item_alloc(struct item **it_p, uint8_t klen, uint32_t vlen)
_item_alloc(struct item **it_p, uint8_t klen, uint32_t vlen, uint8_t olen)
{
uint8_t id = slab_id(item_ntotal(klen, vlen));
uint8_t id = slab_id(item_ntotal(klen, vlen, olen));
struct item *it;

log_verb("allocate item with klen %u vlen %u", klen, vlen);
Expand Down Expand Up @@ -120,6 +121,7 @@ _item_link(struct item *it)

INCR(slab_metrics, item_linked_curr);
INCR(slab_metrics, item_link);
/* TODO(yao): how do we track optional storage? Separate or treat as val? */
INCR_N(slab_metrics, item_keyval_byte, it->klen + it->vlen);
INCR_N(slab_metrics, item_val_byte, it->vlen);
PERSLAB_INCR_N(it->id, item_keyval_byte, it->klen + it->vlen);
Expand Down Expand Up @@ -193,35 +195,37 @@ item_get(const struct bstring *key)

/* TODO(yao): move this to memcache-specific location */
static void
_item_define(struct item *it, const struct bstring *key, const struct bstring *val, uint32_t dataflag, rel_time_t expire_at)
_item_define(struct item *it, const struct bstring *key, const struct bstring
*val, uint8_t olen, rel_time_t expire_at)
{
it->create_at = time_now();
it->expire_at = expire_at;
it->dataflag = dataflag;
item_set_cas(it);
it->olen = olen;
cc_memcpy(item_key(it), key->data, key->len);
it->klen = key->len;
cc_memcpy(item_data(it), val->data, val->len);
it->vlen = val->len;
}

item_rstatus_t
item_reserve(struct item **it_p, const struct bstring *key, const struct bstring *val, uint32_t vlen, uint32_t dataflag, rel_time_t expire_at)
item_reserve(struct item **it_p, const struct bstring *key, const struct bstring
*val, uint32_t vlen, uint8_t olen, rel_time_t expire_at)
{
item_rstatus_t status;
struct item *it;

if ((status = _item_alloc(it_p, key->len, vlen)) != ITEM_OK) {
if ((status = _item_alloc(it_p, key->len, vlen, olen)) != ITEM_OK) {
log_debug("item reservation failed");
return status;
}

it = *it_p;

_item_define(it, key, val, dataflag, expire_at);
_item_define(it, key, val, olen, expire_at);

log_verb("reserve it %p of id %"PRIu8" for key '%.*s' dataflag %u", it,
it->id, key->len, key->data, it->dataflag);
log_verb("reserve it %p of id %"PRIu8" for key '%.*s' optional len %"PRIu8,
it, it->id,key->len, key->data, olen);

return ITEM_OK;
}
Expand All @@ -246,14 +250,15 @@ item_backfill(struct item *it, const struct bstring *val)
}

item_rstatus_t
item_annex(struct item *oit, const struct bstring *key, const struct bstring *val, bool append)
item_annex(struct item *oit, const struct bstring *key, const struct bstring
*val, bool append)
{
item_rstatus_t status = ITEM_OK;
struct item *nit = NULL;
uint8_t id;
uint32_t ntotal = oit->vlen + val->len;

id = item_slabid(oit->klen, ntotal);
id = item_slabid(oit->klen, ntotal, oit->olen);
if (id == SLABCLASS_INVALID_ID) {
log_info("client error: annex operation results in oversized item with"
"key size %"PRIu8" old value size %"PRIu32" and new value "
Expand All @@ -275,15 +280,14 @@ item_annex(struct item *oit, const struct bstring *key, const struct bstring *va
INCR_N(slab_metrics, item_val_byte, val->len);
item_set_cas(oit);
} else {
status = _item_alloc(&nit, oit->klen, ntotal);
status = _item_alloc(&nit, oit->klen, ntotal, oit->olen);
if (status != ITEM_OK) {
log_debug("annex failed due to failure to allocate new item");
return status;
}
_copy_key_item(nit, oit);
nit->expire_at = oit->expire_at;
nit->create_at = time_now();
nit->dataflag = oit->dataflag;
item_set_cas(nit);
/* value is left-aligned */
cc_memcpy(item_data(nit), item_data(oit), oit->vlen);
Expand All @@ -304,15 +308,14 @@ item_annex(struct item *oit, const struct bstring *key, const struct bstring *va
INCR_N(slab_metrics, item_val_byte, val->len);
item_set_cas(oit);
} else {
status = _item_alloc(&nit, oit->klen, ntotal);
status = _item_alloc(&nit, oit->klen, ntotal, oit->olen);
if (status != ITEM_OK) {
log_debug("annex failed due to failure to allocate new item");
return status;
}
_copy_key_item(nit, oit);
nit->expire_at = oit->expire_at;
nit->create_at = time_now();
nit->dataflag = oit->dataflag;
item_set_cas(nit);
/* value is right-aligned */
nit->is_raligned = 1;
Expand All @@ -332,7 +335,7 @@ item_annex(struct item *oit, const struct bstring *key, const struct bstring *va
void
item_update(struct item *it, const struct bstring *val)
{
ASSERT(item_slabid(it->klen, val->len) == it->id);
ASSERT(item_slabid(it->klen, val->len, it->olen) == it->id);

it->vlen = val->len;
cc_memcpy(item_data(it), val->data, val->len);
Expand Down
50 changes: 29 additions & 21 deletions src/storage/slab/item.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* | | \
* \ | item_key()
* item \
* item->end, (if enabled) item_get_cas()
* item->end, (if enabled) item_get_cas(), metadata
*
* item->end is followed by:
* - 8-byte cas, if ITEM_CAS flag is set
Expand All @@ -57,18 +57,20 @@ struct item {
uint32_t is_linked:1; /* item in hash */
uint32_t in_freeq:1; /* item in free queue */
uint32_t is_raligned:1; /* item data (payload) is right-aligned */
uint32_t vlen:29; /* data size (29 bits since uint32_t is 32 bits and we have 3 flags)
NOTE: need at least enough bits to support the largest value size allowed
by the implementation, i.e. SLAB_MAX_SIZE */
uint32_t vlen:29; /* data size (29 bits since uint32_t is 32
* bits and we have 3 flags)
* NOTE: need at least enough bits to
* support the largest value size allowed
* by the implementation, i.e. SLAB_MAX_SIZE
*/

uint32_t offset; /* offset of item in slab */
uint32_t dataflag; /* data flags opaque to the server */
uint8_t id; /* slab class id */
uint8_t klen; /* key length */
uint16_t padding; /* keep end 64-bit aligned, it may be a cas */
uint8_t olen; /* optional length (right after cas) */
uint8_t padding; /* keep end 64-bit aligned */
char end[1]; /* item data */
};
/* TODO(yao): dataflag is memcached-specific, can we abstract it out of storage? */

#define ITEM_MAGIC 0xfeedface
#define ITEM_HDR_SIZE offsetof(struct item, end)
Expand All @@ -89,12 +91,6 @@ typedef enum item_rstatus {
extern bool use_cas;
extern uint64_t cas_id;

static inline uint32_t
item_flag(struct item *it)
{
return it->dataflag;
}

static inline uint64_t
item_get_cas(struct item *it)
{
Expand Down Expand Up @@ -124,21 +120,27 @@ item_cas_size(void)
static inline char *
item_key(struct item *it)
{
return it->end + item_cas_size();
return it->end + item_cas_size() + it->olen;
}

static inline size_t
item_ntotal(uint8_t klen, uint32_t vlen)
item_ntotal(uint8_t klen, uint32_t vlen, uint8_t olen)
{
return ITEM_HDR_SIZE + item_cas_size() + klen + vlen;
return ITEM_HDR_SIZE + item_cas_size() + olen + klen + vlen;
}

static inline size_t
item_size(struct item *it)
{
ASSERT(it->magic == ITEM_MAGIC);

return item_ntotal(it->klen, it->vlen);
return item_ntotal(it->klen, it->vlen, it->olen);
}

static inline char *
item_optional(struct item *it)
{
return it->end + item_cas_size();
}

/*
Expand All @@ -152,7 +154,7 @@ item_data(struct item *it)
if (it->is_raligned) {
data = (char *)it + slabclass[it->id].size - it->vlen;
} else {
data = it->end + item_cas_size() + it->klen;
data = it->end + item_cas_size() + it->olen + it->klen;
}

return data;
Expand Down Expand Up @@ -185,15 +187,21 @@ struct item *item_get(const struct bstring *key);
/* insert an item, removes existing item of the same key (if applicable) */
void item_insert(struct item *it, const struct bstring *key);

/* reserve an item, this does not link it or remove existing item with the same key */
item_rstatus_t item_reserve(struct item **it_p, const struct bstring *key, const struct bstring *val, uint32_t vlen, uint32_t dataflag, rel_time_t expire_at);
/* reserve an item, this does not link it or remove existing item with the same
* key.
* olen- optional data length, this can be used to reserve space for optional
* data, e.g. flag in Memcached protocol) in payload, after cas.
* */
item_rstatus_t item_reserve(struct item **it_p, const struct bstring *key, const
struct bstring *val, uint32_t vlen, uint8_t olen, rel_time_t expire_at);
/* item_release is used for reserved item only (not linked) */
void item_release(struct item **it_p);

void item_backfill(struct item *it, const struct bstring *val);

/* Append/prepend */
item_rstatus_t item_annex(struct item *it, const struct bstring *key, const struct bstring *val, bool append);
item_rstatus_t item_annex(struct item *it, const struct bstring *key, const
struct bstring *val, bool append);


/* In place item update (replace item value) */
Expand Down
4 changes: 2 additions & 2 deletions src/storage/slab/slab.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ uint8_t slab_id(size_t size);

/* Calculate slab id that will accommodate item with given key/val lengths */
static inline uint8_t
item_slabid(uint8_t klen, uint32_t vlen)
item_slabid(uint8_t klen, uint32_t vlen, uint8_t mlen)
{
return slab_id(item_ntotal(klen, vlen));
return slab_id(item_ntotal(klen, vlen, mlen));
}

void slab_setup(slab_options_st *options, slab_metrics_st *metrics);
Expand Down
Loading

0 comments on commit 0508bf0

Please sign in to comment.