Skip to content

Commit

Permalink
Implement selection vector I/O with collective chunk filling (HDFGroup#3826)
Browse files Browse the repository at this point in the history

* Changes for ECP-344: Implement selection vector I/O with collective chunk filling.
Also fix a bug in H5FD__mpio_write_vector() to account for the fixed-size optimization
when computing the maximum address.

* Fixes based on PR review comments:
For H5Dchunk.c: fix H5MM_xfree()
For H5FDmpio.c:
1) Revert the fix to H5FD__mpio_write_vector()
2) Apply the patch from Neil on the proper length of s_sizes reported by H5FD__mpio_vector_build_types()

* Put back the logic of dividing up the work among all the MPI ranks, similar to the
original H5D__chunk_collective_fill() routine.

* Add a test to verify the fix for the illegal reference problem in H5FD__mpio_write_vector().
  • Loading branch information
vchoi-hdfgroup authored Nov 16, 2023
1 parent ef39882 commit ed31aac
Show file tree
Hide file tree
Showing 4 changed files with 434 additions and 189 deletions.
264 changes: 101 additions & 163 deletions src/H5Dchunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -5536,11 +5536,9 @@ H5D__chunk_update_old_edge_chunks(H5D_t *dset, hsize_t old_dim[])
/*-------------------------------------------------------------------------
* Function: H5D__chunk_collective_fill
*
* Purpose: Use MPIO collective write to fill the chunks (if number of
* chunks to fill is greater than the number of MPI procs;
* otherwise use independent I/O).
* Purpose: Use MPIO selection vector I/O for writing fill chunks
*
* Return: Non-negative on success/Negative on failure
* Return: Non-negative on success/Negative on failure
*
*-------------------------------------------------------------------------
*/
Expand All @@ -5554,19 +5552,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
int mpi_code; /* MPI return code */
size_t num_blocks; /* Number of blocks between processes. */
size_t leftover_blocks; /* Number of leftover blocks to handle */
int blocks, leftover; /* converted to int for MPI */
MPI_Aint *chunk_disp_array = NULL;
MPI_Aint *block_disps = NULL;
int *block_lens = NULL;
MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE;
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xfer mode has been retrieved */
bool need_sort = false;
size_t i; /* Local index variable */
int blocks; /* converted to int for MPI */
int leftover; /* converted to int for MPI */
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xfer mode has been retrieved */
size_t i; /* Local index variable */
haddr_t *io_addrs = NULL;
size_t *io_sizes = NULL;
const void **io_wbufs = NULL;
H5FD_mem_t io_types[2];
bool all_same_block_len = true;
bool need_sort = false;
size_t io_2sizes[2];
herr_t ret_value = SUCCEED; /* Return value */

FUNC_ENTER_PACKAGE

assert(chunk_fill_info->num_chunks != 0);

/*
* If a separate fill buffer is provided for partial chunks, ensure
* that the "don't filter partial edge chunks" flag is set.
Expand All @@ -5589,6 +5592,7 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
/* Distribute evenly the number of blocks between processes. */
if (mpi_size == 0)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero");

num_blocks =
(size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */

Expand All @@ -5602,157 +5606,97 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
H5_CHECKED_ASSIGN(leftover, int, leftover_blocks, size_t);

/* Check if we have any chunks to write on this rank */
if (num_blocks > 0 || (leftover && leftover > mpi_rank)) {
MPI_Aint partial_fill_buf_disp = 0;
bool all_same_block_len = true;

/* Allocate buffers */
if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer");

if (partial_chunk_fill_buf) {
MPI_Aint fill_buf_addr;
MPI_Aint partial_fill_buf_addr;

/* Calculate the displacement between the fill buffer and partial chunk fill buffer */
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)

#if H5_CHECK_MPI_VERSION(3, 1)
partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr);
#else
partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr;
#endif
if (num_blocks > 0 || leftover > mpi_rank) {

/*
* Allocate all-zero block displacements array. If a block's displacement
* is left as zero, that block will be written to from the regular fill
* buffer. If a block represents an unfiltered partial edge chunk, its
* displacement will be set so that the block is written to from the
* unfiltered fill buffer.
*/
if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer");
}
if (NULL == (io_addrs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_addrs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
"couldn't allocate space for I/O addresses vector");

/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;

if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}
if (NULL == (io_wbufs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_wbufs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector");
}

if (need_sort)
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks,
sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info);
/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;

/* Allocate buffer for block lengths if necessary */
if (!all_same_block_len)
if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer");
if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}

for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);
/*
* Note that we sort all of the chunks here, and not just a subset
* corresponding to this rank. We do this since we have found MPI I/O to work
* better when each rank writes blocks that are contiguous in the file,
* and by sorting the full list we maximize the chance of that happening.
*/
if (need_sort)
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks, sizeof(struct chunk_coll_fill_info),
H5D__chunk_cmp_coll_fill_info);

/* store the chunk address as an MPI_Aint */
chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr);
/*
* If all the chunks have the same length, use the compressed feature
* to store the size.
* Otherwise, allocate the array of sizes for storing chunk sizes.
*/
if (all_same_block_len) {
io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size;
io_2sizes[1] = 0;
}
else {
if (NULL == (io_sizes = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_sizes))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector");
}

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t);
/*
* Since the type of all chunks is raw data, use the compressed feature
* to store the chunk type.
*/
io_types[0] = H5FD_MEM_DRAW;
io_types[1] = H5FD_MEM_NOLIST;

if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[i] = partial_fill_buf_disp;
}
} /* end for */
/*
* For the chunks corresponding to this rank, fill in the
* address, size and buf pointer for each chunk.
*/
for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);

/* Calculate if there are any leftover blocks after evenly
* distributing. If there are, then round-robin the distribution
* to processes 0 -> leftover.
*/
if (leftover && leftover > mpi_rank) {
chunk_disp_array[blocks] =
(MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[blocks], int,
chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size,
size_t);

if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[blocks] = partial_fill_buf_disp;
}
io_addrs[i] = chunk_fill_info->chunk_info[idx].addr;

blocks++;
}
if (!all_same_block_len)
io_sizes[i] = chunk_fill_info->chunk_info[idx].chunk_size;

/* Create file and memory types for the write operation */
if (all_same_block_len) {
int block_len;
if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk)
io_wbufs[i] = partial_chunk_fill_buf;
else
io_wbufs[i] = fill_buf;
}

H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t);
/*
* For the leftover chunk corresponding to this rank, fill in the
* address, size and buf pointer for the chunk.
*/
if (leftover > mpi_rank) {
io_addrs[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
if (!all_same_block_len)
io_sizes[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size;

if (partial_chunk_fill_buf) {
/*
* If filters are disabled for partial edge chunks, those chunks could
* potentially have the same block length as the other chunks, but still
* need to be written to using the unfiltered fill buffer. Use an hindexed
* block type rather than an hvector.
*/
mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
}
else {
mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
}
}
else {
/*
* Currently, different block lengths implies that there are partial
* edge chunks and the "don't filter partial edge chunks" flag is set.
*/
if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
assert(block_lens);
assert(block_disps);

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
io_wbufs[blocks] = partial_chunk_fill_buf;
}
else
io_wbufs[blocks] = fill_buf;

if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
} /* end if */

/* Set MPI-IO VFD properties */

/* Set MPI datatypes for operation */
if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties");
blocks++;
}

/* Get current transfer mode */
if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0)
Expand All @@ -5763,31 +5707,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* Low-level write (collective) */
if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0,
(blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file");

/* Barrier so processes don't race ahead */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)

/* Perform the selection vector I/O for the chunks */
if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), (uint32_t)blocks, io_types, io_addrs,
all_same_block_len ? io_2sizes : io_sizes, io_wbufs) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed");

done:
if (have_xfer_mode)
/* Set transfer mode */
/* Restore transfer mode */
if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* free things */
if (MPI_BYTE != file_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
if (MPI_BYTE != mem_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
H5MM_xfree(chunk_disp_array);
H5MM_xfree(block_disps);
H5MM_xfree(block_lens);
H5MM_xfree(io_addrs);
H5MM_xfree(io_wbufs);
H5MM_xfree(io_sizes);

FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__chunk_collective_fill() */
Expand All @@ -5805,6 +5742,7 @@ H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2)

FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr))
} /* end H5D__chunk_cmp_coll_fill_info() */

#endif /* H5_HAVE_PARALLEL */

/*-------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit ed31aac

Please sign in to comment.