Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP kvs-watch: do not load all KVS blobs at once #6532

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 80 additions & 50 deletions src/modules/kvs-watch/kvs-watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
#include "src/common/libcontent/content.h"
#include "src/common/libutil/errprintf.h"

/* if the valref is very large, we won't load all immediately,
* load at max MAX_LOAD
*/
#define MAX_LOADS 32768

/* State for one watcher */
struct watcher {
const flux_msg_t *request; // request message
Expand All @@ -43,9 +48,11 @@

struct ns_monitor *nsm; // back pointer for removal
json_t *prev; // previous watch value for KVS_WATCH_FULL/UNIQ
bool index_valid; // flag if prev_start_index/prev_end_index set
int prev_start_index; // previous start index loaded
int prev_end_index; // previous end index loaded
bool index_valid; // flag if start_index/end_index set
int start_index; // start index of blobrefs to load
int end_index; // end index of blobrefs to load
int last_index_loaded; // last index sent
json_t *valref_treeobj; // valref for loads
int loaded_blob_count; // number of indices loaded (for FLUX_KVS_STREAM)
void *handle; // zlistx_t handle
};
Expand Down Expand Up @@ -84,6 +91,8 @@
zhash_t *namespaces; // hash of monitored namespaces
};

static int load_data (flux_t *h, struct watcher *w);

static void watcher_destroy (struct watcher *w)
{
if (w) {
Expand All @@ -103,6 +112,7 @@
zlist_destroy (&w->loads);
}
json_decref (w->prev);
json_decref (w->valref_treeobj);
free (w);
errno = saved_errno;
}
Expand All @@ -127,6 +137,7 @@
goto error_nomem;
w->flags = flags;
w->rootseq = -1;
w->last_index_loaded = -1;
return w;
error_nomem:
errno = ENOMEM;
Expand Down Expand Up @@ -311,11 +322,27 @@
&& !(w->flags & FLUX_KVS_STREAM)))
w->finished = true;
}
if (load_data (w->nsm->ctx->h, w) < 0) {
if (!w->mute) {
flux_error_t err;
errprintf (&err,

Check warning on line 328 in src/modules/kvs-watch/kvs-watch.c

View check run for this annotation

Codecov / codecov/patch

src/modules/kvs-watch/kvs-watch.c#L326-L328

Added lines #L326 - L328 were not covered by tests
"error sending request for content blobs [%d:%d]",
w->start_index,
w->end_index);
if (flux_respond_error (w->nsm->ctx->h,

Check warning on line 332 in src/modules/kvs-watch/kvs-watch.c

View check run for this annotation

Codecov / codecov/patch

src/modules/kvs-watch/kvs-watch.c#L332

Added line #L332 was not covered by tests
w->request,
errno,

Check warning on line 334 in src/modules/kvs-watch/kvs-watch.c

View check run for this annotation

Codecov / codecov/patch

src/modules/kvs-watch/kvs-watch.c#L334

Added line #L334 was not covered by tests
err.text) < 0)
flux_log_error (w->nsm->ctx->h,

Check warning on line 336 in src/modules/kvs-watch/kvs-watch.c

View check run for this annotation

Codecov / codecov/patch

src/modules/kvs-watch/kvs-watch.c#L336

Added line #L336 was not covered by tests
"%s: flux_respond_error",
__FUNCTION__);
}
return;

Check warning on line 340 in src/modules/kvs-watch/kvs-watch.c

View check run for this annotation

Codecov / codecov/patch

src/modules/kvs-watch/kvs-watch.c#L340

Added line #L340 was not covered by tests
}
if ((w->flags & FLUX_KVS_STREAM)
&& w->responded
&& w->index_valid
&& (w->loaded_blob_count
== (w->prev_end_index - w->prev_start_index + 1))) {
&& (w->loaded_blob_count == (w->end_index - w->start_index + 1))) {
if (!w->mute) {
if (flux_respond_error (w->nsm->ctx->h,
w->request,
Expand Down Expand Up @@ -350,21 +377,31 @@
return NULL;
}

static int load_range (flux_t *h,
struct watcher *w,
int start_index,
int end_index,
json_t *val)
static int load_data (flux_t *h, struct watcher *w)
{
int i;
int start_index, i;

assert (w->index_valid);
assert (w->valref_treeobj);

for (i = start_index; i <= end_index; i++) {
if (w->last_index_loaded >= w->end_index)
return 0;

if (w->start_index > w->last_index_loaded)
start_index = w->start_index;
else
start_index = w->last_index_loaded + 1;

for (i = start_index; i <= w->end_index; i++) {
flux_future_t *f;
const char *ref = treeobj_get_blobref (val, i);
const char *ref = treeobj_get_blobref (w->valref_treeobj, i);
if (!ref)
return -1;
if (!(f = load_ref (h, w, ref)))
return -1;
w->last_index_loaded = i;
if (zlist_size (w->loads) > MAX_LOADS)
break;
}
return 0;
}
Expand All @@ -391,16 +428,18 @@
*/
if (treeobj_is_val (val)) {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = 0;
w->start_index = 0;
w->end_index = 0;
/* since this is a val object, we can just return it */
w->last_index_loaded = 0;
w->loaded_blob_count++;
goto out;
}
else if (treeobj_is_valref (val)) {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = treeobj_get_count (val) - 1;
w->start_index = 0;
w->end_index = treeobj_get_count (val) - 1;
w->valref_treeobj = json_incref (val);
}
else {
if (w->flags & FLUX_KVS_WATCH_APPEND)
Expand All @@ -419,15 +458,11 @@
goto error_respond;
}

if (load_range (h,
w,
w->prev_start_index,
w->prev_end_index,
val) < 0) {
if (load_data (h, w) < 0) {
errprintf (&err,
"error sending request for content blobs [%d:%d]",
w->prev_start_index,
w->prev_end_index);
w->start_index,
w->end_index);
goto error_respond;
}

Expand Down Expand Up @@ -513,15 +548,16 @@
*/
if (treeobj_is_val (val)) {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = 0;
w->start_index = 0;
w->end_index = 0;
/* since this is a val object, we can just return it */
if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
goto error_out;
}
w->last_index_loaded = 0;
w->loaded_blob_count++;
w->responded = true;
}
Expand All @@ -537,11 +573,12 @@
if (w->flags & FLUX_KVS_STREAM)
goto out;
new_end_index = treeobj_get_count (val) - 1;
if (new_end_index > w->prev_end_index) {
w->prev_start_index = w->prev_end_index + 1;
w->prev_end_index = new_end_index;
if (new_end_index > w->end_index) {
w->end_index = new_end_index;
json_decref (w->valref_treeobj);
w->valref_treeobj = json_incref (val);
}
else if (new_end_index < w->prev_end_index) {
else if (new_end_index < w->end_index) {

Check warning on line 581 in src/modules/kvs-watch/kvs-watch.c

View check run for this annotation

Codecov / codecov/patch

src/modules/kvs-watch/kvs-watch.c#L581

Added line #L581 was not covered by tests
errprintf (&err, "key watched with WATCH_APPEND truncated");
errno = EINVAL;
goto error_respond;
Expand All @@ -551,19 +588,16 @@
}
else {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = treeobj_get_count (val) - 1;
w->start_index = 0;
w->end_index = treeobj_get_count (val) - 1;
w->valref_treeobj = json_incref (val);
}

if (load_range (h,
w,
w->prev_start_index,
w->prev_end_index,
val) < 0) {
if (load_data (h, w) < 0) {
errprintf (&err,
"error sending request for content blobs [%d:%d]",
w->prev_start_index,
w->prev_end_index);
w->start_index,
w->end_index);
goto error_respond;
}
}
Expand Down Expand Up @@ -594,23 +628,20 @@
if (w->flags & FLUX_KVS_STREAM)
goto out;
new_end_index = treeobj_get_count (val) - 1;
if (new_end_index > w->prev_end_index) {
w->prev_start_index = w->prev_end_index + 1;
w->prev_end_index = new_end_index;
if (new_end_index > w->end_index) {
w->end_index = new_end_index;
json_decref (w->valref_treeobj);
w->valref_treeobj = json_incref (val);
}
else if (new_end_index < w->prev_end_index) {
else if (new_end_index < w->end_index) {
errprintf (&err, "key watched with WATCH_APPEND shortened");
errno = EINVAL;
goto error_respond;
}
else
goto out;

if (load_range (h,
w,
w->prev_start_index,
w->prev_end_index,
val) < 0) {
if (load_data (h, w) < 0) {
errprintf (&err, "error loading reference");
goto error_respond;
}
Expand Down Expand Up @@ -768,8 +799,7 @@
if ((w->flags & FLUX_KVS_STREAM)
&& w->responded
&& w->index_valid
&& (w->loaded_blob_count
== (w->prev_end_index - w->prev_start_index + 1))) {
&& (w->loaded_blob_count == (w->end_index - w->start_index + 1))) {
errno = ENODATA;
goto error;
}
Expand Down
Loading