Skip to content

Commit

Permalink
Merge pull request #1349 from chu11/issue1344-part3
Browse files Browse the repository at this point in the history
kvs: have two request handlers for commits & fences
  • Loading branch information
garlick authored Mar 6, 2018
2 parents 3e73946 + 864285d commit 4d30efa
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 111 deletions.
47 changes: 42 additions & 5 deletions src/common/libkvs/kvs_commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,
errno = EINVAL;
return NULL;
}
return flux_rpc_pack (h, "kvs.commit", FLUX_NODEID_ANY, 0,
return flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0,
"{s:s s:i s:s s:i s:O}",
"name", name,
"nprocs", nprocs,
"namespace", namespace,
"flags", flags,
"ops", ops);
} else {
return flux_rpc_pack (h, "kvs.commit", FLUX_NODEID_ANY, 0,
return flux_rpc_pack (h, "kvs.fence", FLUX_NODEID_ANY, 0,
"{s:s s:i s:s s:i s:[]}",
"name", name,
"nprocs", nprocs,
Expand All @@ -71,14 +71,51 @@ flux_future_t *flux_kvs_fence (flux_t *h, int flags, const char *name,

flux_future_t *flux_kvs_commit (flux_t *h, int flags, flux_kvs_txn_t *txn)
{
zuuid_t *uuid;
zuuid_t *uuid = NULL;
const char *namespace;
const char *name;
flux_future_t *f = NULL;
int saved_errno = 0;

if (!(uuid = zuuid_new ())
|| !(f = flux_kvs_fence (h, flags, zuuid_str (uuid), 1, txn)))
if (!(uuid = zuuid_new ())) {
saved_errno = errno;
goto cleanup;
}
name = zuuid_str (uuid);

if (!(namespace = flux_kvs_get_namespace (h))) {
saved_errno = errno;
goto cleanup;
}

if (txn) {
json_t *ops;
if (!(ops = txn_get_ops (txn))) {
saved_errno = EINVAL;
goto cleanup;
}
if (!(f = flux_rpc_pack (h, "kvs.commit", FLUX_NODEID_ANY, 0,
"{s:s s:s s:i s:O}",
"name", name,
"namespace", namespace,
"flags", flags,
"ops", ops))) {
saved_errno = errno;
goto cleanup;
}
} else {
if (!(f = flux_rpc_pack (h, "kvs.commit", FLUX_NODEID_ANY, 0,
"{s:s s:s s:i s:[]}",
"name", name,
"namespace", namespace,
"flags", flags,
"ops"))) {
saved_errno = errno;
goto cleanup;
}
}

cleanup:
zuuid_destroy (&uuid);
if (saved_errno)
errno = saved_errno;
Expand Down
1 change: 0 additions & 1 deletion src/modules/kvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ test_kvstxn_t_SOURCES = test/kvstxn.c
test_kvstxn_t_CPPFLAGS = $(test_cppflags)
test_kvstxn_t_LDADD = \
$(top_builddir)/src/modules/kvs/kvstxn.o \
$(top_builddir)/src/modules/kvs/treq.o \
$(top_builddir)/src/modules/kvs/cache.o \
$(top_builddir)/src/modules/kvs/lookup.o \
$(top_builddir)/src/modules/kvs/waitqueue.o \
Expand Down
214 changes: 196 additions & 18 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,9 @@ static void kvstxn_apply (kvstxn_t *kt)
namespace = kvstxn_get_namespace (kt);
assert (namespace);

/* Between call to kvstxn_mgr_process_transaction_request() and here,
* possible namespace marked for removal. Also namespace could
* have been removed if we waited and this is a replay.
/* Between call to kvstxn_mgr_add_transaction() and here, possible
* namespace marked for removal. Also namespace could have been
* removed if we waited and this is a replay.
*
* root should never be NULL, as it should not be garbage
* collected until all ready transactions have been processed.
Expand Down Expand Up @@ -1699,7 +1699,144 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
struct kvsroot *root;
const char *namespace;
const char *name;
int nprocs, flags;
int flags;
json_t *ops = NULL;

if (flux_request_unpack (msg, NULL, "{ s:o s:s s:s s:i }",
"ops", &ops,
"name", &name,
"namespace", &namespace,
"flags", &flags) < 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
return;
}

/* namespace must exist given we are on rank 0 */
if (!(root = kvsroot_mgr_lookup_root_safe (ctx->km, namespace))) {
flux_log (h, LOG_ERR, "%s: namespace %s not available",
__FUNCTION__, namespace);
errno = ENOTSUP;
goto error;
}

if (kvstxn_mgr_add_transaction (root->ktm, name, ops, flags) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
}

return;

error:
/* An error has occurred, so we will return an error similarly to
* how an error would be returned via a transaction error in
* kvstxn_apply().
*/
if (error_event_send_to_name (ctx, namespace, name, errno) < 0)
flux_log_error (h, "%s: error_event_send_to_name", __FUNCTION__);
}

/* kvs.commit
* Sent from users to local kvs module.
*/
static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
kvs_ctx_t *ctx = arg;
struct kvsroot *root;
const char *namespace;
const char *name;
int saved_errno, flags;
bool stall = false;
json_t *ops = NULL;
treq_t *tr;

if (flux_request_unpack (msg, NULL, "{ s:o s:s s:s s:i }",
"ops", &ops,
"name", &name,
"namespace", &namespace,
"flags", &flags) < 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
goto error;
}

if (!(root = getroot (ctx, namespace, mh, msg, commit_request_cb,
&stall))) {
if (stall)
goto stall;
goto error;
}

if (!(tr = treq_create (name, 1, flags))) {
flux_log_error (h, "%s: treq_create", __FUNCTION__);
goto error;
}
if (treq_mgr_add_transaction (root->trm, tr) < 0) {
saved_errno = errno;
flux_log_error (h, "%s: treq_mgr_add_transaction", __FUNCTION__);
treq_destroy (tr);
errno = saved_errno;
goto error;
}

/* save copy of request, will be used later via
* finalize_transaction_bynames() to send error code to original
* send.
*/
if (treq_add_request_copy (tr, msg) < 0)
goto error;

if (ctx->rank == 0) {
/* we use this flag to indicate if a treq has been added to
* the ready queue. We don't need to call
* treq_count_reached() b/c this is a commit and nprocs is 1
*/
treq_set_processed (tr, true);

if (kvstxn_mgr_add_transaction (root->ktm,
name,
ops,
flags) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
}
}
else {
flux_future_t *f;

/* route to rank 0 as instance owner */
if (!(f = flux_rpc_pack (h, "kvs.relaycommit", 0, FLUX_RPC_NORESPONSE,
"{ s:O s:s s:s s:i }",
"ops", ops,
"name", name,
"namespace", namespace,
"flags", flags))) {
flux_log_error (h, "%s: flux_rpc_pack", __FUNCTION__);
goto error;
}
flux_future_destroy (f);
}
return;

error:
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
stall:
return;
}


/* kvs.relayfence (rank 0 only, no response).
*/
static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
kvs_ctx_t *ctx = arg;
struct kvsroot *root;
const char *namespace;
const char *name;
int saved_errno, nprocs, flags;
json_t *ops = NULL;
treq_t *tr;

Expand Down Expand Up @@ -1727,8 +1864,10 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}
if (treq_mgr_add_transaction (root->trm, tr) < 0) {
saved_errno = errno;
flux_log_error (h, "%s: treq_mgr_add_transaction", __FUNCTION__);
treq_destroy (tr);
errno = saved_errno;
goto error;
}
}
Expand All @@ -1744,10 +1883,24 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (kvstxn_mgr_process_transaction_request (root->ktm, tr) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_process_transaction_request",
__FUNCTION__);
goto error;
if (treq_count_reached (tr)) {

/* If user called fence > nprocs time, should have been caught
* earlier */
assert (!treq_get_processed (tr));

/* we use this flag to indicate if a treq has been added to
* the ready queue */
treq_set_processed (tr, true);

if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
treq_get_ops (tr),
treq_get_flags (tr)) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
}
}

return;
Expand All @@ -1761,11 +1914,11 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "%s: error_event_send_to_name", __FUNCTION__);
}

/* kvs.commit
/* kvs.fence
* Sent from users to local kvs module.
*/
static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
kvs_ctx_t *ctx = arg;
struct kvsroot *root;
Expand All @@ -1786,7 +1939,7 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (!(root = getroot (ctx, namespace, mh, msg, commit_request_cb,
if (!(root = getroot (ctx, namespace, mh, msg, fence_request_cb,
&stall))) {
if (stall)
goto stall;
Expand All @@ -1813,25 +1966,48 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

/* save copy of request, will be used later via
* finalize_transaction_bynames() to send error code to original
* send.
*/
if (treq_add_request_copy (tr, msg) < 0)
goto error;

/* If we happen to be on rank 0, perform equivalent of
* relayfence_request_cb() here instead of sending an RPC
*/
if (ctx->rank == 0) {

if (treq_add_request_ops (tr, ops) < 0) {
flux_log_error (h, "%s: treq_add_request_ops", __FUNCTION__);
goto error;
}

if (kvstxn_mgr_process_transaction_request (root->ktm, tr) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_process_transaction_request",
__FUNCTION__);
goto error;
if (treq_count_reached (tr)) {

/* If user called fence > nprocs time, should have been caught
* earlier */
assert (!treq_get_processed (tr));

/* we use this flag to indicate if a treq has been added to
* the ready queue */
treq_set_processed (tr, true);

if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
treq_get_ops (tr),
treq_get_flags (tr)) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
}
}
}
else {
flux_future_t *f;

/* route to rank 0 as instance owner */
if (!(f = flux_rpc_pack (h, "kvs.relaycommit", 0, FLUX_RPC_NORESPONSE,
if (!(f = flux_rpc_pack (h, "kvs.relayfence", 0, FLUX_RPC_NORESPONSE,
"{ s:O s:s s:s s:i s:i }",
"ops", ops,
"name", name,
Expand All @@ -1852,7 +2028,6 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
return;
}


/* For wait_version().
*/
static void sync_request_cb (flux_t *h, flux_msg_handler_t *mh,
Expand Down Expand Up @@ -2599,6 +2774,9 @@ static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "kvs.commit",
commit_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.relaycommit", relaycommit_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs.fence",
fence_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.relayfence", relayfence_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs.namespace-create",
namespace_create_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs.namespace-remove",
Expand Down
Loading

0 comments on commit 4d30efa

Please sign in to comment.