Skip to content

Commit

Permalink
Merge pull request #482 from nkogteva/master
Browse files Browse the repository at this point in the history
grpcomm: fixed brks and rcd algorithms - added enough space for masks in...
  • Loading branch information
mike-dubman committed Mar 18, 2015
2 parents 50277fe + 7c25b4c commit c68a0ba
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 26 deletions.
2 changes: 2 additions & 0 deletions orte/mca/grpcomm/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ ORTE_DECLSPEC int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
void *cbdata);

ORTE_DECLSPEC orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create);
ORTE_DECLSPEC void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance);
ORTE_DECLSPEC unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance);

END_C_DECLS
#endif
5 changes: 4 additions & 1 deletion orte/mca/grpcomm/base/grpcomm_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ static void ccon(orte_grpcomm_coll_t *p)
p->dmns = NULL;
p->ndmns = 0;
p->nreported = 0;
p->distance_mask_recv = 0;
p->distance_mask_recv = NULL;
p->cbfunc = NULL;
p->cbdata = NULL;
p->buffers = NULL;
Expand All @@ -134,6 +134,9 @@ static void cdes(orte_grpcomm_coll_t *p)
free(p->dmns);
}
free(p->buffers);
if (NULL != p->distance_mask_recv) {
free(p->distance_mask_recv);
}
}
OBJ_CLASS_INSTANCE(orte_grpcomm_coll_t,
opal_list_item_t,
Expand Down
24 changes: 24 additions & 0 deletions orte/mca/grpcomm/base/grpcomm_base_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,27 @@ static int pack_xcast(orte_grpcomm_signature_t *sig,
return ORTE_SUCCESS;
}

/**
 * Record that data for the given distance has been received.
 *
 * The receive mask is an array of 32-bit words; the bit for @p distance
 * lives in word (distance / 32) at bit position (distance % 32).
 *
 * @param coll      collective tracker whose distance_mask_recv is updated
 * @param distance  distance whose bit is to be set
 *
 * This function has no way to know how many words the mask holds, so the
 * caller must have allocated at least (distance / 32) + 1 words.  If the
 * mask has not been allocated (NULL) the call is a no-op, which matches
 * orte_grpcomm_base_check_distance_recv() reporting "not received" for a
 * NULL mask.
 */
void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll,
                                          uint32_t distance) {
    uint32_t maskNumber = distance / 32;
    uint32_t bitNumber = distance % 32;

    if (NULL == coll->distance_mask_recv) {
        return;
    }

    /* Shift an unsigned one: (1 << 31) on a signed int is undefined behavior. */
    coll->distance_mask_recv[maskNumber] |= (UINT32_C(1) << bitNumber);
}

/**
 * Report whether data for the given distance has been marked as received.
 *
 * @param coll      collective tracker whose distance_mask_recv is inspected
 * @param distance  distance to look up
 *
 * @return 1 if the distance is considered received, 0 otherwise:
 *         - 0 when the mask has not been allocated (NULL);
 *         - 1 for distance 0 whenever the mask exists, without consulting
 *           the mask bits;
 *         - otherwise the value of bit (distance % 32) in mask word
 *           (distance / 32).
 *
 * This function has no way to know how many words the mask holds, so the
 * caller must have allocated at least (distance / 32) + 1 words.
 */
unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll,
                                                   uint32_t distance) {
    uint32_t word;
    uint32_t bit;

    if (NULL == coll->distance_mask_recv) {
        return 0;
    }
    if (0 == distance) {
        return 1;
    }

    word = coll->distance_mask_recv[distance / 32];
    /* Shift an unsigned one: (1 << 31) on a signed int is undefined behavior. */
    bit = UINT32_C(1) << (distance % 32);

    return (0 != (word & bit)) ? 1 : 0;
}
25 changes: 13 additions & 12 deletions orte/mca/grpcomm/brks/grpcomm_brks.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "orte/types.h"
#include "orte/runtime/orte_wait.h"

#include <math.h>
#include <string.h>

#include "opal/dss/dss.h"
Expand Down Expand Up @@ -85,10 +86,10 @@ static int allgather(orte_grpcomm_coll_t *coll,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));

/* record that we contributed */
coll->nreported += 1;
coll->nreported = 1;

/* mark local data received */
coll->distance_mask_recv |= 1;
coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), (coll->ndmns - 1));

/* start by seeding the collection with our own data */
opal_dss.copy_payload(&coll->bucket, sendbuf);
Expand Down Expand Up @@ -173,8 +174,8 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist

while (distance < coll->ndmns) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks process distance %u (mask recv: 0x%x)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
"%s grpcomm:coll:brks process distance %u)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));

/* first send my current contents */
nv = (coll->ndmns + rank - distance) % coll->ndmns;
Expand All @@ -193,7 +194,7 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist
return;
}
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
orte_grpcomm_base_mark_distance_recv(coll, distance);
OBJ_RELEASE(coll->buffers[distance - 1]);
coll->buffers[distance - 1] = NULL;
distance = distance << 1;
Expand Down Expand Up @@ -249,14 +250,14 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
brks_finalize_coll(coll, rc);
return;
}
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance)));
assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance));

/* Check whether we can process next distance */
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) {
if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks data from %d distance received, "
"Process the next distance (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
"Process the next distance.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
/* capture any provided content */
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
OBJ_RELEASE(sig);
Expand All @@ -265,13 +266,13 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
return;
}
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
orte_grpcomm_base_mark_distance_recv(coll, distance);
brks_allgather_process_data(coll, (uint32_t)(distance << 1));
} else {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks data from %d distance received, "
"still waiting for data (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
"still waiting for data.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
if (NULL == coll->buffers) {
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), coll->ndmns - 1))) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
Expand Down
2 changes: 1 addition & 1 deletion orte/mca/grpcomm/grpcomm.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ typedef struct {
/* number reported in */
size_t nreported;
/* distance masks for receive */
uint32_t distance_mask_recv;
uint32_t *distance_mask_recv;
/* received buckets */
opal_buffer_t ** buffers;
/* callback function */
Expand Down
24 changes: 12 additions & 12 deletions orte/mca/grpcomm/rcd/grpcomm_rcd.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ static int allgather(orte_grpcomm_coll_t *coll,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));

/* record that we contributed */
coll->nreported += 1;
coll->nreported = 1;

/* mark local data received */
coll->distance_mask_recv |= 1;
coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), log2(coll->ndmns));

/* start by seeding the collection with our own data */
opal_dss.copy_payload(&coll->bucket, sendbuf);
Expand Down Expand Up @@ -178,8 +178,8 @@ static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista

while(distance < coll->ndmns) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub process distance %u (mask recv: 0x%x)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
"%s grpcomm:coll:recdub process distance %u",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));

/* first send my current contents */
nv = rank ^ distance;
Expand All @@ -199,7 +199,7 @@ static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista
return;
}
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
orte_grpcomm_base_mark_distance_recv(coll, distance);
OBJ_RELEASE(coll->buffers[distance_index]);
coll->buffers[distance_index] = NULL;
distance = distance << 1;
Expand Down Expand Up @@ -255,14 +255,14 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
rcd_finalize_coll(coll, rc);
return;
}
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance)));
assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance));

/* Check whether we can process next distance */
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) {
if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub data from %d distance received, "
"Process the next distance (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
"Process the next distance.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
/* capture any provided content */
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
OBJ_RELEASE(sig);
Expand All @@ -271,13 +271,13 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
return;
}
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
orte_grpcomm_base_mark_distance_recv(coll, distance);
rcd_allgather_process_data(coll, (uint32_t)(distance << 1));
} else {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub data from %d distance received, "
"still waiting for data (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
"still waiting for data.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
if (NULL == coll->buffers) {
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), log2(coll->ndmns)))) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
Expand Down

0 comments on commit c68a0ba

Please sign in to comment.