diff --git a/client/src/margo_client.c b/client/src/margo_client.c
index 06184e9d..dda8db5d 100644
--- a/client/src/margo_client.c
+++ b/client/src/margo_client.c
@@ -224,14 +224,14 @@ static hg_handle_t create_handle(hg_id_t id)
     return handle;
 }
 
-static int forward_to_server(
-    hg_handle_t hdl,
-    void* input_ptr,
-    double timeout_msec)
+static int forward_to_server(hg_handle_t hdl,
+                             void* input_ptr,
+                             double timeout_msec)
 {
     hg_return_t hret = margo_forward_timed(hdl, input_ptr, timeout_msec);
     if (hret != HG_SUCCESS) {
         LOGERR("margo_forward_timed() failed - %s", HG_Error_to_string(hret));
+        //margo_state_dump(client_rpc_context->mid, "-", 0, NULL);
         return UNIFYFS_ERROR_MARGO;
     }
     return UNIFYFS_SUCCESS;
diff --git a/common/src/unifyfs_server_rpcs.h b/common/src/unifyfs_server_rpcs.h
index 6c03a1ee..f4cd5d12 100644
--- a/common/src/unifyfs_server_rpcs.h
+++ b/common/src/unifyfs_server_rpcs.h
@@ -42,9 +42,9 @@ typedef enum {
     UNIFYFS_SERVER_RPC_LAMINATE,
     UNIFYFS_SERVER_RPC_METAGET,
     UNIFYFS_SERVER_RPC_METASET,
-    UNIFYFS_SERVER_RPC_PID_REPORT,
     UNIFYFS_SERVER_RPC_TRANSFER,
     UNIFYFS_SERVER_RPC_TRUNCATE,
+    UNIFYFS_SERVER_BCAST_RPC_BOOTSTRAP,
     UNIFYFS_SERVER_BCAST_RPC_EXTENTS,
     UNIFYFS_SERVER_BCAST_RPC_FILEATTR,
     UNIFYFS_SERVER_BCAST_RPC_LAMINATE,
@@ -186,6 +186,13 @@ MERCURY_GEN_PROC(bcast_progress_out_t,
                  ((int32_t)(ret)))
 DECLARE_MARGO_RPC_HANDLER(bcast_progress_rpc)
 
+/* Broadcast 'bootstrap complete' to all servers */
+MERCURY_GEN_PROC(bootstrap_complete_bcast_in_t,
+                 ((int32_t)(root)))
+MERCURY_GEN_PROC(bootstrap_complete_bcast_out_t,
+                 ((int32_t)(ret)))
+DECLARE_MARGO_RPC_HANDLER(bootstrap_complete_bcast_rpc)
+
 /* Broadcast file extents to all servers */
 MERCURY_GEN_PROC(extent_bcast_in_t,
                  ((int32_t)(root))
diff --git a/server/src/margo_server.c b/server/src/margo_server.c
index f6a27f53..d3ce6651 100644
--- a/server/src/margo_server.c
+++ b/server/src/margo_server.c
@@ -155,6 +155,12 @@ static void register_server_server_rpcs(margo_instance_id mid)
                        bcast_progress_in_t, bcast_progress_out_t,
                        bcast_progress_rpc);
 
+    unifyfsd_rpc_context->rpcs.bootstrap_complete_bcast_id =
+        MARGO_REGISTER(mid, "bootstrap_complete_bcast_rpc",
+                       bootstrap_complete_bcast_in_t,
+                       bootstrap_complete_bcast_out_t,
+                       bootstrap_complete_bcast_rpc);
+
     unifyfsd_rpc_context->rpcs.chunk_read_request_id =
         MARGO_REGISTER(mid, "chunk_read_request_rpc",
                        chunk_read_request_in_t, chunk_read_request_out_t,
@@ -534,7 +540,7 @@ int margo_connect_servers(void)
     /* allocate array of structs to record address for each server */
     server_infos = (server_info_t*) calloc(glb_num_servers,
-                                           sizeof(server_info_t));
+                                            sizeof(server_info_t));
     if (NULL == server_infos) {
         LOGERR("failed to allocate server_info array");
         return ENOMEM;
     }
diff --git a/server/src/margo_server.h b/server/src/margo_server.h
index e2bfae4f..3265adef 100644
--- a/server/src/margo_server.h
+++ b/server/src/margo_server.h
@@ -32,6 +32,7 @@ typedef struct ServerRpcIds {
 
     /* server-server rpcs */
     hg_id_t bcast_progress_id;
+    hg_id_t bootstrap_complete_bcast_id;
     hg_id_t chunk_read_request_id;
     hg_id_t chunk_read_response_id;
     hg_id_t extent_add_id;
diff --git a/server/src/unifyfs_global.h b/server/src/unifyfs_global.h
index c8bf1c39..677e80f6 100644
--- a/server/src/unifyfs_global.h
+++ b/server/src/unifyfs_global.h
@@ -208,8 +208,12 @@ bool check_pending_metaget(int gfid);
 
 unifyfs_rc clear_pending_metaget(int gfid);
 
-/* publish the pids of all servers to a shared file */
-int unifyfs_publish_server_pids(void);
+
+/* notify local server main thread that bootstrap is complete */
+int unifyfs_signal_bootstrap_complete(void);
+
+/* participate in collective server bootstrap completion process */
+int unifyfs_complete_bootstrap(void);
 
 /* report the pid for a server with given rank */
 int unifyfs_report_server_pid(int rank, int pid);
diff --git a/server/src/unifyfs_group_rpc.c b/server/src/unifyfs_group_rpc.c
index 8d979922..ad03c373 100644
--- a/server/src/unifyfs_group_rpc.c
+++ b/server/src/unifyfs_group_rpc.c
@@ -212,10 +212,12 @@ static int merge_metaget_all_bcast_outputs(
      * and update the string offsets stored in the child file attrs' filename
      * members. */
 
-    uint64_t parent_filenames_len = p_out->filenames?strlen(p_out->filenames):0;
-    uint64_t child_filenames_len = c_out->filenames?strlen(c_out->filenames):0;
+    uint64_t parent_filenames_len =
+        p_out->filenames ? strlen(p_out->filenames) : 0;
+    uint64_t child_filenames_len =
+        c_out->filenames ? strlen(c_out->filenames) : 0;
 
-    char* new_filenames = calloc(parent_filenames_len + child_filenames_len +1,
+    char* new_filenames = calloc(parent_filenames_len+child_filenames_len+1,
                                  sizeof(char));
     if (!new_filenames) {
         free(parent_attr_list);
@@ -291,6 +293,15 @@
     void* output = coll_req->output;
     switch (coll_req->req_type) {
+    case UNIFYFS_SERVER_BCAST_RPC_BOOTSTRAP: {
+        bootstrap_complete_bcast_out_t* cbbo = (bootstrap_complete_bcast_out_t*) out;
+        bootstrap_complete_bcast_out_t* bbo = (bootstrap_complete_bcast_out_t*) output;
+        child_ret = cbbo->ret;
+        if ((NULL != bbo) && (child_ret != UNIFYFS_SUCCESS)) {
+            bbo->ret = child_ret;
+        }
+        break;
+    }
     case UNIFYFS_SERVER_BCAST_RPC_EXTENTS: {
         extent_bcast_out_t* cebo = (extent_bcast_out_t*) out;
         extent_bcast_out_t* ebo = (extent_bcast_out_t*) output;
         child_ret = cebo->ret;
@@ -489,16 +500,16 @@ static coll_request* collective_create(server_rpc_e req_type,
      * before calling bcast_progress_rpc().
      */
-    int rc = ABT_mutex_create(&coll_req->child_resp_valid_mut);
+    int rc = ABT_mutex_create(&coll_req->resp_valid_sync);
     if (ABT_SUCCESS != rc) {
         LOGERR("ABT_mutex_create failed");
         free(coll_req);
         return NULL;
     }
 
-    rc = ABT_cond_create(&coll_req->child_resp_valid);
+    rc = ABT_cond_create(&coll_req->resp_valid_cond);
     if (ABT_SUCCESS != rc) {
         LOGERR("ABT_cond_create failed");
-        ABT_mutex_free(&coll_req->child_resp_valid_mut);
+        ABT_mutex_free(&coll_req->resp_valid_sync);
         free(coll_req);
         return NULL;
     }
@@ -507,8 +518,8 @@ static coll_request* collective_create(server_rpc_e req_type,
                            UNIFYFS_BCAST_K_ARY, &(coll_req->tree));
     if (rc) {
         LOGERR("unifyfs_tree_init() failed");
-        ABT_mutex_free(&coll_req->child_resp_valid_mut);
-        ABT_cond_free(&coll_req->child_resp_valid);
+        ABT_mutex_free(&coll_req->resp_valid_sync);
+        ABT_cond_free(&coll_req->resp_valid_cond);
         free(coll_req);
         return NULL;
     }
@@ -522,8 +533,8 @@ static coll_request* collective_create(server_rpc_e req_type,
         free(coll_req->child_hdls);
         free(coll_req->child_reqs);
         /* Note: calling free() on NULL is explicitly allowed */
-        ABT_mutex_free(&coll_req->child_resp_valid_mut);
-        ABT_cond_free(&coll_req->child_resp_valid);
+        ABT_mutex_free(&coll_req->resp_valid_sync);
+        ABT_cond_free(&coll_req->resp_valid_cond);
         free(coll_req);
         return NULL;
     }
@@ -601,8 +612,8 @@ void collective_cleanup(coll_request* coll_req)
     }
 
     /* Release the Argobots mutex and condition variable */
-    ABT_cond_free(&coll_req->child_resp_valid);
-    ABT_mutex_free(&coll_req->child_resp_valid_mut);
+    ABT_cond_free(&coll_req->resp_valid_cond);
+    ABT_mutex_free(&coll_req->resp_valid_sync);
 
     /* free allocated memory */
     if (NULL != coll_req->input) {
@@ -644,6 +655,8 @@ static int collective_forward(coll_request* coll_req)
         /* invoke bcast request rpc on child */
         margo_request* creq = coll_req->child_reqs + i;
         hg_handle_t* chdl = coll_req->child_hdls + i;
+        LOGDBG("BCAST_RPC: collective(%p) forwarding to child[%d]",
+               coll_req, i);
         int rc = forward_child_request(coll_req->input, *chdl, creq);
         if (rc != UNIFYFS_SUCCESS) {
             LOGERR("forward to child[%d] failed", i);
@@ -664,6 +677,12 @@ void collective_set_local_retval(coll_request* coll_req, int val)
     }
 
     switch (coll_req->req_type) {
+    case UNIFYFS_SERVER_BCAST_RPC_BOOTSTRAP: {
+        bootstrap_complete_bcast_out_t* bbo =
+            (bootstrap_complete_bcast_out_t*) output;
+        bbo->ret = val;
+        break;
+    }
     case UNIFYFS_SERVER_BCAST_RPC_EXTENTS: {
         extent_bcast_out_t* ebo = (extent_bcast_out_t*) output;
         ebo->ret = val;
@@ -723,7 +742,7 @@ int collective_finish(coll_request* coll_req)
      * then send the output back to the caller. If we're at the root
      * of the tree, though, there might be output data, but no place
      * to send it. */
-    if ((NULL != coll_req->output) && (NULL != coll_req->resp_hdl)) {
+    if ((NULL != coll_req->output) && (HG_HANDLE_NULL != coll_req->resp_hdl)) {
         hg_return_t hret = margo_respond(coll_req->resp_hdl, coll_req->output);
         if (hret != HG_SUCCESS) {
             LOGERR("margo_respond() failed - %s", HG_Error_to_string(hret));
@@ -735,14 +754,14 @@
 
     /* Signal the condition variable in case there are other threads
      * waiting for the child responses */
-    ABT_mutex_lock(coll_req->child_resp_valid_mut);
-    ABT_cond_signal(coll_req->child_resp_valid);
+    ABT_mutex_lock(coll_req->resp_valid_sync);
+    ABT_cond_signal(coll_req->resp_valid_cond);
     /* There should only be a single thread waiting on the CV, so we don't
      * need to use ABT_cond_broadcast() */
-    ABT_mutex_unlock(coll_req->child_resp_valid_mut);
+    ABT_mutex_unlock(coll_req->resp_valid_sync);
     /* Locking the mutex before signaling is required in order to ensure
      * that the waiting thread has had a chance to actually call
-     * ABT_cond_wait() before this thread signals the CV. */
+     * ABT_cond_timedwait() before this thread signals the CV. */
 
     return ret;
 }
@@ -792,18 +811,9 @@ int invoke_bcast_progress_rpc(coll_request* coll_req)
 static void bcast_progress_rpc(hg_handle_t handle)
 {
     /* assume we'll succeed */
-    int32_t ret = UNIFYFS_SUCCESS;
+    int ret = UNIFYFS_SUCCESS;
     coll_request* coll = NULL;
-
-    bool cleanup_collective = ((NULL != coll) && (coll->auto_cleanup));
-    /* We have to check the auto_cleanup variable now because in the case
-     * where auto_cleanup is false, another thread will be freeing the
-     * collective. And once the memory is freed, we can't read the
-     * auto_cleanup variable.
-     *
-     * There's a condition variable that's signaled by collective_finish(),
-     * and the memory won't be freed until some time after that happens, so
-     * it's safe to check the variable up here. */
+    bool cleanup_collective = false;
 
     bcast_progress_in_t in;
     hg_return_t hret = margo_get_input(handle, &in);
@@ -811,19 +821,31 @@
         LOGERR("margo_get_input() failed - %s", HG_Error_to_string(hret));
         ret = UNIFYFS_ERROR_MARGO;
     } else {
-        /* call collective_finish() to progress bcast operation */
         coll = (coll_request*) in.coll_req;
+        margo_free_input(handle, &in);
+
+        cleanup_collective = ((NULL != coll) && (coll->auto_cleanup));
+        /* We have to check the auto_cleanup variable now because in the case
+         * where auto_cleanup is false, another thread will be freeing the
+         * collective. And once the memory is freed, we can't read the
+         * auto_cleanup variable.
+         *
+         * There's a condition variable that's signaled by collective_finish(),
+         * and the memory won't be freed until some time after that happens, so
+         * it's safe to check the variable up here. */
+
+        /* call collective_finish() to progress bcast operation */
         LOGDBG("BCAST_RPC: bcast progress collective(%p)", coll);
         ret = collective_finish(coll);
         if (ret != UNIFYFS_SUCCESS) {
-            LOGERR("collective_finish() failed for coll_req(%p) (rc=%d)",
+            LOGERR("collective_finish() failed for collective(%p) (rc=%d)",
                    coll, ret);
         }
     }
 
     /* finish rpc */
     bcast_progress_out_t out;
-    out.ret = ret;
+    out.ret = (int32_t) ret;
     hret = margo_respond(handle, &out);
     if (hret != HG_SUCCESS) {
         LOGERR("margo_respond() failed - %s", HG_Error_to_string(hret));
@@ -834,12 +856,138 @@
     }
 
     /* free margo resources */
-    margo_free_input(handle, &in);
     margo_destroy(handle);
 }
 DEFINE_MARGO_RPC_HANDLER(bcast_progress_rpc)
 
+/***************************************************
+ * Broadcast server bootstrap completion
+ ***************************************************/
+
+/* bootstrap complete broadcast rpc handler */
+static void bootstrap_complete_bcast_rpc(hg_handle_t handle)
+{
+    LOGDBG("BCAST_RPC: bootstrap handler");
+
+    /* assume we'll succeed */
+    int ret = UNIFYFS_SUCCESS;
+
+    coll_request* coll = NULL;
+    server_rpc_req_t* req = calloc(1, sizeof(*req));
+    bootstrap_complete_bcast_in_t* in = calloc(1, sizeof(*in));
+    bootstrap_complete_bcast_out_t* out = calloc(1, sizeof(*out));
+    if ((NULL == req) || (NULL == in) || (NULL == out)) {
+        ret = ENOMEM;
+    } else {
+        /* get input params */
+        hg_return_t hret = margo_get_input(handle, in);
+        if (hret != HG_SUCCESS) {
+            LOGERR("margo_get_input() failed - %s", HG_Error_to_string(hret));
+            ret = UNIFYFS_ERROR_MARGO;
+        } else {
+            hg_id_t op_hgid = unifyfsd_rpc_context->rpcs.bootstrap_complete_bcast_id;
+            server_rpc_e rpc = UNIFYFS_SERVER_BCAST_RPC_BOOTSTRAP;
+            coll = collective_create(rpc, handle, op_hgid, (int)(in->root),
+                                     (void*)in, (void*)out, sizeof(*out),
+                                     HG_BULK_NULL, HG_BULK_NULL, NULL);
+            if (NULL == coll) {
+                ret = ENOMEM;
+            } else {
+                ret = collective_forward(coll);
+                if (ret == UNIFYFS_SUCCESS) {
+                    req->req_type = rpc;
+                    req->coll = coll;
+                    req->handle = handle;
+                    req->input = (void*) in;
+                    ret = sm_submit_service_request(req);
+                    if (ret != UNIFYFS_SUCCESS) {
+                        LOGERR("failed to submit coll request to svcmgr");
+                    }
+                }
+            }
+        }
+    }
+
+    if (ret != UNIFYFS_SUCCESS) {
+        /* report failure back to caller */
+        bootstrap_complete_bcast_out_t bbo;
+        bbo.ret = (int32_t)ret;
+        hg_return_t hret = margo_respond(handle, &bbo);
+        if (hret != HG_SUCCESS) {
+            LOGERR("margo_respond() failed - %s", HG_Error_to_string(hret));
+        }
+
+        if (NULL != coll) {
+            collective_cleanup(coll);
+        } else {
+            margo_destroy(handle);
+        }
+    }
+}
+DEFINE_MARGO_RPC_HANDLER(bootstrap_complete_bcast_rpc)
+
+/* Execute broadcast tree for 'bootstrap complete' notification */
+int unifyfs_invoke_broadcast_bootstrap_complete(void)
+{
+    /* assuming success */
+    int ret = UNIFYFS_SUCCESS;
+
+    LOGDBG("BCAST_RPC: starting bootstrap complete");
+    coll_request* coll = NULL;
+    bootstrap_complete_bcast_in_t* in = calloc(1, sizeof(*in));
+    if (NULL == in) {
+        ret = ENOMEM;
+    } else {
+        /* set input params */
+        in->root = (int32_t) glb_pmi_rank;
+        hg_id_t op_hgid =
+            unifyfsd_rpc_context->rpcs.bootstrap_complete_bcast_id;
+        server_rpc_e rpc = UNIFYFS_SERVER_BCAST_RPC_BOOTSTRAP;
+        coll = collective_create(rpc, HG_HANDLE_NULL, op_hgid,
+                                 glb_pmi_rank, (void*)in,
+                                 NULL, sizeof(bootstrap_complete_bcast_out_t),
+                                 HG_BULK_NULL, HG_BULK_NULL, NULL);
+        if (NULL == coll) {
+            ret = ENOMEM;
+        } else {
+            ret = collective_forward(coll);
+            if (ret == UNIFYFS_SUCCESS) {
+                /* avoid cleanup by the progress rpc */
+                coll->auto_cleanup = 0;
+                ABT_mutex_lock(coll->resp_valid_sync);
+                ret = invoke_bcast_progress_rpc(coll);
+                if (ret == UNIFYFS_SUCCESS) {
+                    /* wait for all the child responses to come back */
+                    struct timespec timeout;
+                    clock_gettime(CLOCK_REALTIME, &timeout);
+                    timeout.tv_sec += 5; /* 5 sec */
+                    int rc = ABT_cond_timedwait(coll->resp_valid_cond,
+                                                coll->resp_valid_sync,
+                                                &timeout);
+                    if (ABT_ERR_COND_TIMEDOUT == rc) {
+                        LOGERR("timeout");
+                        ret = UNIFYFS_ERROR_TIMEOUT;
+                    } else if (rc) {
+                        LOGERR("failed to wait on condition (err=%d)", rc);
+                        ret = UNIFYFS_ERROR_MARGO;
+                    } else if (NULL != coll->output) {
+                        bootstrap_complete_bcast_out_t* out =
+                            (bootstrap_complete_bcast_out_t*) coll->output;
+                        ret = out->ret;
+                    }
+                }
+                ABT_mutex_unlock(coll->resp_valid_sync);
+            } else {
+                LOGERR("collective(%p) forward failed - cleaning up", coll);
+            }
+            collective_cleanup(coll);
+        }
+    }
+
+    return ret;
+}
+
 
 /*************************************************************************
  * Broadcast file extents metadata
  *************************************************************************/
@@ -922,7 +1070,7 @@ static void extent_bcast_rpc(hg_handle_t handle)
 DEFINE_MARGO_RPC_HANDLER(extent_bcast_rpc)
 
 /* Execute broadcast tree for extent metadata */
-int unifyfs_invoke_broadcast_extents_rpc(int gfid)
+int unifyfs_invoke_broadcast_extents(int gfid)
 {
     /* assuming success */
     int ret = UNIFYFS_SUCCESS;
@@ -1735,7 +1883,7 @@ int unifyfs_invoke_broadcast_metaget_all(unifyfs_file_attr_t** file_attrs,
              * we need to get the output data */
             coll->auto_cleanup = 0;
 
-            ABT_mutex_lock(coll->child_resp_valid_mut);
+            ABT_mutex_lock(coll->resp_valid_sync);
             /* Have to lock the mutex before the bcast_progress_rpc call
              * so that we're sure to be waiting on the condition
              * variable before the progress thread gets to the point
@@ -1757,16 +1905,25 @@
             }
 
             // Wait for all the child responses to come back
-            ABT_cond_wait(coll->child_resp_valid,
-                          coll->child_resp_valid_mut);
-            ABT_mutex_unlock(coll->child_resp_valid_mut);
+            struct timespec timeout;
+            clock_gettime(CLOCK_REALTIME, &timeout);
+            timeout.tv_sec += 10; /* 10 sec */
+            int rc = ABT_cond_timedwait(coll->resp_valid_cond,
+                                        coll->resp_valid_sync,
+                                        &timeout);
+            if (ABT_ERR_COND_TIMEDOUT == rc) {
+                LOGERR("timeout");
+                ret = UNIFYFS_ERROR_TIMEOUT;
+            } else if (rc) {
+                LOGERR("failed to wait on condition (err=%d)", rc);
+                ret = UNIFYFS_ERROR_MARGO;
+            }
+            ABT_mutex_unlock(coll->resp_valid_sync);
 
             // Now we can get the data from the output struct
             if (sizeof(metaget_all_bcast_out_t) != coll->output_sz) {
-                LOGERR("Unexpected size for collective output struct. "
-                       "Expected %d but value was %d",
-                       sizeof(metaget_all_bcast_out_t),
-                       coll->output_sz);
+                LOGERR("Unexpected size (%zu) for collective output - expected %zu",
+                       coll->output_sz, sizeof(metaget_all_bcast_out_t));
             }
 
             // Pull the bulk data (the list of file_attr structs) over
@@ -1899,8 +2056,8 @@
          * might have been left in a locked state. Argobots doesn't
          * provide a way to test this, so we'll do a trylock() followed
          * by an unlock to ensure it's unlocked. */
-        ABT_mutex_trylock(coll->child_resp_valid_mut);
-        ABT_mutex_unlock(coll->child_resp_valid_mut);
+        ABT_mutex_trylock(coll->resp_valid_sync);
+        ABT_mutex_unlock(coll->resp_valid_sync);
     } else {
         /* If we never got as far as creating the collective, then just
          * free the input and output structs. (These were all initialized
diff --git a/server/src/unifyfs_group_rpc.h b/server/src/unifyfs_group_rpc.h
index ec6976eb..fb463694 100644
--- a/server/src/unifyfs_group_rpc.h
+++ b/server/src/unifyfs_group_rpc.h
@@ -31,29 +31,26 @@ typedef struct coll_request {
     unifyfs_tree_t tree;
     hg_handle_t progress_hdl;
     hg_handle_t resp_hdl;
-    size_t output_sz; /* size of output struct */
-    void* output;     /* output struct (type is dependent on rpc) */
+    size_t output_sz;          /* size of output struct */
+    void* output;              /* output struct (type depends on rpc) */
     void* input;
-    void* bulk_buf;   /* allocated buffer for bulk data */
+    void* bulk_buf;            /* allocated buffer for bulk data */
     hg_bulk_t bulk_in;
     hg_bulk_t bulk_forward;
     margo_request progress_req;
     margo_request* child_reqs;
     hg_handle_t* child_hdls;
-    int auto_cleanup; /* if true, bcast_progress_rpc() will
-                       * call collective_cleanup() on this
-                       * struct. This is the default behavior. */
-    ABT_cond child_resp_valid; /* bcast_progress_rpc() will signal this
-                                * condition variable when all the child
-                                * responses have been processed.
-                                * Intended to provide a mechanism for the
-                                * server that originated a bcast RPC to
-                                * wait for all the results to come
-                                * back. */
-    ABT_mutex child_resp_valid_mut;
-    /* The mutex associated with the above condition variable. */
-
+    int auto_cleanup;          /* If set (non-zero), bcast_progress_rpc()
+                                * will call collective_cleanup(). This is
+                                * the default behavior. */
+    ABT_cond resp_valid_cond;  /* bcast_progress_rpc() will signal this
+                                * condition variable when all the child
+                                * responses have been processed.
+                                * Provides a mechanism for the root
+                                * server for a bcast RPC to wait for all
+                                * the results to come back. */
+    ABT_mutex resp_valid_sync; /* mutex for above condition variable */
 } coll_request;
 
 /* set collective output return value to local result value */
@@ -75,18 +72,22 @@ void collective_cleanup(coll_request* coll_req);
  */
 int invoke_bcast_progress_rpc(coll_request* coll_req);
 
+
+/**
+ * @brief Broadcast that all servers have completed bootstrapping
+ *
+ * @return success|failure
+ */
+int unifyfs_invoke_broadcast_bootstrap_complete(void);
+
 /**
  * @brief Broadcast file extents metadata to all servers
  *
  * @param gfid target file
- * @param len length of file extents array
- * @param extents array of extents to broadcast
 *
 * @return success|failure
 */
-int unifyfs_invoke_broadcast_extents(int gfid,
-                                     unsigned int len,
-                                     struct extent_tree_node* extents);
+int unifyfs_invoke_broadcast_extents(int gfid);
 
 /**
  * @brief Broadcast file attributes metadata to all servers
diff --git a/server/src/unifyfs_p2p_rpc.c b/server/src/unifyfs_p2p_rpc.c
index f71d4aad..e2927c00 100644
--- a/server/src/unifyfs_p2p_rpc.c
+++ b/server/src/unifyfs_p2p_rpc.c
@@ -82,6 +82,7 @@ int wait_for_p2p_request(p2p_request* req)
     if (hret != HG_SUCCESS) {
         LOGERR("wait on p2p request(%p) failed - %s",
                req, HG_Error_to_string(hret));
+        //margo_state_dump(unifyfsd_rpc_context->svr_mid, "-", 0, NULL);
         rc = UNIFYFS_ERROR_MARGO;
     }
 
@@ -1598,47 +1599,25 @@ static void server_pid_rpc(hg_handle_t handle)
     int ret = UNIFYFS_SUCCESS;
 
     /* get input params */
-    server_pid_in_t* in = calloc(1, sizeof(*in));
-    server_rpc_req_t* req = calloc(1, sizeof(*req));
-    if ((NULL == in) || (NULL == req)) {
-        ret = ENOMEM;
+    server_pid_in_t in;
+    hg_return_t hret = margo_get_input(handle, &in);
+    if (hret != HG_SUCCESS) {
+        LOGERR("margo_get_input() failed");
+        ret = UNIFYFS_ERROR_MARGO;
     } else {
-        hg_return_t hret = margo_get_input(handle, in);
-        if (hret != HG_SUCCESS) {
-            LOGERR("margo_get_input() failed");
-            ret = UNIFYFS_ERROR_MARGO;
-        } else {
-            req->req_type = UNIFYFS_SERVER_RPC_PID_REPORT;
-            req->handle = handle;
-            req->input = (void*) in;
-            req->bulk_buf = NULL;
-            req->bulk_sz = 0;
-            ret = sm_submit_service_request(req);
-            if (ret != UNIFYFS_SUCCESS) {
-                margo_free_input(handle, in);
-            }
-        }
+        ret = unifyfs_report_server_pid(in.rank, in.pid);
+        margo_free_input(handle, &in);
     }
 
-    /* if we hit an error during request submission, respond with the error */
-    if (ret != UNIFYFS_SUCCESS) {
-        if (NULL != in) {
-            free(in);
-        }
-        if (NULL != req) {
-            free(req);
-        }
-
-        /* return to caller */
-        server_pid_out_t out;
-        out.ret = (int32_t) ret;
-        hg_return_t hret = margo_respond(handle, &out);
-        if (hret != HG_SUCCESS) {
-            LOGERR("margo_respond() failed");
-        }
-
-        /* free margo resources */
-        margo_destroy(handle);
+    /* return to caller */
+    server_pid_out_t out;
+    out.ret = (int32_t) ret;
+    hret = margo_respond(handle, &out);
+    if (hret != HG_SUCCESS) {
+        LOGERR("margo_respond() failed");
     }
+
+    /* free margo resources */
+    margo_destroy(handle);
 }
 DEFINE_MARGO_RPC_HANDLER(server_pid_rpc)
diff --git a/server/src/unifyfs_p2p_rpc.h b/server/src/unifyfs_p2p_rpc.h
index 65a9055e..0f764cff 100644
--- a/server/src/unifyfs_p2p_rpc.h
+++ b/server/src/unifyfs_p2p_rpc.h
@@ -35,8 +35,8 @@ typedef struct {
 
 /* helper method to initialize peer request rpc handle */
 int init_p2p_request_handle(hg_id_t request_hgid,
-                            int peer_rank,
-                            p2p_request* req);
+                           int peer_rank,
+                           p2p_request* req);
 
 /* helper method to forward peer rpc request */
 int forward_p2p_request(void* input_ptr,
diff --git a/server/src/unifyfs_server.c b/server/src/unifyfs_server.c
index cff6719b..f226f492 100644
--- a/server/src/unifyfs_server.c
+++ b/server/src/unifyfs_server.c
@@ -478,10 +478,10 @@ int main(int argc, char* argv[])
     /* initialize our tree that maps a gfid to its extent tree */
     unifyfs_inode_tree_init(global_inode_tree);
 
-    LOGDBG("publishing server pid");
-    rc = unifyfs_publish_server_pids();
+    LOGDBG("waiting for server bootstrapping to complete");
+    rc = unifyfs_complete_bootstrap();
     if (rc != 0) {
-        LOGERR("failed to publish server pid file: %s",
+        LOGERR("failed to complete server bootstrapping: %s",
                unifyfs_rc_enum_description(rc));
         exit(1);
     }
diff --git a/server/src/unifyfs_server_pid.c b/server/src/unifyfs_server_pid.c
index 99c1bcf9..c4f2870f 100644
--- a/server/src/unifyfs_server_pid.c
+++ b/server/src/unifyfs_server_pid.c
@@ -26,30 +26,17 @@
 
 extern unifyfs_cfg_t server_cfg;
 
-static int n_servers_reported; // = 0
+static volatile int n_servers_reported; // = 0
+static volatile int bootstrap_completed; // = 0
 static int* server_pids; // = NULL
-static pthread_cond_t server_pid_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t server_pid_mutex = PTHREAD_MUTEX_INITIALIZER;
-static struct timespec server_pid_timeout;
+static ABT_cond server_bootstrap_cond = ABT_COND_NULL;
+static ABT_mutex server_bootstrap_mutex = ABT_MUTEX_NULL;
+static struct timespec server_bootstrap_timeout;
 
-static int alloc_server_pids(void)
-{
-    int ret = 0;
-    pthread_mutex_lock(&server_pid_mutex);
-    if (NULL == server_pids) {
-        server_pids = (int*) calloc(glb_pmi_size, sizeof(int));
-        if (NULL == server_pids) {
-            LOGERR("failed to allocate memory (%s)", strerror(errno));
-            ret = ENOMEM;
-        }
-    }
-    pthread_mutex_unlock(&server_pid_mutex);
-    return ret;
-}
 
-static inline int set_pidfile_timeout(void)
+static inline int set_bootstrap_timeout(void)
 {
-    int ret = 0;
+    int ret = UNIFYFS_SUCCESS;
     long timeout_sec = 0;
 
     if (server_cfg.server_init_timeout) {
@@ -61,16 +48,82 @@
     }
 
-    clock_gettime(CLOCK_REALTIME, &server_pid_timeout);
-    server_pid_timeout.tv_sec += timeout_sec;
+    clock_gettime(CLOCK_REALTIME, &server_bootstrap_timeout);
+    server_bootstrap_timeout.tv_sec += timeout_sec;
+
+    return ret;
+}
+
+static void free_bootstrap_state(void)
+{
+    if (ABT_MUTEX_NULL != server_bootstrap_mutex) {
+        ABT_mutex_lock(server_bootstrap_mutex);
+        if (ABT_COND_NULL != server_bootstrap_cond) {
+            ABT_cond_free(&server_bootstrap_cond);
+            server_bootstrap_cond = ABT_COND_NULL;
+        }
+        ABT_mutex_unlock(server_bootstrap_mutex);
+        ABT_mutex_free(&server_bootstrap_mutex);
+        server_bootstrap_mutex = ABT_MUTEX_NULL;
+    }
+
+    if (NULL != server_pids) {
+        free(server_pids);
+        server_pids = NULL;
+    }
+}
+
+static int initialize_bootstrap_state(void)
+{
+    int rc;
+    int ret = UNIFYFS_SUCCESS;
+
+    if (ABT_MUTEX_NULL == server_bootstrap_mutex) {
+        rc = ABT_mutex_create(&server_bootstrap_mutex);
+        if (ABT_SUCCESS != rc) {
+            LOGERR("ABT_mutex_create failed");
+            return UNIFYFS_ERROR_MARGO;
+        }
+    }
+
+    ABT_mutex_lock(server_bootstrap_mutex);
+    if (ABT_COND_NULL == server_bootstrap_cond) {
+        rc = ABT_cond_create(&server_bootstrap_cond);
+        if (ABT_SUCCESS != rc) {
+            LOGERR("ABT_cond_create failed");
+            ret = UNIFYFS_ERROR_MARGO;
+        }
+    }
+
+    if (UNIFYFS_SUCCESS == ret) {
+        ret = set_bootstrap_timeout();
+    }
+
+    if ((UNIFYFS_SUCCESS == ret) && (0 == glb_pmi_rank)) {
+        if (NULL == server_pids) {
+            server_pids = (int*) calloc(glb_pmi_size, sizeof(int));
+            if (NULL == server_pids) {
+                LOGERR("failed to allocate memory for pid array (%s)",
+                       strerror(errno));
+                ret = ENOMEM;
+            }
+        }
+    }
+
+    ABT_mutex_unlock(server_bootstrap_mutex);
+
+    if (ret != UNIFYFS_SUCCESS) {
+        LOGERR("failed to initialize bootstrap state!");
+        free_bootstrap_state();
+    }
 
-    return 0;
+    return ret;
 }
 
 static int create_server_pid_file(void)
 {
     int i = 0;
-    int ret = 0;
+    int ret = UNIFYFS_SUCCESS;
     char filename[UNIFYFS_MAX_FILENAME] = { 0, };
     FILE* fp = NULL;
@@ -99,80 +152,117 @@
 
 int unifyfs_report_server_pid(int rank, int pid)
 {
-    assert(rank < glb_pmi_size);
+    assert((glb_pmi_rank == 0) && (rank < glb_pmi_size));
 
-    int ret = alloc_server_pids();
-    if (ret) {
-        LOGERR("failed to allocate pid array");
-        return ret;
+    /* NOTE: need this here in case we receive a pid report rpc before we
+     * have initialized state in unifyfs_complete_bootstrap() */
+    int rc = initialize_bootstrap_state();
+    if (rc) {
+        LOGERR("failed to initialize bootstrap state");
+        return rc;
     }
 
-    pthread_mutex_lock(&server_pid_mutex);
+    ABT_mutex_lock(server_bootstrap_mutex);
     n_servers_reported++;
     server_pids[rank] = pid;
-    pthread_cond_signal(&server_pid_cond);
-    pthread_mutex_unlock(&server_pid_mutex);
+    ABT_cond_signal(server_bootstrap_cond);
+    ABT_mutex_unlock(server_bootstrap_mutex);
+
+    return UNIFYFS_SUCCESS;
+}
+
+int unifyfs_signal_bootstrap_complete(void)
+{
+    assert(glb_pmi_rank != 0);
+
+    ABT_mutex_lock(server_bootstrap_mutex);
+    bootstrap_completed = 1;
+    ABT_cond_signal(server_bootstrap_cond);
+    ABT_mutex_unlock(server_bootstrap_mutex);
 
     return UNIFYFS_SUCCESS;
 }
 
-int unifyfs_publish_server_pids(void)
+static int wait_for_bootstrap_condition(void)
 {
     int ret = UNIFYFS_SUCCESS;
+    int rc = ABT_cond_timedwait(server_bootstrap_cond, server_bootstrap_mutex,
+                                &server_bootstrap_timeout);
+    if (ABT_ERR_COND_TIMEDOUT == rc) {
+        LOGERR("server initialization timeout");
+        ret = UNIFYFS_ERROR_TIMEOUT;
+    } else if (rc) {
+        LOGERR("failed to wait on condition (err=%d)", rc);
+        ret = UNIFYFS_ERROR_MARGO;
+    }
+    return ret;
+}
+
+int unifyfs_complete_bootstrap(void)
+{
+    int ret = UNIFYFS_SUCCESS;
+
+    int rc = initialize_bootstrap_state();
+    if (UNIFYFS_SUCCESS != rc) {
+        LOGERR("ABT_mutex_create failed");
+        return UNIFYFS_ERROR_MARGO;
+    }
 
     if (glb_pmi_rank > 0) {
-        /* publish my pid to server 0 */
+        /* publish my pid to server rank 0 */
+        LOGDBG("server[%d] - reporting pid", glb_pmi_rank);
         ret = unifyfs_invoke_server_pid_rpc();
         if (ret) {
             LOGERR("failed to invoke pid rpc (%s)", strerror(ret));
+        } else {
+            /* wait for "bootstrap-complete" broadcast from server rank 0 */
+            ABT_mutex_lock(server_bootstrap_mutex);
+            while (!bootstrap_completed) {
+                ret = wait_for_bootstrap_condition();
+                if (UNIFYFS_ERROR_TIMEOUT == ret) {
+                    break;
+                }
+            }
+            ABT_mutex_unlock(server_bootstrap_mutex);
+            if (bootstrap_completed) {
+                LOGDBG("server[%d] - bootstrap completed", glb_pmi_rank);
+            }
         }
-    } else { /* rank 0 acts as coordinator */
-        ret = alloc_server_pids();
-        if (ret) {
-            return ret;
-        }
-
-        ret = set_pidfile_timeout();
-        if (ret) {
-            return ret;
-        }
-
-        pthread_mutex_lock(&server_pid_mutex);
-        server_pids[0] = server_pid;
-        n_servers_reported++;
+    } else { /* server rank 0 acts as coordinator */
 
         /* keep checking count of reported servers until all have reported
          * or we hit the timeout */
+        ABT_mutex_lock(server_bootstrap_mutex);
+        server_pids[0] = server_pid;
+        n_servers_reported++;
         while (n_servers_reported < glb_pmi_size) {
-            ret = pthread_cond_timedwait(&server_pid_cond,
-                                         &server_pid_mutex,
-                                         &server_pid_timeout);
-            if (ETIMEDOUT == ret) {
-                LOGERR("server initialization timeout");
-                break;
-            } else if (ret) {
-                LOGERR("failed to wait on condition (err=%d, %s)",
-                       errno, strerror(errno));
+            ret = wait_for_bootstrap_condition();
+            if (UNIFYFS_ERROR_TIMEOUT == ret) {
                 break;
             }
         }
+        ABT_mutex_unlock(server_bootstrap_mutex);
 
         if (n_servers_reported == glb_pmi_size) {
-            ret = create_server_pid_file();
+            bootstrap_completed = 1;
+            LOGDBG("server[%d] - bootstrap completed", glb_pmi_rank);
+            ret = unifyfs_invoke_broadcast_bootstrap_complete();
             if (UNIFYFS_SUCCESS == ret) {
                 LOGDBG("servers ready to accept client connections");
+                ret = create_server_pid_file();
+                if (UNIFYFS_SUCCESS != ret) {
+                    LOGERR("pid file creation failed!");
+                }
+            } else {
+                LOGERR("bootstrap broadcast failed!");
             }
         } else {
             LOGERR("%d of %d servers reported their pids",
                    n_servers_reported, glb_pmi_size);
         }
-
-        free(server_pids);
-        server_pids = NULL;
-
-        pthread_mutex_unlock(&server_pid_mutex);
     }
 
+    free_bootstrap_state();
+
     return ret;
 }
-
diff --git a/server/src/unifyfs_service_manager.c b/server/src/unifyfs_service_manager.c
index f9ad6a09..036680b0 100644
--- a/server/src/unifyfs_service_manager.c
+++ b/server/src/unifyfs_service_manager.c
@@ -42,9 +42,9 @@ typedef struct {
     pthread_t thrd;
     pid_t tid;
 
-    /* pthread mutex and condition variable for work notification */
-    pthread_mutex_t thrd_lock;
-    pthread_cond_t thrd_cond;
+    /* mutex and condition variable for work notification */
+    ABT_mutex thrd_lock;
+    ABT_cond thrd_cond;
 
     /* thread status */
     int initialized;
@@ -54,7 +54,7 @@
     /* thread return status code */
     int sm_exit_rc;
 
-    /* argobots mutex for synchronizing access to request state between
+    /* mutex for synchronizing access to request state between
      * margo rpc handler ULTs and SM thread */
     ABT_mutex reqs_sync;
 
@@ -75,7 +75,7 @@ svcmgr_state_t* sm; // = NULL
 do { \
     if ((NULL != sm) && sm->initialized) { \
         /*LOGDBG("locking SM state");*/ \
-        pthread_mutex_lock(&(sm->thrd_lock)); \
+        ABT_mutex_lock(sm->thrd_lock); \
     } \
 } while (0)
 
@@ -83,7 +83,7 @@ do { \
 do { \
     if ((NULL != sm) && sm->initialized) { \
         /*LOGDBG("unlocking SM state");*/ \
-        pthread_mutex_unlock(&(sm->thrd_lock)); \
+        ABT_mutex_unlock(sm->thrd_lock); \
     } \
 } while (0)
 
@@ -110,7 +110,7 @@ static inline void signal_svcmgr(void)
     if (this_thread != sm->tid) {
         /* signal svcmgr to begin processing the requests we just added */
         LOGDBG("signaling new service requests");
-        pthread_cond_signal(&(sm->thrd_cond));
+        ABT_cond_signal(sm->thrd_cond);
     }
 }
 
@@ -173,32 +173,21 @@
         return ENOMEM;
     }
 
-    /* initialize lock for shared data structures of the
+    /* create mutex locks for thread and request data structures of the
      * service manager */
-    pthread_mutexattr_t attr;
-    pthread_mutexattr_init(&attr);
-    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-    int rc = pthread_mutex_init(&(sm->thrd_lock), &attr);
-    if (rc != 0) {
-        LOGERR("pthread_mutex_init failed for service manager rc=%d (%s)",
-               rc, strerror(rc));
-        svcmgr_fini();
-        return rc;
-    }
+    ABT_mutex_create(&(sm->thrd_lock));
+    ABT_mutex_create(&(sm->reqs_sync));
 
     /* initialize condition variable to synchronize work
      * notifications for the request manager thread */
-    rc = pthread_cond_init(&(sm->thrd_cond), NULL);
-    if (rc != 0) {
-        LOGERR("pthread_cond_init failed for service manager rc=%d (%s)",
-               rc, strerror(rc));
-        pthread_mutex_destroy(&(sm->thrd_lock));
+    int rc = ABT_cond_create(&(sm->thrd_cond));
+    if (rc != ABT_SUCCESS) {
+        LOGERR("ABT_cond_create() failed for service manager rc=%d", rc);
+        ABT_mutex_free(&(sm->thrd_lock));
         svcmgr_fini();
-        return rc;
+        return UNIFYFS_ERROR_MARGO;
     }
 
-    ABT_mutex_create(&(sm->reqs_sync));
-
     /* allocate a list to track chunk reads */
     sm->chunk_reads = arraylist_create(0);
     if (sm->chunk_reads == NULL) {
@@ -249,10 +238,10 @@ int svcmgr_fini(void)
     if (sm->initialized) {
         /* join thread before cleaning up state */
         if (sm->tid != -1) {
-            pthread_mutex_lock(&(sm->thrd_lock));
+            ABT_mutex_lock(sm->thrd_lock);
             sm->time_to_exit = 1;
-            pthread_cond_signal(&(sm->thrd_cond));
-            pthread_mutex_unlock(&(sm->thrd_lock));
+            ABT_cond_signal(sm->thrd_cond);
+            ABT_mutex_unlock(sm->thrd_lock);
             pthread_join(sm->thrd, NULL);
         }
     }
@@ -273,15 +262,10 @@
         arraylist_free(sm->svc_reqs);
     }
 
-    int abt_err = ABT_mutex_free(&(sm->reqs_sync));
-    if (ABT_SUCCESS != abt_err) {
-        /* All we can really do here is log the error */
-        LOGERR("Error code returned from ABT_mutex_free(): %d", abt_err);
-    }
-
     if (sm->initialized) {
-        pthread_mutex_destroy(&(sm->thrd_lock));
-        pthread_cond_destroy(&(sm->thrd_cond));
+        ABT_mutex_free(&(sm->reqs_sync));
+        ABT_mutex_free(&(sm->thrd_lock));
+        ABT_cond_free(&(sm->thrd_cond));
     }
 
     /* free the service manager struct allocated during init */
@@ -1058,32 +1042,6 @@ static int process_metaset_rpc(server_rpc_req_t* req)
     return ret;
 }
 
-static int process_server_pid_rpc(server_rpc_req_t* req)
-{
-    /* get input parameters */
-    server_pid_in_t* in = req->input;
-    int src_rank = (int) in->rank;
-    int pid = (int) in->pid;
-    margo_free_input(req->handle, in);
-    free(in);
-
-    /* do pid report */
-    int ret = unifyfs_report_server_pid(src_rank, pid);
-
-    /* send rpc response */
-    server_pid_out_t out;
-    out.ret = (int32_t) ret;
-    hg_return_t hret = margo_respond(req->handle, &out);
-    if (hret != HG_SUCCESS) {
-        LOGERR("margo_respond() failed");
-    }
-
-    /* cleanup req */
-    margo_destroy(req->handle);
-
-    return ret;
-}
-
 static int process_transfer_rpc(server_rpc_req_t* req)
 {
     /* get target file and requested file size */
@@ -1143,6 +1101,21 @@ static int process_truncate_rpc(server_rpc_req_t* req)
     return ret;
 }
 
+static int process_bootstrap_bcast_rpc(server_rpc_req_t* req)
+{
+    /* signal bootstrap completion */
+    int ret = unifyfs_signal_bootstrap_complete();
+    if (ret != UNIFYFS_SUCCESS) {
+        LOGERR("unifyfs_signal_bootstrap_complete() failed - rc=%d", ret);
+    }
+    collective_set_local_retval(req->coll, ret);
+
+    /* create a ULT to finish broadcast operation */
+    ret = invoke_bcast_progress_rpc(req->coll);
+
+    return ret;
+}
+
 static int process_extents_bcast_rpc(server_rpc_req_t* req)
 {
     /* get target file and extents */
@@ -1596,15 +1569,15 @@
         case UNIFYFS_SERVER_RPC_METASET:
             rret = process_metaset_rpc(req);
             break;
-        case UNIFYFS_SERVER_RPC_PID_REPORT:
-            rret = process_server_pid_rpc(req);
-            break;
         case UNIFYFS_SERVER_RPC_TRANSFER:
             rret = process_transfer_rpc(req);
             break;
         case UNIFYFS_SERVER_RPC_TRUNCATE:
             rret = process_truncate_rpc(req);
             break;
+        case UNIFYFS_SERVER_BCAST_RPC_BOOTSTRAP:
+            rret = process_bootstrap_bcast_rpc(req);
+            break;
         case UNIFYFS_SERVER_BCAST_RPC_EXTENTS:
             rret = process_extents_bcast_rpc(req);
             break;
@@ -1727,12 +1700,11 @@ void* service_manager_thread(void* arg)
                 timeout.tv_nsec -= 1000000000;
                 timeout.tv_sec++;
             }
-            int wait_rc = pthread_cond_timedwait(&(sm->thrd_cond),
-                                                 &(sm->thrd_lock),
-                                                 &timeout);
-            if (0 == wait_rc) {
+            int wait_rc = ABT_cond_timedwait(sm->thrd_cond, sm->thrd_lock,
+                                             &timeout);
+            if (ABT_SUCCESS == wait_rc) {
                 LOGDBG("SM got work");
-            } else if (ETIMEDOUT != wait_rc) {
+            } else if (ABT_ERR_COND_TIMEDOUT != wait_rc) {
                 LOGERR("SM work condition wait failed (rc=%d)", wait_rc);
             }