Kvtag query (#122)
* Add a new collective kvtag query api to return full/aggregated results to all clients

* Committing clang-format changes

* Add test code

* Committing clang-format changes

* Add an optimization when multiple clients issue different queries

* Add test program

* Fix free issue

* Committing clang-format changes

---------

Co-authored-by: github-actions <github-actions[bot]@users.noreply.github.com>
houjun and github-actions[bot] authored Aug 15, 2023
1 parent 2e8323a commit c5c7b91
Showing 5 changed files with 355 additions and 22 deletions.
23 changes: 19 additions & 4 deletions src/api/include/pdc_client_connect.h
@@ -595,16 +595,31 @@ perr_t PDC_Client_create_cont_id_mpi(const char *cont_name, pdcid_t cont_create_
perr_t PDC_Client_query_kvtag(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ids);

/**
* Client sends query requests to server (used by MPI mode)
* Client sends query requests to server (used by MPI mode), each client gets a subset of the
* queried results
*
* \param kvtag [IN] *********
* \param n_res [IN] **********
* \param pdc_ids [OUT] *********
* \param kvtag [IN] kvtag
* \param n_res [OUT] number of hits
* \param pdc_ids [OUT] object ids of hits, unordered
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDC_Client_query_kvtag_col(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ids);
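As a quick illustration (a sketch, not part of the commit), a rank-local caller of the collective variant might look like the following, assuming pdc_kvtag_t exposes at least the name/value/size fields used by the existing kvtag tests; set any additional fields, such as a value type, per the local definition:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include "pdc_client_connect.h"

void query_example(void)
{
    pdc_kvtag_t kvtag;
    int         n_res   = 0;
    uint64_t   *pdc_ids = NULL;

    kvtag.name  = "app_name";            /* hypothetical tag name */
    kvtag.value = (void *)"VPIC";        /* hypothetical tag value */
    kvtag.size  = strlen("VPIC") + 1;

    /* Collective: every client rank must call this; each rank receives only
     * the hits from its assigned subset of metadata servers. */
    if (PDC_Client_query_kvtag_col(&kvtag, &n_res, &pdc_ids) == SUCCEED)
        printf("got %d local hits\n", n_res);

    if (n_res > 0)
        free(pdc_ids);
}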

#ifdef ENABLE_MPI
/**
* Client sends query requests to server (used by MPI mode), all clients get the same aggregated
* query results, currently assumes MPI_COMM_WORLD
*
* \param kvtag [IN] kvtag
* \param n_res [OUT] number of hits
* \param pdc_ids [OUT] object ids of hits, unordered
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDC_Client_query_kvtag_mpi(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ids, MPI_Comm comm);
#endif
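The MPI variant declared above is called the same way but takes the communicator explicitly, and every rank ends up holding the full aggregated hit list rather than a subset (same assumptions as the sketch above):

    /* all ranks receive the same aggregated n_res / pdc_ids */
    PDC_Client_query_kvtag_mpi(&kvtag, &n_res, &pdc_ids, MPI_COMM_WORLD);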

/**
* Client sends query requests to server (used by MPI mode)
*
139 changes: 121 additions & 18 deletions src/api/pdc_client_connect.c
@@ -7264,8 +7264,12 @@ PDC_Client_query_kvtag_server(uint32_t server_id, const pdc_kvtag_t *kvtag, int

FUNC_ENTER(NULL);

if (kvtag == NULL || n_res == NULL || out == NULL)
PGOTO_ERROR(FAIL, "==CLIENT[%d]: input is NULL!", pdc_client_mpi_rank_g);
if (kvtag == NULL)
PGOTO_ERROR(FAIL, "==CLIENT[%d]: %s - kvtag is NULL!", pdc_client_mpi_rank_g, __func__);
if (n_res == NULL)
PGOTO_ERROR(FAIL, "==CLIENT[%d]: %s - n_res is NULL!", pdc_client_mpi_rank_g, __func__);
if (out == NULL)
PGOTO_ERROR(FAIL, "==CLIENT[%d]: %s - out is NULL!", pdc_client_mpi_rank_g, __func__);

if (kvtag->name == NULL)
in.name = " ";
@@ -7307,7 +7311,8 @@ PDC_Client_query_kvtag_server(uint32_t server_id, const pdc_kvtag_t *kvtag, int
PDC_Client_check_bulk(send_context_g);

*n_res = bulk_arg->n_meta;
*out = bulk_arg->obj_ids;
if (*n_res > 0)
*out = bulk_arg->obj_ids;
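/* Note: with zero hits, *out is left untouched, so callers are expected to
 * initialize their result pointer (the query paths below set *pdc_ids = NULL
 * before contacting any server). */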
free(bulk_arg);
// TODO: need to be careful when freeing the lookup_args, as it includes the results returned to the user

@@ -7320,18 +7325,21 @@ PDC_Client_query_kvtag_server(uint32_t server_id, const pdc_kvtag_t *kvtag, int
perr_t
PDC_Client_query_kvtag(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ids)
{
perr_t ret_value = SUCCEED;
int32_t i;
int nmeta = 0;
perr_t ret_value = SUCCEED;
int i, nmeta = 0;
uint32_t server_id;

FUNC_ENTER(NULL);

*n_res = 0;
for (i = 0; i < pdc_server_num_g; i++) {
ret_value = PDC_Client_query_kvtag_server((uint32_t)i, kvtag, &nmeta, pdc_ids);
// when multiple clients issue different queries concurrently, balance the server
// workload by having each client contact the servers in a different (rotated) order
server_id = (pdc_client_mpi_rank_g + i) % pdc_server_num_g;
ret_value = PDC_Client_query_kvtag_server(server_id, kvtag, &nmeta, pdc_ids);
if (ret_value != SUCCEED)
PGOTO_ERROR(FAIL, "==PDC_CLIENT[%d]: error with PDC_Client_query_kvtag_server to server %d",
pdc_client_mpi_rank_g, i);
pdc_client_mpi_rank_g, server_id);
}
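/* Illustrative example (not part of the commit): with 4 servers, client rank 0
 * contacts servers 0,1,2,3 while rank 1 contacts 1,2,3,0, so concurrent clients
 * start on different servers rather than all hitting server 0 first. */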

*n_res = nmeta;
@@ -7365,14 +7373,15 @@ PDC_assign_server(uint32_t *my_server_start, uint32_t *my_server_end, uint32_t *
FUNC_LEAVE_VOID;
}

// All clients collectively query all servers
// All clients collectively query all servers, each client gets partial results
perr_t
PDC_Client_query_kvtag_col(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ids)
{
perr_t ret_value = SUCCEED;
int32_t my_server_start, my_server_end, my_server_count;
int32_t i;
int nmeta = 0;
perr_t ret_value = SUCCEED;
int32_t my_server_start, my_server_end, my_server_count;
int32_t i;
int nmeta = 0;
uint64_t *temp_ids = NULL;

FUNC_ENTER(NULL);

@@ -7392,23 +7401,117 @@ PDC_Client_query_kvtag_col(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_
}
}

*n_res = 0;
*n_res = 0;
*pdc_ids = NULL;
for (i = my_server_start; i < my_server_end; i++) {
if (i >= pdc_server_num_g) {
if (i >= pdc_server_num_g)
break;
}
ret_value = PDC_Client_query_kvtag_server((uint32_t)i, kvtag, &nmeta, pdc_ids);

ret_value = PDC_Client_query_kvtag_server((uint32_t)i, kvtag, &nmeta, &temp_ids);
if (ret_value != SUCCEED)
PGOTO_ERROR(FAIL, "==PDC_CLIENT[%d]: error with PDC_Client_query_kvtag_server to server %u",
pdc_client_mpi_rank_g, i);
if (i == my_server_start)
*pdc_ids = temp_ids;
else if (nmeta > 0) {
*pdc_ids = (uint64_t *)realloc(*pdc_ids, sizeof(uint64_t) * (*n_res + nmeta));
memcpy(*pdc_ids + (*n_res), temp_ids, nmeta * sizeof(uint64_t));
if (temp_ids)
free(temp_ids);
}
*n_res = *n_res + nmeta;
}

*n_res = nmeta;
done:
fflush(stdout);
FUNC_LEAVE(ret_value);
}

#ifdef ENABLE_MPI
// All clients collectively query all servers, all clients get all results
perr_t
PDC_Client_query_kvtag_mpi(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ids, MPI_Comm comm)
{
perr_t ret_value = SUCCEED;
int32_t my_server_start, my_server_end, my_server_count;
int32_t i;
int nmeta = 0, *all_nmeta = NULL, ntotal = 0, *disp = NULL;
uint64_t *temp_ids = NULL;

FUNC_ENTER(NULL);

if (pdc_server_num_g > pdc_client_mpi_size_g) {
my_server_count = pdc_server_num_g / pdc_client_mpi_size_g;
my_server_start = pdc_client_mpi_rank_g * my_server_count;
my_server_end = my_server_start + my_server_count;
if (pdc_client_mpi_rank_g == pdc_client_mpi_size_g - 1) {
my_server_end += pdc_server_num_g % pdc_client_mpi_size_g;
}
}
else {
my_server_start = pdc_client_mpi_rank_g;
my_server_end = my_server_start + 1;
if (pdc_client_mpi_rank_g >= pdc_server_num_g) {
my_server_end = 0;
}
}
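/* Illustrative example (not part of the commit): with 10 servers and 4 clients,
 * ranks 0-3 query server ranges [0,2), [2,4), [4,6) and [6,10) (the last rank
 * takes the remainder); with 4 servers and 10 clients, ranks 0-3 query one
 * server each and ranks 4-9 query none. */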

*n_res = 0;
*pdc_ids = NULL;
for (i = my_server_start; i < my_server_end; i++) {
if (i >= pdc_server_num_g)
break;

/* printf("==PDC_CLIENT[%d]: querying server %u\n", pdc_client_mpi_rank_g, i); */

ret_value = PDC_Client_query_kvtag_server((uint32_t)i, kvtag, &nmeta, &temp_ids);
if (ret_value != SUCCEED)
PGOTO_ERROR(FAIL, "==PDC_CLIENT[%d]: error in %s querying server %u", pdc_client_mpi_rank_g,
__func__, i);
if (i == my_server_start)
*pdc_ids = temp_ids;
else if (nmeta > 0) {
*pdc_ids = (uint64_t *)realloc(*pdc_ids, sizeof(uint64_t) * (*n_res + nmeta));
memcpy(*pdc_ids + (*n_res), temp_ids, nmeta * sizeof(uint64_t));
free(temp_ids);
}
*n_res = *n_res + nmeta;
/* printf("==PDC_CLIENT[%d]: server %u returned %d res \n", pdc_client_mpi_rank_g, i, *n_res); */
}

if (pdc_client_mpi_size_g == 1)
goto done;

all_nmeta = (int *)malloc(pdc_client_mpi_size_g * sizeof(int));
disp = (int *)malloc(pdc_client_mpi_size_g * sizeof(int));
MPI_Allgather(n_res, 1, MPI_INT, all_nmeta, 1, MPI_INT, comm);
for (i = 0; i < pdc_client_mpi_size_g; i++) {
ntotal += all_nmeta[i];
if (i == 0)
disp[i] = 0;
else
disp[i] = disp[i - 1] + all_nmeta[i - 1];
}
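/* Worked example (illustrative): if all_nmeta = {2, 0, 3} across three ranks,
 * then disp = {0, 2, 2} and ntotal = 5, so rank 2's three IDs land at offset 2
 * of the gathered buffer. */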

/* printf("==PDC_CLIENT[%d]: after allgather \n", pdc_client_mpi_rank_g); */

temp_ids = (uint64_t *)malloc(ntotal * sizeof(uint64_t));
MPI_Allgatherv(*pdc_ids, *n_res, MPI_UINT64_T, temp_ids, all_nmeta, disp, MPI_UINT64_T, comm);

/* printf("==PDC_CLIENT[%d]: after allgatherv\n", pdc_client_mpi_rank_g); */

free(all_nmeta);
free(disp);
if (*n_res > 0)
free(*pdc_ids);
*pdc_ids = temp_ids;
*n_res = ntotal;

done:
fflush(stdout);
FUNC_LEAVE(ret_value);
}
#endif
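For reference, here is a minimal standalone sketch (plain MPI rather than PDC, not part of the commit) of the counts/displacements pattern used above, where each rank contributes a different number of uint64_t IDs and every rank ends up with all of them:

#include <mpi.h>
#include <stdint.h>
#include <stdlib.h>

/* Gather variable-length ID lists from all ranks; caller frees the result. */
static uint64_t *allgather_ids(uint64_t *local, int mine, int *total, MPI_Comm comm)
{
    int size, i;
    MPI_Comm_size(comm, &size);

    int *counts = (int *)malloc(size * sizeof(int));
    int *disp   = (int *)malloc(size * sizeof(int));
    MPI_Allgather(&mine, 1, MPI_INT, counts, 1, MPI_INT, comm);

    *total = 0;
    for (i = 0; i < size; i++) {
        disp[i] = *total;            /* prefix sum of the earlier ranks' counts */
        *total += counts[i];
    }

    uint64_t *all = (uint64_t *)malloc((*total) * sizeof(uint64_t));
    MPI_Allgatherv(local, mine, MPI_UINT64_T, all, counts, disp, MPI_UINT64_T, comm);

    free(counts);
    free(disp);
    return all;
}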

// Delete a tag specified by a name, and whether it is from a container or an object
static perr_t
7 changes: 7 additions & 0 deletions src/server/pdc_server_region/pdc_server_region_cache.c
@@ -679,6 +679,7 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache)
char ** buf, **new_buf, *buf_ptr = NULL;
uint64_t * start, *end, *new_start, *new_end;
int merged_request_size = 0;
int server_rank = 0;
uint64_t unit;
struct pdc_region_info **obj_regions;
#ifdef PDC_TIMING
@@ -751,6 +752,9 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache)
nflush += merged_request_size;
}

#ifdef ENABLE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &server_rank);
#endif
// Iterate through all cache regions and use POSIX I/O to write them back to file system.
region_cache_iter = obj_cache->region_cache;
while (region_cache_iter != NULL) {
@@ -764,6 +768,9 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache)
if (obj_cache->ndim >= 3)
write_size *= region_cache_info->size[2];

printf("==PDC_SERVER[%d]: server flushed %.1f / %.1f MB to storage\n", server_rank,
write_size / 1048576.0, total_cache_size / 1048576.0);

total_cache_size -= write_size;
free(region_cache_info->offset);
if (obj_cache->ndim > 1) {
1 change: 1 addition & 0 deletions src/tests/CMakeLists.txt
@@ -67,6 +67,7 @@ set(PROGRAMS
kvtag_add_get_scale
# kvtag_query
kvtag_query_scale
kvtag_query_mpi
# obj_transformation
region_transfer_query
region_transfer
(diff for the fifth changed file not rendered in this view)
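The sketch below (an illustration only; the tag name, value, and overall structure are assumptions, not the committed kvtag_query_mpi test) shows the general shape such an MPI-mode query test can take:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "pdc.h"
#include "pdc_client_connect.h"

int main(int argc, char **argv)
{
    pdcid_t     pdc;
    pdc_kvtag_t kvtag;
    int         rank, n_res = 0;
    uint64_t   *pdc_ids = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    pdc = PDCinit("pdc");

    kvtag.name  = "some_tag";                 /* placeholder tag name */
    kvtag.value = (void *)"some_value";       /* placeholder tag value */
    kvtag.size  = strlen("some_value") + 1;

    /* Collective over MPI_COMM_WORLD; every rank receives the full result set. */
    if (PDC_Client_query_kvtag_mpi(&kvtag, &n_res, &pdc_ids, MPI_COMM_WORLD) != SUCCEED)
        printf("Rank %d: query failed\n", rank);
    else
        printf("Rank %d: %d total hits\n", rank, n_res);

    if (n_res > 0)
        free(pdc_ids);

    PDCclose(pdc);
    MPI_Finalize();
    return 0;
}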
