Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement selection vector I/O with collective chunk filling #3826

Merged
merged 14 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 45 additions & 224 deletions src/H5Dchunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata, bool new_unfilt
#ifdef H5_HAVE_PARALLEL
static herr_t H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_fill_info,
const void *fill_buf, const void *partial_chunk_fill_buf);
static int H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2);
#endif /* H5_HAVE_PARALLEL */

/* Debugging helper routine callback */
Expand Down Expand Up @@ -5536,9 +5535,7 @@ H5D__chunk_update_old_edge_chunks(H5D_t *dset, hsize_t old_dim[])
/*-------------------------------------------------------------------------
* Function: H5D__chunk_collective_fill
*
* Purpose: Use MPIO collective write to fill the chunks (if number of
* chunks to fill is greater than the number of MPI procs;
* otherwise use independent I/O).
* Purpose: Use MPIO selection vector I/O for writing fill chunks
*
* Return: Non-negative on success/Negative on failure
*
Expand All @@ -5548,211 +5545,60 @@ static herr_t
H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_fill_info,
const void *fill_buf, const void *partial_chunk_fill_buf)
{
MPI_Comm mpi_comm = MPI_COMM_NULL; /* MPI communicator for file */
int mpi_rank = (-1); /* This process's rank */
int mpi_size = (-1); /* MPI Comm size */
int mpi_code; /* MPI return code */
size_t num_blocks; /* Number of blocks between processes. */
size_t leftover_blocks; /* Number of leftover blocks to handle */
int blocks, leftover; /* converted to int for MPI */
MPI_Aint *chunk_disp_array = NULL;
MPI_Aint *block_disps = NULL;
int *block_lens = NULL;
MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE;
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
bool need_sort = false;
size_t i; /* Local index variable */
size_t i; /* Local index variable */
uint32_t io_count = 0;
haddr_t *io_addrs = NULL;
size_t *io_sizes = NULL;
const void **io_wbufs = NULL;
H5FD_mem_t io_types[2];
bool all_same_block_len = true;
size_t io_2sizes[2];
herr_t ret_value = SUCCEED; /* Return value */

FUNC_ENTER_PACKAGE

/*
* If a separate fill buffer is provided for partial chunks, ensure
* that the "don't filter partial edge chunks" flag is set.
*/
if (partial_chunk_fill_buf)
assert(dset->shared->layout.u.chunk.flags & H5O_LAYOUT_CHUNK_DONT_FILTER_PARTIAL_BOUND_CHUNKS);

/* Get the MPI communicator */
if (MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file)))
HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator");

/* Get the MPI rank */
if ((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0)
HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank");
assert(chunk_fill_info->num_chunks != 0);

/* Get the MPI size */
if ((mpi_size = H5F_mpi_get_size(dset->oloc.file)) < 0)
HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI size");

/* Distribute evenly the number of blocks between processes. */
if (mpi_size == 0)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero");
num_blocks =
(size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */

/* After evenly distributing the blocks between processes, are there any
* leftover blocks for each individual process (round-robin)?
*/
leftover_blocks = (size_t)(chunk_fill_info->num_chunks % (size_t)mpi_size);

/* Cast values to types needed by MPI */
H5_CHECKED_ASSIGN(blocks, int, num_blocks, size_t);
H5_CHECKED_ASSIGN(leftover, int, leftover_blocks, size_t);

/* Check if we have any chunks to write on this rank */
if (num_blocks > 0 || (leftover && leftover > mpi_rank)) {
MPI_Aint partial_fill_buf_disp = 0;
bool all_same_block_len = true;

/* Allocate buffers */
if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer");

if (partial_chunk_fill_buf) {
MPI_Aint fill_buf_addr;
MPI_Aint partial_fill_buf_addr;

/* Calculate the displacement between the fill buffer and partial chunk fill buffer */
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)

#if H5_CHECK_MPI_VERSION(3, 1)
partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr);
#else
partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr;
#endif

/*
* Allocate all-zero block displacements array. If a block's displacement
* is left as zero, that block will be written to from the regular fill
* buffer. If a block represents an unfiltered partial edge chunk, its
* displacement will be set so that the block is written to from the
* unfiltered fill buffer.
*/
if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer");
}

/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;

if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}

if (need_sort)
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks,
sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info);

/* Allocate buffer for block lengths if necessary */
if (!all_same_block_len)
if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer");

for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);

/* store the chunk address as an MPI_Aint */
chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr);

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t);

if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[i] = partial_fill_buf_disp;
}
} /* end for */

/* Calculate if there are any leftover blocks after evenly
* distributing. If there are, then round-robin the distribution
* to processes 0 -> leftover.
*/
if (leftover && leftover > mpi_rank) {
chunk_disp_array[blocks] =
(MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[blocks], int,
chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size,
size_t);

if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[blocks] = partial_fill_buf_disp;
}
io_count = (uint32_t)chunk_fill_info->num_chunks;

blocks++;
for (i = 1; i < io_count; i++) {
if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size) {
all_same_block_len = false;
break;
}
}

/* Create file and memory types for the write operation */
if (all_same_block_len) {
int block_len;

H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t);

mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
if (all_same_block_len) {
io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size;
io_2sizes[1] = 0;
}
else {
if (NULL == (io_sizes = H5MM_malloc(io_count * sizeof(*io_sizes))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector");
}

if (partial_chunk_fill_buf) {
/*
* If filters are disabled for partial edge chunks, those chunks could
* potentially have the same block length as the other chunks, but still
* need to be written to using the unfiltered fill buffer. Use an hindexed
* block type rather than an hvector.
*/
mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
}
else {
mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
}
}
else {
/*
* Currently, different block lengths implies that there are partial
* edge chunks and the "don't filter partial edge chunks" flag is set.
*/
assert(partial_chunk_fill_buf);
assert(block_lens);
assert(block_disps);
io_types[0] = H5FD_MEM_DRAW;
io_types[1] = H5FD_MEM_NOLIST;

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
if (NULL == (io_addrs = H5MM_malloc(io_count * sizeof(*io_addrs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O addresses vector");

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
}
if (NULL == (io_wbufs = H5MM_malloc(io_count * sizeof(*io_wbufs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector");

if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
} /* end if */
for (i = 0; i < io_count; i++) {
io_addrs[i] = chunk_fill_info->chunk_info[i].addr;

/* Set MPI-IO VFD properties */
if (!all_same_block_len)
io_sizes[i] = chunk_fill_info->chunk_info[i].chunk_size;

/* Set MPI datatypes for operation */
if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties");
if (chunk_fill_info->chunk_info[i].unfiltered_partial_chunk)
io_wbufs[i] = partial_chunk_fill_buf;
else
io_wbufs[i] = fill_buf;
}

/* Get current transfer mode */
if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0)
Expand All @@ -5763,48 +5609,23 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* Low-level write (collective) */
if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0,
(blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file");

/* Barrier so processes don't race ahead */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), io_count, io_types, io_addrs,
all_same_block_len ? io_2sizes : io_sizes, io_wbufs) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed");

done:
if (have_xfer_mode)
/* Set transfer mode */
if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* free things */
if (MPI_BYTE != file_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
if (MPI_BYTE != mem_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
H5MM_xfree(chunk_disp_array);
H5MM_xfree(block_disps);
H5MM_xfree(block_lens);
H5MM_xfree(io_addrs);
H5MM_xfree(io_wbufs);
H5MM_xfree(io_sizes);

FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__chunk_collective_fill() */

static int
H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2)
{
const struct chunk_coll_fill_info *entry1;
const struct chunk_coll_fill_info *entry2;

FUNC_ENTER_PACKAGE_NOERR

entry1 = (const struct chunk_coll_fill_info *)_entry1;
entry2 = (const struct chunk_coll_fill_info *)_entry2;

FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr))
} /* end H5D__chunk_cmp_coll_fill_info() */
#endif /* H5_HAVE_PARALLEL */

/*-------------------------------------------------------------------------
Expand Down
Loading