From 7c25b4cea6640113230540cadc9abeeeda813446 Mon Sep 17 00:00:00 2001 From: Nadezhda Kogteva Date: Tue, 17 Mar 2015 17:12:45 +0200 Subject: [PATCH] grpcomm: fixed brks and rcd algorithms - added enough space for masks in order to get them working in the large scale. --- orte/mca/grpcomm/base/base.h | 2 ++ orte/mca/grpcomm/base/grpcomm_base_frame.c | 5 ++++- orte/mca/grpcomm/base/grpcomm_base_stubs.c | 24 +++++++++++++++++++++ orte/mca/grpcomm/brks/grpcomm_brks.c | 25 +++++++++++----------- orte/mca/grpcomm/grpcomm.h | 2 +- orte/mca/grpcomm/rcd/grpcomm_rcd.c | 24 ++++++++++----------- 6 files changed, 56 insertions(+), 26 deletions(-) diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index c25b22d3b88..08790f8569f 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -84,6 +84,8 @@ ORTE_DECLSPEC int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig, void *cbdata); ORTE_DECLSPEC orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create); +ORTE_DECLSPEC void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance); +ORTE_DECLSPEC unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance); END_C_DECLS #endif diff --git a/orte/mca/grpcomm/base/grpcomm_base_frame.c b/orte/mca/grpcomm/base/grpcomm_base_frame.c index 424b85b0cd3..b25c17927ce 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_frame.c +++ b/orte/mca/grpcomm/base/grpcomm_base_frame.c @@ -119,7 +119,7 @@ static void ccon(orte_grpcomm_coll_t *p) p->dmns = NULL; p->ndmns = 0; p->nreported = 0; - p->distance_mask_recv = 0; + p->distance_mask_recv = NULL; p->cbfunc = NULL; p->cbdata = NULL; p->buffers = NULL; @@ -134,6 +134,9 @@ static void cdes(orte_grpcomm_coll_t *p) free(p->dmns); } free(p->buffers); + if (NULL != p->distance_mask_recv) { + free(p->distance_mask_recv); + } } OBJ_CLASS_INSTANCE(orte_grpcomm_coll_t, opal_list_item_t, diff --git 
a/orte/mca/grpcomm/base/grpcomm_base_stubs.c b/orte/mca/grpcomm/base/grpcomm_base_stubs.c index 7210c02e410..97e78193637 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_stubs.c +++ b/orte/mca/grpcomm/base/grpcomm_base_stubs.c @@ -422,3 +422,27 @@ static int pack_xcast(orte_grpcomm_signature_t *sig, return ORTE_SUCCESS; } +void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll, + uint32_t distance) { /* set the bit for this distance in coll->distance_mask_recv; the array is dereferenced without a NULL check */ + uint32_t maskNumber = distance / 32; /* index of the 32-bit word holding the bit */ + uint32_t bitNumber = distance % 32; /* position of the bit inside that word */ + /* shift an unsigned 1: (1 << 31) on a signed int is undefined behavior */ + coll->distance_mask_recv[maskNumber] |= ((uint32_t)1 << bitNumber); + + return; +} +/* Return 1 if the bit for this distance is set in coll->distance_mask_recv, else 0; a NULL mask array always yields 0, and distance 0 yields 1 without consulting the bits. */ +unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll, + uint32_t distance) { + uint32_t maskNumber = distance / 32; /* index of the 32-bit word holding the bit */ + uint32_t bitNumber = distance % 32; /* position of the bit inside that word */ + + if (NULL == coll->distance_mask_recv) { + return 0; + } else { + if (0 == distance) { + return 1; + } + return (((coll->distance_mask_recv[maskNumber] & ((uint32_t)1 << bitNumber)) != 0) ? 1 : 0); + } +} diff --git a/orte/mca/grpcomm/brks/grpcomm_brks.c b/orte/mca/grpcomm/brks/grpcomm_brks.c index 7b6221cbcaf..902a6833cb0 100644 --- a/orte/mca/grpcomm/brks/grpcomm_brks.c +++ b/orte/mca/grpcomm/brks/grpcomm_brks.c @@ -20,6 +20,7 @@ #include "orte/types.h" #include "orte/runtime/orte_wait.h" +#include #include #include "opal/dss/dss.h" @@ -85,10 +86,10 @@ static int allgather(orte_grpcomm_coll_t *coll, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns)); /* record that we contributed */ - coll->nreported += 1; + coll->nreported = 1; /* mark local data received */ - coll->distance_mask_recv |= 1; + coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), (coll->ndmns - 1)); /* start by seeding the collection with our own data */ opal_dss.copy_payload(&coll->bucket, sendbuf); @@ -173,8 +174,8 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist while (distance < coll->ndmns) { OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, - "%s grpcomm:coll:brks process distance %u (mask 
recv: 0x%x)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); + "%s grpcomm:coll:brks process distance %u)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance)); /* first send my current contents */ nv = (coll->ndmns + rank - distance) % coll->ndmns; @@ -193,7 +194,7 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist return; } coll->nreported += distance; - coll->distance_mask_recv |= (uint32_t)(1 << distance); + orte_grpcomm_base_mark_distance_recv(coll, distance); OBJ_RELEASE(coll->buffers[distance - 1]); coll->buffers[distance - 1] = NULL; distance = distance << 1; @@ -249,14 +250,14 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender, brks_finalize_coll(coll, rc); return; } - assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance))); + assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance)); /* Check whether we can process next distance */ - if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) { + if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) { OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, "%s grpcomm:coll:brks data from %d distance received, " - "Process the next distance (mask recv: 0x%x).", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); + "Process the next distance.", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance)); /* capture any provided content */ if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) { OBJ_RELEASE(sig); @@ -265,13 +266,13 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender, return; } coll->nreported += distance; - coll->distance_mask_recv |= (uint32_t)(1 << distance); + orte_grpcomm_base_mark_distance_recv(coll, distance); brks_allgather_process_data(coll, (uint32_t)(distance << 1)); } else { OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, "%s grpcomm:coll:brks data from %d 
distance received, " - "still waiting for data (mask recv: 0x%x).", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); + "still waiting for data.", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance)); if (NULL == coll->buffers) { if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), coll->ndmns - 1))) { rc = OPAL_ERR_OUT_OF_RESOURCE; diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index 9f54c9eaacf..91ee540fb79 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -75,7 +75,7 @@ typedef struct { /* number reported in */ size_t nreported; /* distance masks for receive */ - uint32_t distance_mask_recv; + uint32_t *distance_mask_recv; /* received buckets */ opal_buffer_t ** buffers; /* callback function */ diff --git a/orte/mca/grpcomm/rcd/grpcomm_rcd.c b/orte/mca/grpcomm/rcd/grpcomm_rcd.c index 74dd1bdf473..bf0a794b7fa 100644 --- a/orte/mca/grpcomm/rcd/grpcomm_rcd.c +++ b/orte/mca/grpcomm/rcd/grpcomm_rcd.c @@ -93,10 +93,10 @@ static int allgather(orte_grpcomm_coll_t *coll, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns)); /* record that we contributed */ - coll->nreported += 1; + coll->nreported = 1; /* mark local data received */ - coll->distance_mask_recv |= 1; + coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), log2(coll->ndmns)); /* start by seeding the collection with our own data */ opal_dss.copy_payload(&coll->bucket, sendbuf); @@ -178,8 +178,8 @@ static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista while(distance < coll->ndmns) { OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, - "%s grpcomm:coll:recdub process distance %u (mask recv: 0x%x)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); + "%s grpcomm:coll:recdub process distance %u", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance)); /* first send my current contents */ nv = rank ^ distance; @@ -199,7 +199,7 @@ static void 
rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista return; } coll->nreported += distance; - coll->distance_mask_recv |= (uint32_t)(1 << distance); + orte_grpcomm_base_mark_distance_recv(coll, distance); OBJ_RELEASE(coll->buffers[distance_index]); coll->buffers[distance_index] = NULL; distance = distance << 1; @@ -255,14 +255,14 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender, rcd_finalize_coll(coll, rc); return; } - assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance))); + assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance)); /* Check whether we can process next distance */ - if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) { + if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) { OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, "%s grpcomm:coll:recdub data from %d distance received, " - "Process the next distance (mask recv: 0x%x).", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); + "Process the next distance.", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance)); /* capture any provided content */ if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) { OBJ_RELEASE(sig); @@ -271,13 +271,13 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender, return; } coll->nreported += distance; - coll->distance_mask_recv |= (uint32_t)(1 << distance); + orte_grpcomm_base_mark_distance_recv(coll, distance); rcd_allgather_process_data(coll, (uint32_t)(distance << 1)); } else { OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, "%s grpcomm:coll:recdub data from %d distance received, " - "still waiting for data (mask recv: 0x%x).", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); + "still waiting for data.", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance)); if (NULL == coll->buffers) { if (NULL == (coll->buffers = (opal_buffer_t 
**)calloc(sizeof(opal_buffer_t *), log2(coll->ndmns)))) { rc = OPAL_ERR_OUT_OF_RESOURCE;