Skip to content

Commit

Permalink
modules/kvs: Refactor load() logic
Browse files Browse the repository at this point in the history
The load() function may load data from the KVS cache or it may
issue a rpc to retrieve data from the broker content cache.

The load() function's dependency on both the cache and rpcs makes
it difficult to splice functions such as lookup() and walk() into
separate APIs.

This patch removes the call of load() from the lookup() and
walk() functions.  In lookup() and walk(), data is only looked up
in the KVS cache.  If the data is not available, a reference to the
missing data is propogated all the way backer to the original
caller.   It is then the original caller's responsibility to load
the data from the content store.

A new convenience function of load_from_cache() has been added
for the purpose of reading data only from the KVS cache.
  • Loading branch information
chu11 committed May 18, 2017
1 parent 9591f2c commit 93bc6b8
Showing 1 changed file with 43 additions and 12 deletions.
55 changes: 43 additions & 12 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,21 @@ static int content_load_request_send (kvs_ctx_t *ctx, const href_t ref, bool now
return -1;
}

/* return true if entry cached and valid and load successful */
static bool load_from_cache (kvs_ctx_t *ctx,
const href_t ref,
json_object **op)
{
struct cache_entry *hp = cache_lookup (ctx->cache, ref, ctx->epoch);

if (hp && cache_entry_get_valid (hp)) {
if (op)
*op = cache_entry_get_json (hp);
return true;
}
return false;
}

/* Return true if load successful, false if stalling */
static bool load (kvs_ctx_t *ctx, const href_t ref, wait_t *wait, json_object **op)
{
Expand Down Expand Up @@ -732,8 +747,8 @@ static void heartbeat_cb (flux_t *h, flux_msg_handler_t *w,
/* Get dirent containing requested key.
*/
static bool walk (kvs_ctx_t *ctx, json_object *root, const char *path,
json_object **direntp, wait_t *wait, int flags, int depth,
int *ep)
json_object **direntp, int flags, int depth,
const char **load_ref, int *ep)
{
char *cpy = xstrdup (path);
char *next, *name = cpy;
Expand All @@ -756,7 +771,7 @@ static bool walk (kvs_ctx_t *ctx, json_object *root, const char *path,
errnum = ELOOP;
goto error;
}
if (!walk (ctx, root, link, &dirent, wait, flags, depth, ep))
if (!walk (ctx, root, link, &dirent, flags, depth, load_ref, ep))
goto stall;
if (*ep != 0) {
errnum = *ep;
Expand All @@ -767,8 +782,10 @@ static bool walk (kvs_ctx_t *ctx, json_object *root, const char *path,
goto error;
}
if (Jget_str (dirent, "DIRREF", &ref)) {
if (!load (ctx, ref, wait, &dir))
if (!load_from_cache (ctx, ref, &dir)) {
*load_ref = ref;
goto stall;
}

} else if (json_object_object_get_ex (dirent, "DIRVAL", NULL)) {
/* N.B. in current code, directories are never stored by value */
Expand All @@ -792,7 +809,7 @@ static bool walk (kvs_ctx_t *ctx, json_object *root, const char *path,
errnum = ELOOP;
goto error;
}
if (!walk (ctx, root, link, &dirent, wait, flags, depth, ep))
if (!walk (ctx, root, link, &dirent, flags, depth, load_ref, ep))
goto stall;
if (*ep != 0) {
errnum = *ep;
Expand All @@ -814,9 +831,9 @@ static bool walk (kvs_ctx_t *ctx, json_object *root, const char *path,
return false;
}

static bool lookup (kvs_ctx_t *ctx, json_object *root, wait_t *wait,
static bool lookup (kvs_ctx_t *ctx, json_object *root,
int flags, const char *name,
json_object **valp, int *ep)
json_object **valp, const char **load_ref, int *ep)
{
json_object *vp, *dirent, *val = NULL;
int walk_errnum = 0;
Expand All @@ -835,7 +852,7 @@ static bool lookup (kvs_ctx_t *ctx, json_object *root, wait_t *wait,
val = json_object_get (root);
}
} else {
if (!walk (ctx, root, name, &dirent, wait, flags, 0, &walk_errnum))
if (!walk (ctx, root, name, &dirent, flags, 0, load_ref, &walk_errnum))
goto stall;
if (walk_errnum != 0) {
errnum = walk_errnum;
Expand All @@ -858,8 +875,10 @@ static bool lookup (kvs_ctx_t *ctx, json_object *root, wait_t *wait,
errnum = EISDIR;
goto done;
}
if (!load (ctx, json_object_get_string (vp), wait, &val))
if (!load_from_cache (ctx, json_object_get_string (vp), &val)) {
*load_ref = json_object_get_string (vp);
goto stall;
}
val = copydir (val);
} else if (json_object_object_get_ex (dirent, "FILEREF", &vp)) {
if ((flags & KVS_PROTO_READLINK)) {
Expand All @@ -870,8 +889,10 @@ static bool lookup (kvs_ctx_t *ctx, json_object *root, wait_t *wait,
errnum = ENOTDIR;
goto done;
}
if (!load (ctx, json_object_get_string (vp), wait, &val))
if (!load_from_cache (ctx, json_object_get_string (vp), &val)) {
*load_ref = json_object_get_string (vp);
goto stall;
}
val = json_object_get (val);
} else if (json_object_object_get_ex (dirent, "DIRVAL", &vp)) {
if ((flags & KVS_PROTO_READLINK)) {
Expand Down Expand Up @@ -927,6 +948,7 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w,
json_object *root_dirent = NULL;
json_object *tmp_dirent = NULL;
const char *root_ref = ctx->rootdir;
const char *load_ref = NULL;
wait_t *wait = NULL;
int lookup_errnum = 0;
int rc = -1;
Expand Down Expand Up @@ -954,8 +976,12 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w,
}
if (!load (ctx, root_ref, wait, &root))
goto stall;
if (!lookup (ctx, root, wait, flags, key, &val, &lookup_errnum))
if (!lookup (ctx, root, flags, key, &val, &load_ref, &lookup_errnum)) {
assert (load_ref);
if (load (ctx, load_ref, wait, NULL))
log_msg_exit ("%s: failure in load logic", __FUNCTION__);
goto stall;
}
if (lookup_errnum != 0) {
errno = lookup_errnum;
goto done;
Expand Down Expand Up @@ -1001,6 +1027,7 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w,
const char *key;
int flags;
int lookup_errnum = 0;
const char *load_ref = NULL;
wait_t *wait = NULL;
wait_t *watcher = NULL;
int rc = -1;
Expand All @@ -1017,8 +1044,12 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w,
goto done;
if (!load (ctx, ctx->rootdir, wait, &root))
goto stall;
if (!lookup (ctx, root, wait, flags, key, &val, &lookup_errnum))
if (!lookup (ctx, root, flags, key, &val, &load_ref, &lookup_errnum)) {
assert (load_ref);
if (load (ctx, load_ref, wait, NULL))
log_msg_exit ("%s: failure in load logic", __FUNCTION__);
goto stall;
}
if (lookup_errnum) {
errno = lookup_errnum;
goto done;
Expand Down

0 comments on commit 93bc6b8

Please sign in to comment.