Skip to content

Commit

Permalink
Merge pull request #5742 from hzhou/2112_test_misc
Browse files Browse the repository at this point in the history
misc: fixes and cleanups found during multi-testing

Approved-by: Ken Raffenetti
  • Loading branch information
hzhou authored Jan 6, 2022
2 parents c95687a + 0a73f03 commit a92ce59
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 56 deletions.
2 changes: 1 addition & 1 deletion maint/extractcvars.in
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ foreach my $p (@cvars) {
# Register the cvar
my $desc = $p->{description};
$desc =~ s/"/\\"/g;
$desc =~ s/\n/\\\n/g;
$desc =~ s/\n/\\n"\n"/g;
printf OUTPUT_C qq( MPIR_T_CVAR_REGISTER_STATIC(\n%s\n%s\n%s\n%s\n%s\n%s\n%s\n%s\n%s);\n),
qq( $mpi_dtype,),
qq( ${uc_ns}_$p->{name}, /* name */),
Expand Down
3 changes: 2 additions & 1 deletion src/mpid/ch4/shm/posix/posix_coll_release_gather.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_bcast_release_gather(void *buffer,
MPI_Aint lb, true_lb, true_extent, extent, type_size;
void *ori_buffer = buffer;
MPI_Datatype ori_datatype = datatype;
int cellsize = MPIDI_POSIX_RELEASE_GATHER_BCAST_CELLSIZE;

/* If there is only one process or no data, return */
if (count == 0 || (MPIR_Comm_size(comm_ptr) == 1)) {
Expand Down Expand Up @@ -105,6 +104,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_bcast_release_gather(void *buffer,
}
}
}

int cellsize = MPIDI_POSIX_RELEASE_GATHER_BCAST_CELLSIZE;
#ifdef HAVE_ERROR_CHECKING
/* When error checking is enabled, only (cellsize-(2*cacheline_size)) bytes are reserved for data.
* Initial 2 cacheline_size bytes are reserved to put the amount of data being placed and the
Expand Down
82 changes: 43 additions & 39 deletions src/mpid/ch4/shm/posix/release_gather/release_gather.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,37 +136,27 @@
#include "topotree.h"
#include "topotree_util.h"

#define RELEASE_GATHER_FIELD(comm, field) \
MPIDI_POSIX_COMM(comm, release_gather).field


MPIDI_POSIX_release_gather_tree_type_t MPIDI_POSIX_Bcast_tree_type, MPIDI_POSIX_Reduce_tree_type;

static int get_tree_type(const char *tree_type_name)
{
if (0 == strcmp(tree_type_name, "kary"))
return MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KARY;
else if (0 == strcmp(tree_type_name, "knomial_1"))
return MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KNOMIAL_1;
else if (0 == strcmp(tree_type_name, "knomial_2"))
return MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KNOMIAL_2;
else
return MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KARY;
}

/* Initialize the release_gather struct to NULL */
int MPIDI_POSIX_mpi_release_gather_comm_init_null(MPIR_Comm * comm_ptr)
{
MPIR_FUNC_ENTER;

RELEASE_GATHER_FIELD(comm_ptr, num_collective_calls) = 0;

if (0 == strcmp(MPIR_CVAR_BCAST_INTRANODE_TREE_TYPE, "kary"))
MPIDI_POSIX_Bcast_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KARY;
else if (0 == strcmp(MPIR_CVAR_BCAST_INTRANODE_TREE_TYPE, "knomial_1"))
MPIDI_POSIX_Bcast_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KNOMIAL_1;
else if (0 == strcmp(MPIR_CVAR_BCAST_INTRANODE_TREE_TYPE, "knomial_2"))
MPIDI_POSIX_Bcast_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KNOMIAL_2;
else
MPIDI_POSIX_Bcast_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KARY;

if (0 == strcmp(MPIR_CVAR_REDUCE_INTRANODE_TREE_TYPE, "kary"))
MPIDI_POSIX_Reduce_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KARY;
else if (0 == strcmp(MPIR_CVAR_REDUCE_INTRANODE_TREE_TYPE, "knomial_1"))
MPIDI_POSIX_Reduce_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KNOMIAL_1;
else if (0 == strcmp(MPIR_CVAR_REDUCE_INTRANODE_TREE_TYPE, "knomial_2"))
MPIDI_POSIX_Reduce_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KNOMIAL_2;
else
MPIDI_POSIX_Reduce_tree_type = MPIDI_POSIX_RELEASE_GATHER_TREE_TYPE_KARY;

RELEASE_GATHER_FIELD(comm_ptr, is_initialized) = 0;

MPIR_FUNC_EXIT;
Expand Down Expand Up @@ -202,6 +192,20 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
* reduce buffer (divided into multiple cells) per rank. */

if (RELEASE_GATHER_FIELD(comm_ptr, is_initialized) == 0) {
RELEASE_GATHER_FIELD(comm_ptr, is_initialized) = 1;
/* CVARs may get updated. Turn them into per-comm settings */
RELEASE_GATHER_FIELD(comm_ptr, bcast_tree_type) =
get_tree_type(MPIR_CVAR_BCAST_INTRANODE_TREE_TYPE);
RELEASE_GATHER_FIELD(comm_ptr, bcast_tree_kval) = MPIR_CVAR_BCAST_INTRANODE_TREE_KVAL;
RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size) =
MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE;
RELEASE_GATHER_FIELD(comm_ptr, bcast_num_cells) = MPIR_CVAR_BCAST_INTRANODE_NUM_CELLS;
RELEASE_GATHER_FIELD(comm_ptr, reduce_tree_type) =
get_tree_type(MPIR_CVAR_REDUCE_INTRANODE_TREE_TYPE);
RELEASE_GATHER_FIELD(comm_ptr, reduce_tree_kval) = MPIR_CVAR_REDUCE_INTRANODE_TREE_KVAL;
RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size) =
MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE;
RELEASE_GATHER_FIELD(comm_ptr, reduce_num_cells) = MPIR_CVAR_REDUCE_INTRANODE_NUM_CELLS;
/* release_gather based collectives have not been used before on this comm */
initialize_flags = true;
if (operation == MPIDI_POSIX_RELEASE_GATHER_OPCODE_BCAST) {
Expand Down Expand Up @@ -245,10 +249,10 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
}

if (initialize_bcast_buf) {
memory_to_be_allocated += MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE;
memory_to_be_allocated += RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size);
}
if (initialize_reduce_buf) {
memory_to_be_allocated += (num_ranks * MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE);
memory_to_be_allocated += (num_ranks * RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size));
}

if (rank == 0) {
Expand Down Expand Up @@ -290,10 +294,11 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
/* Topology aware trees are created only when the user has specified process binding */
if (MPIR_hwtopo_is_initialized()) {
mpi_errno =
MPIDI_SHM_topology_tree_init(comm_ptr, 0, MPIR_CVAR_BCAST_INTRANODE_TREE_KVAL,
MPIDI_SHM_topology_tree_init(comm_ptr, 0,
RELEASE_GATHER_FIELD(comm_ptr, bcast_tree_kval),
&release_gather_info_ptr->bcast_tree,
&topotree_fail[0],
MPIR_CVAR_REDUCE_INTRANODE_TREE_KVAL,
RELEASE_GATHER_FIELD(comm_ptr, reduce_tree_kval),
&release_gather_info_ptr->reduce_tree,
&topotree_fail[1], &errflag);
MPIR_ERR_COLL_CHECKANDCONT(mpi_errno, errflag);
Expand Down Expand Up @@ -322,8 +327,9 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
if (topotree_fail[0] == 1)
MPIR_Treealgo_tree_free(&release_gather_info_ptr->bcast_tree);
mpi_errno =
MPIR_Treealgo_tree_create(rank, num_ranks, MPIDI_POSIX_Bcast_tree_type,
MPIR_CVAR_BCAST_INTRANODE_TREE_KVAL, 0,
MPIR_Treealgo_tree_create(rank, num_ranks,
RELEASE_GATHER_FIELD(comm_ptr, bcast_tree_type),
RELEASE_GATHER_FIELD(comm_ptr, bcast_tree_kval), 0,
&release_gather_info_ptr->bcast_tree);
MPIR_ERR_COLL_CHECKANDCONT(mpi_errno, errflag);
}
Expand All @@ -332,14 +338,16 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
if (topotree_fail[1] == 1)
MPIR_Treealgo_tree_free(&release_gather_info_ptr->reduce_tree);
mpi_errno =
MPIR_Treealgo_tree_create(rank, num_ranks, MPIDI_POSIX_Reduce_tree_type,
MPIR_CVAR_REDUCE_INTRANODE_TREE_KVAL, 0,
MPIR_Treealgo_tree_create(rank, num_ranks,
RELEASE_GATHER_FIELD(comm_ptr, reduce_tree_type),
RELEASE_GATHER_FIELD(comm_ptr, reduce_tree_kval), 0,
&release_gather_info_ptr->reduce_tree);
MPIR_ERR_COLL_CHECKANDCONT(mpi_errno, errflag);
}

release_gather_info_ptr->gather_state = release_gather_info_ptr->release_state
= MPIR_CVAR_BCAST_INTRANODE_NUM_CELLS + MPIR_CVAR_REDUCE_INTRANODE_NUM_CELLS;
release_gather_info_ptr->gather_state = release_gather_info_ptr->release_state =
RELEASE_GATHER_FIELD(comm_ptr, bcast_num_cells) +
RELEASE_GATHER_FIELD(comm_ptr, reduce_num_cells);

release_gather_info_ptr->bcast_buf_addr = NULL;
release_gather_info_ptr->reduce_buf_addr = NULL;
Expand Down Expand Up @@ -374,15 +382,13 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
}

if (initialize_bcast_buf) {
RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size) =
MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE;
if (rank == 0)
MPL_atomic_fetch_add_uint64(MPIDI_POSIX_shm_limit_counter,
RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size));

/* Allocate the shared memory for bcast buffer */
mpi_errno =
MPIDU_shm_alloc(comm_ptr, MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE,
MPIDU_shm_alloc(comm_ptr, RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size),
(void **) &(RELEASE_GATHER_FIELD(comm_ptr, bcast_buf_addr)),
&mapfail_flag);
if (mapfail_flag) {
Expand All @@ -397,14 +403,12 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
RELEASE_GATHER_FIELD(comm_ptr, child_reduce_buf_addr) =
MPL_malloc(num_ranks * sizeof(void *), MPL_MEM_COLL);

RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size) =
MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE;
if (rank == 0)
MPL_atomic_fetch_add_uint64(MPIDI_POSIX_shm_limit_counter,
RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size));

mpi_errno =
MPIDU_shm_alloc(comm_ptr, num_ranks * MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE,
MPIDU_shm_alloc(comm_ptr, num_ranks * RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size),
(void **) &(RELEASE_GATHER_FIELD(comm_ptr, reduce_buf_addr)),
&mapfail_flag);
if (mapfail_flag) {
Expand All @@ -422,7 +426,7 @@ int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
(char *) RELEASE_GATHER_FIELD(comm_ptr,
reduce_buf_addr) +
((*utarray_eltptr(RELEASE_GATHER_FIELD(comm_ptr, reduce_tree.children), i))
* MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE);
* RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size));
}
}

Expand Down
26 changes: 16 additions & 10 deletions src/mpid/ch4/shm/posix/release_gather/release_gather.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ extern MPL_shm_hnd_t shm_limit_handle;
extern MPIDI_POSIX_release_gather_tree_type_t MPIDI_POSIX_Bcast_tree_type,
MPIDI_POSIX_Reduce_tree_type;

#define RELEASE_GATHER_FIELD(comm, field) \
MPIDI_POSIX_COMM(comm, release_gather).field

/* Blocking wait implementation */
/* "acquire" makes sure no writes/reads are reordered before this load */
#define MPIDI_POSIX_RELEASE_GATHER_WAIT_WHILE_LESS_THAN(ptr, value) \
Expand Down Expand Up @@ -41,12 +44,12 @@ extern MPIDI_POSIX_release_gather_tree_type_t MPIDI_POSIX_Bcast_tree_type,
(buf * MPIDI_POSIX_RELEASE_GATHER_BCAST_CELLSIZE)
#define MPIDI_POSIX_RELEASE_GATHER_REDUCE_DATA_ADDR(rank, buf) \
(((char *) release_gather_info_ptr->reduce_buf_addr) + \
(rank * MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE) + \
(rank * RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size)) + \
(buf * MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE))
#define MPIDI_POSIX_RELEASE_GATHER_BCAST_CELLSIZE \
(MPIR_CVAR_BCAST_INTRANODE_BUFFER_TOTAL_SIZE / MPIR_CVAR_BCAST_INTRANODE_NUM_CELLS)
(RELEASE_GATHER_FIELD(comm_ptr, bcast_shm_size) / RELEASE_GATHER_FIELD(comm_ptr, bcast_num_cells))
#define MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE \
(MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE / MPIR_CVAR_REDUCE_INTRANODE_NUM_CELLS)
(RELEASE_GATHER_FIELD(comm_ptr, reduce_shm_size) / RELEASE_GATHER_FIELD(comm_ptr, reduce_num_cells))

int MPIDI_POSIX_mpi_release_gather_comm_init_null(MPIR_Comm * comm_ptr);
int MPIDI_POSIX_mpi_release_gather_comm_init(MPIR_Comm * comm_ptr,
Expand Down Expand Up @@ -79,15 +82,16 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_release_gather_release(void *local_
* buffers can be used to pipeline the copying in and out of shared memory, and data is not
* overwritten */
const int relaxation =
(operation ==
MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE) ? MPIR_CVAR_REDUCE_INTRANODE_NUM_CELLS - 1 : 0;
(operation == MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE) ?
RELEASE_GATHER_FIELD(comm_ptr, reduce_num_cells) - 1 : 0;

rank = MPIR_Comm_rank(comm_ptr);
release_gather_info_ptr = &MPIDI_POSIX_COMM(comm_ptr, release_gather);
release_gather_info_ptr->release_state++;

if (operation == MPIDI_POSIX_RELEASE_GATHER_OPCODE_BCAST) {
segment = release_gather_info_ptr->release_state % MPIR_CVAR_BCAST_INTRANODE_NUM_CELLS;
segment = release_gather_info_ptr->release_state %
RELEASE_GATHER_FIELD(comm_ptr, bcast_num_cells);
bcast_data_addr = MPIDI_POSIX_RELEASE_GATHER_BCAST_DATA_ADDR(segment);

if (root != 0) {
Expand Down Expand Up @@ -156,7 +160,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_release_gather_release(void *local_

if (operation == MPIDI_POSIX_RELEASE_GATHER_OPCODE_ALLREDUCE) {
/* For allreduce, ranks directly copy the data from the reduce buffer of rank 0 */
segment = release_gather_info_ptr->release_state % MPIR_CVAR_REDUCE_INTRANODE_NUM_CELLS;
segment = release_gather_info_ptr->release_state %
RELEASE_GATHER_FIELD(comm_ptr, reduce_num_cells);
bcast_data_addr = MPIDI_POSIX_RELEASE_GATHER_REDUCE_DATA_ADDR(0, segment);
}

Expand Down Expand Up @@ -256,8 +261,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_release_gather_gather(const void *i
* buffers can be used to pipeline the copying in and out of shared memory, and data is not
* overwritten */
const int relaxation =
(operation ==
MPIDI_POSIX_RELEASE_GATHER_OPCODE_BCAST) ? MPIR_CVAR_BCAST_INTRANODE_NUM_CELLS - 1 : 0;
(operation == MPIDI_POSIX_RELEASE_GATHER_OPCODE_BCAST) ?
RELEASE_GATHER_FIELD(comm_ptr, bcast_num_cells) - 1 : 0;
uint64_t min_gather, child_gather_flag;
UT_array *children;
void *temp_recvbuf = NULL;
Expand All @@ -269,7 +274,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_release_gather_gather(const void *i

release_gather_info_ptr->gather_state++;
min_gather = release_gather_info_ptr->gather_state;
segment = release_gather_info_ptr->gather_state % MPIR_CVAR_REDUCE_INTRANODE_NUM_CELLS;
segment = release_gather_info_ptr->gather_state %
RELEASE_GATHER_FIELD(comm_ptr, reduce_num_cells);

if (operation == MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE ||
operation == MPIDI_POSIX_RELEASE_GATHER_OPCODE_ALLREDUCE) {
Expand Down
7 changes: 7 additions & 0 deletions src/mpid/ch4/shm/posix/release_gather/release_gather_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ typedef struct MPIDI_POSIX_release_gather_comm_t {
void *flags_addr, *bcast_buf_addr, *reduce_buf_addr;
void **child_reduce_buf_addr;
MPL_atomic_uint64_t *release_flag_addr, *gather_flag_addr;

/* parameters need persist for each communicator */
int bcast_tree_type, bcast_tree_kval;
int bcast_num_cells;

int reduce_tree_type, reduce_tree_kval;
int reduce_num_cells;
} MPIDI_POSIX_release_gather_comm_t;

#endif /* RELEASE_GATHER_TYPES_H_INCLUDED */
2 changes: 1 addition & 1 deletion test/mpi/coll/op_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#define my_assert(cond_) \
do { \
if (!(cond_)) { \
fprintf(stderr, "assertion (%s) failed, aborting\n", #cond_); \
printf("assertion (%s) failed at line %d, aborting\n", #cond_, __LINE__); \
MPI_Abort(MPI_COMM_WORLD, 1); \
} \
} while (0)
Expand Down
2 changes: 1 addition & 1 deletion test/mpi/errors/pt2pt/testlist
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ proberank 1
truncmsg1 2
truncmsg1 2 env=MPIR_CVAR_CH4_OFI_EAGER_MAX_MSG_SIZE=16384
truncmsg2 2
truncmsg_mrecv 2 mpiversion=3.0
truncmsg_mrecv 2
# multiple completion ests
errinstatts 2
errinstatta 2
Expand Down
3 changes: 2 additions & 1 deletion test/mpi/errors/pt2pt/truncmsg_mrecv.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int main(int argc, char *argv[])
fprintf(stderr, "Unable to allocate communication buffer of size %d\n", LongLen);
MPI_Abort(MPI_COMM_WORLD, 1);
}

#if MTEST_HAVE_MIN_MPI_VERSION(3,0)
if (testShort) {
if (rank == source) {
err = MPI_Send(buf, ShortLen, MPI_INT, dest, 0, MPI_COMM_WORLD);
Expand Down Expand Up @@ -77,6 +77,7 @@ int main(int argc, char *argv[])
errs += checkTruncError(err, "long");
}
}
#endif

free(buf);
MTest_Finalize(errs);
Expand Down
4 changes: 2 additions & 2 deletions test/mpi/maint/coll_cvars.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ tests:
neighbor_alltoallw:
intra-blocking:
neighb_alltoallw 4
persistent:
p_neighb_alltoallw 4
reduce:
intra-blocking:
reduce 5
Expand All @@ -139,7 +141,6 @@ tests:
redscat3 8
persistent:
p_redscat 4
redscat 4
reduce_scatter_block:
intra-blocking:
red_scat_block 4
Expand All @@ -155,7 +156,6 @@ tests:
scan:
intra-blocking:
scantst 4
op_coll 4
persistent:
p_scan 4
scatter:
Expand Down

0 comments on commit a92ce59

Please sign in to comment.