diff --git a/src/modules/kvs-watch/kvs-watch.c b/src/modules/kvs-watch/kvs-watch.c index 8e4e035439e7..ecee3670c1af 100644 --- a/src/modules/kvs-watch/kvs-watch.c +++ b/src/modules/kvs-watch/kvs-watch.c @@ -24,6 +24,11 @@ #include "src/common/libcontent/content.h" #include "src/common/libutil/errprintf.h" +/* if the valref is very large, we won't load all immediately, + * load at max MAX_LOAD + */ +#define MAX_LOADS 32768 + /* State for one watcher */ struct watcher { const flux_msg_t *request; // request message @@ -43,9 +48,11 @@ struct watcher { struct ns_monitor *nsm; // back pointer for removal json_t *prev; // previous watch value for KVS_WATCH_FULL/UNIQ - bool index_valid; // flag if prev_start_index/prev_end_index set - int prev_start_index; // previous start index loaded - int prev_end_index; // previous end index loaded + bool index_valid; // flag if start_index/end_index set + int start_index; // start index of blobrefs to load + int end_index; // end index of blobrefs to load + int last_index_loaded; // last index sent + json_t *valref_treeobj; // valref for loads int loaded_blob_count; // number of indices loaded (for FLUX_KVS_STREAM) void *handle; // zlistx_t handle }; @@ -84,6 +91,8 @@ struct watch_ctx { zhash_t *namespaces; // hash of monitored namespaces }; +static int load_data (flux_t *h, struct watcher *w); + static void watcher_destroy (struct watcher *w) { if (w) { @@ -103,6 +112,7 @@ static void watcher_destroy (struct watcher *w) zlist_destroy (&w->loads); } json_decref (w->prev); + json_decref (w->valref_treeobj); free (w); errno = saved_errno; } @@ -127,6 +137,7 @@ static struct watcher *watcher_create (const flux_msg_t *msg, goto error_nomem; w->flags = flags; w->rootseq = -1; + w->last_index_loaded = -1; return w; error_nomem: errno = ENOMEM; @@ -311,11 +322,27 @@ static void load_continuation (flux_future_t *f, void *arg) && !(w->flags & FLUX_KVS_STREAM))) w->finished = true; } + if (load_data (w->nsm->ctx->h, w) < 0) { + if (!w->mute) { + flux_error_t err; + errprintf (&err, + "error sending request for content blobs [%d:%d]", + w->start_index, + w->end_index); + if (flux_respond_error (w->nsm->ctx->h, + w->request, + errno, + err.text) < 0) + flux_log_error (w->nsm->ctx->h, + "%s: flux_respond_error", + __FUNCTION__); + } + return; + } if ((w->flags & FLUX_KVS_STREAM) && w->responded && w->index_valid - && (w->loaded_blob_count - == (w->prev_end_index - w->prev_start_index + 1))) { + && (w->loaded_blob_count == (w->end_index - w->start_index + 1))) { if (!w->mute) { if (flux_respond_error (w->nsm->ctx->h, w->request, @@ -350,21 +377,31 @@ static flux_future_t *load_ref (flux_t *h, struct watcher *w, const char *ref) return NULL; } -static int load_range (flux_t *h, - struct watcher *w, - int start_index, - int end_index, - json_t *val) +static int load_data (flux_t *h, struct watcher *w) { - int i; + int start_index, i; + + assert (w->index_valid); + assert (w->valref_treeobj); - for (i = start_index; i <= end_index; i++) { + if (w->last_index_loaded >= w->end_index) + return 0; + + if (w->start_index > w->last_index_loaded) + start_index = w->start_index; + else + start_index = w->last_index_loaded + 1; + + for (i = start_index; i <= w->end_index; i++) { flux_future_t *f; - const char *ref = treeobj_get_blobref (val, i); + const char *ref = treeobj_get_blobref (w->valref_treeobj, i); if (!ref) return -1; if (!(f = load_ref (h, w, ref))) return -1; + w->last_index_loaded = i; + if (zlist_size (w->loads) > MAX_LOADS) + break; } return 0; } @@ -391,16 +428,18 @@ static int handle_initial_response (flux_t *h, */ if (treeobj_is_val (val)) { w->index_valid = true; - w->prev_start_index = 0; - w->prev_end_index = 0; + w->start_index = 0; + w->end_index = 0; /* since this is a val object, we can just return it */ + w->last_index_loaded = 0; w->loaded_blob_count++; goto out; } else if (treeobj_is_valref (val)) { w->index_valid = true; - w->prev_start_index = 0; - w->prev_end_index = treeobj_get_count (val) - 1; + w->start_index = 0; + w->end_index = treeobj_get_count (val) - 1; + w->valref_treeobj = json_incref (val); } else { if (w->flags & FLUX_KVS_WATCH_APPEND) @@ -419,15 +458,11 @@ static int handle_initial_response (flux_t *h, goto error_respond; } - if (load_range (h, - w, - w->prev_start_index, - w->prev_end_index, - val) < 0) { + if (load_data (h, w) < 0) { errprintf (&err, "error sending request for content blobs [%d:%d]", - w->prev_start_index, - w->prev_end_index); + w->start_index, + w->end_index); goto error_respond; } @@ -513,8 +548,8 @@ static int handle_append_response (flux_t *h, */ if (treeobj_is_val (val)) { w->index_valid = true; - w->prev_start_index = 0; - w->prev_end_index = 0; + w->start_index = 0; + w->end_index = 0; /* since this is a val object, we can just return it */ if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { flux_log_error (h, @@ -522,6 +557,7 @@ static int handle_append_response (flux_t *h, __FUNCTION__); goto error_out; } + w->last_index_loaded = 0; w->loaded_blob_count++; w->responded = true; } @@ -537,11 +573,12 @@ static int handle_append_response (flux_t *h, if (w->flags & FLUX_KVS_STREAM) goto out; new_end_index = treeobj_get_count (val) - 1; - if (new_end_index > w->prev_end_index) { - w->prev_start_index = w->prev_end_index + 1; - w->prev_end_index = new_end_index; + if (new_end_index > w->end_index) { + w->end_index = new_end_index; + json_decref (w->valref_treeobj); + w->valref_treeobj = json_incref (val); } - else if (new_end_index < w->prev_end_index) { + else if (new_end_index < w->end_index) { errprintf (&err, "key watched with WATCH_APPEND truncated"); errno = EINVAL; goto error_respond; @@ -551,19 +588,16 @@ static int handle_append_response (flux_t *h, } else { w->index_valid = true; - w->prev_start_index = 0; - w->prev_end_index = treeobj_get_count (val) - 1; + w->start_index = 0; + w->end_index = treeobj_get_count (val) - 1; + w->valref_treeobj = json_incref (val); } - if (load_range (h, - w, - w->prev_start_index, - w->prev_end_index, - val) < 0) { + if (load_data (h, w) < 0) { errprintf (&err, "error sending request for content blobs [%d:%d]", - w->prev_start_index, - w->prev_end_index); + w->start_index, + w->end_index); goto error_respond; } } @@ -594,11 +628,12 @@ static int handle_append_response (flux_t *h, if (w->flags & FLUX_KVS_STREAM) goto out; new_end_index = treeobj_get_count (val) - 1; - if (new_end_index > w->prev_end_index) { - w->prev_start_index = w->prev_end_index + 1; - w->prev_end_index = new_end_index; + if (new_end_index > w->end_index) { + w->end_index = new_end_index; + json_decref (w->valref_treeobj); + w->valref_treeobj = json_incref (val); } - else if (new_end_index < w->prev_end_index) { + else if (new_end_index < w->end_index) { errprintf (&err, "key watched with WATCH_APPEND shortened"); errno = EINVAL; goto error_respond; @@ -606,11 +641,7 @@ static int handle_append_response (flux_t *h, else goto out; - if (load_range (h, - w, - w->prev_start_index, - w->prev_end_index, - val) < 0) { + if (load_data (h, w) < 0) { errprintf (&err, "error loading reference"); goto error_respond; } @@ -768,8 +799,7 @@ static void handle_lookup_response (flux_future_t *f, if ((w->flags & FLUX_KVS_STREAM) && w->responded && w->index_valid - && (w->loaded_blob_count - == (w->prev_end_index - w->prev_start_index + 1))) { + && (w->loaded_blob_count == (w->end_index - w->start_index + 1))) { errno = ENODATA; goto error; }