diff --git a/CMakeLists.txt b/CMakeLists.txt index 460ee8f87..70bc8ce2e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -467,7 +467,21 @@ if(PDC_ENABLE_FASTBIT) set(ENABLE_FASTBIT 1) endif() + +# Metadata with RocksDB +#----------------------------------------------------------------------------- +option(PDC_ENABLE_ROCKSDB "Enable RocksDB (experimental)." OFF) +if(PDC_ENABLE_ROCKSDB) + set(ENABLE_ROCKSDB 1) +endif() + +# Metadata with SQLite #----------------------------------------------------------------------------- +option(PDC_ENABLE_SQLITE3 "Enable SQLite3 (experimental)." OFF) +if(PDC_ENABLE_SQLITE3) + set(ENABLE_SQLITE3 1) +endif() + # Check availability of symbols #----------------------------------------------------------------------------- check_symbol_exists(malloc_usable_size "malloc.h" HAVE_MALLOC_USABLE_SIZE) diff --git a/docs/requirements.txt b/docs/requirements.txt index ffc6fc227..b9245e074 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,2 +1,4 @@ +sphinx==6.2.1 +sphinx-rtd-theme==1.2.2 sphinxemoji breathe diff --git a/docs/source/api.rst b/docs/source/api.rst index ab058f10a..4dc7dc226 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -253,6 +253,33 @@ PDC object APIs * Delete data from an object. * For developers: see pdc_client_connect.c. Use PDC_obj_get_info to retrieve name. Then forward name to servers to fulfill requests. +--------------------------- +PDC region APIs +--------------------------- + + +--------------------------- +PDC property APIs +--------------------------- + + +--------------------------- +PDC metadata APIs +--------------------------- +PDC maintains object metadata (obj name, dimension, create time, etc.) in a distributed hash table. Each object's metadata can be +accessed with its object ID. Users can also issue metadata queries to retrieve the object IDs that meet the query constraints. + +PDC allows users to add key-value tags to each object, where key is a string and value can be a binary array of any datatype and length. +The key-value tags are stored in an in-memory linked list by default. + +PDC has metadata indexing and querying support when DART is enabled. See ``DART`` section in the Developer Notes. + +PDC additionally supports managing the key-value tags with RocksDB and SQLite, both are considered experimental at the moment. +Either RocksDB or SQLite can be enabled by turning on the ``PDC_ENABLE_ROCKSDB`` or ``PDC_USE_SQLITE3`` flag in CMake, setting the +``ROCKSDB_DIR`` or ``SQLITE3_DIR`` and setting the environment variable ``PDC_USE_ROCKSDB`` or ``PDC_USE_SQLITE3`` to 1 before launching the server. +Users can use the same PDC query APIs when RocksDB or SQLite is enabled. + + * perr_t PDCobj_put_tag(pdcid_t obj_id, char *tag_name, void *tag_value, psize_t value_size) * Input: * obj_id: Local object ID @@ -285,17 +312,7 @@ PDC object APIs * For developers: see pdc_client_connect.c. Need to use PDCtag_delete to submit RPCs to the servers for metadata update. --------------------------- -PDC region APIs ---------------------------- - - ---------------------------- -PDC property APIs ---------------------------- - - ---------------------------- -PDC query APIs +PDC Data query APIs --------------------------- * pdc_query_t *PDCquery_create(pdcid_t obj_id, pdc_query_op_t op, pdc_var_type_t type, void *value) @@ -883,4 +900,4 @@ Developers notes * Object * Object property See `Object Property `_ - * Object structure (pdc_obj_pkg.h and pdc_obj.h) See `Object Structure `_ \ No newline at end of file + * Object structure (pdc_obj_pkg.h and pdc_obj.h) See `Object Structure `_ diff --git a/docs/source/developer-notes.rst b/docs/source/developer-notes.rst index b8c187be3..0db7c96f1 100644 --- a/docs/source/developer-notes.rst +++ b/docs/source/developer-notes.rst @@ -138,6 +138,13 @@ For No-index approach, here are the APIs you can call for different communicatio * PDC_Client_query_kvtag (point-to-point) * PDC_Client_query_kvtag_mpi (collective) +The default PDC kvtags are stored within each object's metadata as a linked list, and any query involves traversing the list in memory. + +We have additional support to manage the kvtags with RocksDB and SQLite. With this approach, each PDC server creates and accesses its own RocksDB and SQLite database file, which is stored as an in-memory file in /tmp directory. When RocksDB or SQLite is enabled with setting the environment variable ``PDC_USE_ROCKSDB=1`` or ``PDC_USE_SQLITE3=1``. +With the RocksDB implementation, each kvtag is stored as a RocksDB key-value pair. To differenciate the kvtags for different objects, we encode the object ID to the key string used for the RocksDB, and store the value as the RocksDB value. As a result, the value can be retrieved directly when its object ID and key string is known. Otherwise we must iterate over the entire DB to search for an kvtag. +With the SQLite3 implementation, each kvtag is inserted as a row in a SQLite3 table. Currently, the table has the following columns and SQLite3 datatypes: objid (INTEGER), name (TEXT), value_text(TEXT), value_int(INTEGER), value_float(REAL), value_double(REAL), value_blob(BLOB). We create a SQL SELECT statement automatically on the server when receiving a query request from the PDC client. Currently this implementation is focused on supporting string/text affix search and integer/float (single) value match search. +Currently, both the RocksDB and the SQLite implementation are developed for benchmarking purpose, the database files are removed at server finalization time, and restart is not supported. + Index-facilitated Approach --------------------------------------------- @@ -398,7 +405,6 @@ Also, to make sure your code with Julia function calls doesn't get compiled when For more info on embedded Julia support, please visit: `Embedded Julia https://docs.julialang.org/en/v1/manual/embedding/`_. - --------------------------------------------- Docker Support --------------------------------------------- diff --git a/src/api/pdc_client_connect.c b/src/api/pdc_client_connect.c index 44c1786e6..b89006293 100644 --- a/src/api/pdc_client_connect.c +++ b/src/api/pdc_client_connect.c @@ -9020,8 +9020,16 @@ _standard_all_gather_result(int query_sent, int *n_res, uint64_t **pdc_ids, MPI_ uint64_t *all_ids = (uint64_t *)malloc(ntotal * sizeof(uint64_t)); MPI_Allgatherv(*pdc_ids, *n_res, MPI_UINT64_T, all_ids, all_nmeta_array, disp, MPI_UINT64_T, world_comm); + if (*pdc_ids) + free(*pdc_ids); + *n_res = ntotal; *pdc_ids = all_ids; + + free(all_nmeta_array); + free(disp); + + return; } void @@ -9127,7 +9135,7 @@ PDC_Client_query_kvtag_mpi(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ if (*n_res <= 0) { *n_res = 0; - *pdc_ids = (uint64_t *)malloc(0); + *pdc_ids = NULL; } else { // print the pdc ids returned by this client, along with the client id @@ -9349,4 +9357,4 @@ PDC_Client_search_obj_ref_through_dart_mpi(dart_hash_algo_t hash_algo, char *que } #endif -/******************** Collective Object Selection Query Ends *******************************/ \ No newline at end of file +/******************** Collective Object Selection Query Ends *******************************/ diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index aaedcd4ae..5653602d9 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -6,6 +6,17 @@ if(PDC_ENABLE_FASTBIT) find_library(FASTBIT_LIBRARY fastbit $ENV{HOME}/cori/fastbit-2.0.3/install) endif() +if(PDC_ENABLE_ROCKSDB) + add_definitions(-DENABLE_ROCKSDB=1) + find_path(ROCKSDB_INCLUDE_DIR include/db.h) + find_library(ROCKSDB_LIBRARY rocksdb 8.1.1< REQUIRED) +endif() + +if(PDC_ENABLE_SQLITE3) + add_definitions(-DENABLE_SQLITE3=1) + find_package(SQLite3 3.31.0 REQUIRED) +endif() + include_directories( ${PDC_COMMON_INCLUDE_DIRS} ${PDC_INCLUDES_BUILD_TIME} @@ -28,6 +39,7 @@ include_directories( ${PDC_SOURCE_DIR}/src/utils/include ${MERCURY_INCLUDE_DIR} ${FASTBIT_INCLUDE_DIR} + ${ROCKSDB_INCLUDE_DIR} ) add_definitions( -DIS_PDC_SERVER=1 ) @@ -57,9 +69,17 @@ add_library(pdc_server_lib ) if(PDC_ENABLE_FASTBIT) message(STATUS "Enabled fastbit") - target_link_libraries(pdc_server_lib mercury ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} ${FASTBIT_LIBRARY}/libfastbit.so) + target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} ${FASTBIT_LIBRARY}/libfastbit.so) +elseif(PDC_ENABLE_ROCKSDB) + if(PDC_ENABLE_SQLITE3) + target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} ${ROCKSDB_LIBRARY} SQLite::SQLite3) + else() + target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} ${ROCKSDB_LIBRARY}) + endif() +elseif(PDC_ENABLE_SQLITE3) + target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} SQLite::SQLite3) else() - target_link_libraries(pdc_server_lib mercury ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES}) + target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES}) endif() add_executable(pdc_server.exe @@ -78,10 +98,9 @@ if(NOT ${PDC_INSTALL_BIN_DIR} MATCHES ${PROJECT_BINARY_DIR}/bin) install( TARGETS pdc_server.exe - DESTINATION ${PDC_INSTALL_BIN_DIR} + pdc_server_lib + LIBRARY DESTINATION ${PDC_INSTALL_LIB_DIR} + ARCHIVE DESTINATION ${PDC_INSTALL_LIB_DIR} + RUNTIME DESTINATION ${PDC_INSTALL_BIN_DIR} ) endif() - - - - diff --git a/src/server/include/pdc_server.h b/src/server/include/pdc_server.h index 09f527013..f2fca93d2 100644 --- a/src/server/include/pdc_server.h +++ b/src/server/include/pdc_server.h @@ -49,6 +49,18 @@ #include "iapi.h" #endif +#ifdef ENABLE_ROCKSDB +#include "rocksdb/c.h" +extern rocksdb_t *rocksdb_g; +extern int use_rocksdb_g; +#endif + +#ifdef ENABLE_SQLITE3 +#include "sqlite3.h" +extern sqlite3 *sqlite3_db_g; +extern int use_sqlite3_g; +#endif + #ifdef ENABLE_MULTITHREAD // Mercury multithread #include "mercury_thread.h" diff --git a/src/server/include/pdc_server_metadata.h b/src/server/include/pdc_server_metadata.h index d7832b6f9..de3c15c80 100644 --- a/src/server/include/pdc_server_metadata.h +++ b/src/server/include/pdc_server_metadata.h @@ -62,6 +62,8 @@ extern pdc_remote_server_info_t *pdc_remote_server_info_g; extern double total_mem_usage_g; extern int is_hash_table_init_g; extern int is_restart_g; +extern int use_rocksdb_g; +extern int use_sqlite3_g; /****************************/ /* Library Private Typedefs */ @@ -83,6 +85,14 @@ typedef struct pdc_cont_hash_table_entry_t { pdc_kvtag_list_t *kvtag_list_head; } pdc_cont_hash_table_entry_t; +#ifdef ENABLE_SQLITE3 +typedef struct pdc_sqlite3_query_t { + pdcid_t **obj_ids; + int nobj; + int nalloc; +} pdc_sqlite3_query_t; +#endif + /***************************************/ /* Library-private Function Prototypes */ /***************************************/ diff --git a/src/server/pdc_server.c b/src/server/pdc_server.c index a07e5596b..95e90b146 100644 --- a/src/server/pdc_server.c +++ b/src/server/pdc_server.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -61,6 +62,16 @@ #include #endif +#ifdef ENABLE_ROCKSDB +#include "rocksdb/c.h" +rocksdb_t *rocksdb_g; +#endif + +#ifdef ENABLE_SQLITE3 +#include "sqlite3.h" +sqlite3 *sqlite3_db_g; +#endif + // Check how long PDC has run every OP_INTERVAL operations #define PDC_CHECKPOINT_CHK_OP_INTERVAL 2000 // Checkpoint every INTERVAL_SEC second and at least OP_INTERVAL operations @@ -131,6 +142,8 @@ int read_from_bb_size_g = 0; int gen_hist_g = 0; int gen_fastbit_idx_g = 0; int use_fastbit_idx_g = 0; +int use_rocksdb_g = 0; +int use_sqlite3_g = 0; char * gBinningOption = NULL; double server_write_time_g = 0.0; @@ -359,6 +372,71 @@ PDC_Server_write_addr_to_file(char **addr_strings, int n) FUNC_LEAVE(ret_value); } +static int +remove_directory(const char *dir) +{ + int ret = 0; + FTS * ftsp = NULL; + FTSENT *curr; + + // Cast needed (in C) because fts_open() takes a "char * const *", instead + // of a "const char * const *", which is only allowed in C++. fts_open() + // does not modify the argument. + char *files[] = {(char *)dir, NULL}; + + // FTS_NOCHDIR - Avoid changing cwd, which could cause unexpected behavior + // in multithreaded programs + // FTS_PHYSICAL - Don't follow symlinks. Prevents deletion of files outside + // of the specified directory + // FTS_XDEV - Don't cross filesystem boundaries + ftsp = fts_open(files, FTS_NOCHDIR | FTS_PHYSICAL | FTS_XDEV, NULL); + if (!ftsp) { + fprintf(stderr, "PDC_SERVER: %s: fts_open failed: %s\n", dir, strerror(curr->fts_errno)); + ret = -1; + goto done; + } + + while ((curr = fts_read(ftsp))) { + switch (curr->fts_info) { + case FTS_NS: + case FTS_DNR: + case FTS_ERR: + break; + + case FTS_DC: + case FTS_DOT: + case FTS_NSOK: + // Not reached unless FTS_LOGICAL, FTS_SEEDOT, or FTS_NOSTAT were + // passed to fts_open() + break; + + case FTS_D: + // Do nothing. Need depth-first search, so directories are deleted + // in FTS_DP + break; + + case FTS_DP: + case FTS_F: + case FTS_SL: + case FTS_SLNONE: + case FTS_DEFAULT: + if (remove(curr->fts_accpath) < 0) { + fprintf(stderr, "PDC_SERVER: %s: Failed to remove: %s\n", curr->fts_path, + strerror(curr->fts_errno)); + ret = -1; + } + break; + } + } + +done: + if (ftsp) { + fts_close(ftsp); + } + + return ret; +} + /* * Remove server config file * @@ -380,6 +458,20 @@ PDC_Server_rm_config_file() goto done; } +#ifdef ENABLE_ROCKSDB + if (use_rocksdb_g) { + snprintf(config_fname, ADDR_MAX, "/tmp/PDC_rocksdb_%d", pdc_server_rank_g); + remove_directory(config_fname); + } +#endif + +#ifdef ENABLE_SQLITE3 + if (use_sqlite3_g) { + snprintf(config_fname, ADDR_MAX, "/tmp/PDC_sqlite3_%d", pdc_server_rank_g); + remove_directory(config_fname); + } +#endif + done: FUNC_LEAVE(ret_value); } @@ -2069,11 +2161,27 @@ PDC_Server_get_env() gen_fastbit_idx_g = 1; tmp_env_char = getenv("PDC_USE_FASTBIT_IDX"); - if (tmp_env_char != NULL) + if (tmp_env_char != NULL) { use_fastbit_idx_g = 1; + printf("==PDC_SERVER[%d]: using FastBit for data indexing and querying\n"); + } + + tmp_env_char = getenv("PDC_USE_ROCKSDB"); + if (tmp_env_char != NULL && strcmp(tmp_env_char, "1") == 0) { + use_rocksdb_g = 1; + if (pdc_server_rank_g == 0) + printf("==PDC_SERVER[%d]: using RocksDB for kvtag\n"); + } + + tmp_env_char = getenv("PDC_USE_SQLITE3"); + if (tmp_env_char != NULL && strcmp(tmp_env_char, "1") == 0) { + use_sqlite3_g = 1; + if (pdc_server_rank_g == 0) + printf("==PDC_SERVER[%d]: using SQLite3 for kvtag\n", pdc_server_rank_g); + } if (pdc_server_rank_g == 0) { - printf("\n==PDC_SERVER[%d]: using [%s] as tmp dir, %d OSTs, %d OSTs per data file, %d%% to BB\n", + printf("==PDC_SERVER[%d]: using [%s] as tmp dir, %d OSTs, %d OSTs per data file, %d%% to BB\n", pdc_server_rank_g, pdc_server_tmp_dir_g, lustre_total_ost_g, pdc_nost_per_file_g, write_to_bb_percentage_g); } @@ -2142,6 +2250,75 @@ server_run(int argc, char *argv[]) if (pdc_server_rank_g == 0) if (PDC_Server_write_addr_to_file(all_addr_strings_g, pdc_server_size_g) != SUCCEED) printf("==PDC_SERVER[%d]: Error with write config file\n", pdc_server_rank_g); + +#ifdef ENABLE_ROCKSDB + if (use_rocksdb_g) { + /* rocksdb_backup_engine_t *be; */ + rocksdb_options_t *options = rocksdb_options_create(); + rocksdb_options_increase_parallelism(options, 2); + rocksdb_options_optimize_level_style_compaction(options, 0); + rocksdb_options_set_create_if_missing(options, 1); + + rocksdb_block_based_table_options_t *table_options = rocksdb_block_based_options_create(); + rocksdb_filterpolicy_t * filter_policy = rocksdb_filterpolicy_create_bloom(10); + rocksdb_block_based_options_set_filter_policy(table_options, filter_policy); + + rocksdb_options_set_block_based_table_factory(options, table_options); + rocksdb_slicetransform_t *slicetransform = rocksdb_slicetransform_create_fixed_prefix(3); + rocksdb_options_set_prefix_extractor(options, slicetransform); + + char *err = NULL; + char rocksdb_path[ADDR_MAX]; + snprintf(rocksdb_path, ADDR_MAX, "/tmp/PDC_rocksdb_%d", pdc_server_rank_g); + + // Remove the in-memory db + remove_directory(rocksdb_path); + + // Create db + rocksdb_g = rocksdb_open(options, rocksdb_path, &err); + assert(!err); + if (pdc_server_rank_g == 0) + printf("==PDC_SERVER[%d]: RocksDB initialized\n", pdc_server_rank_g); + } + +#endif + +#ifdef ENABLE_SQLITE3 + if (use_sqlite3_g) { + char *errMessage = 0; + char sqlite3_path[ADDR_MAX]; + snprintf(sqlite3_path, ADDR_MAX, "/tmp/PDC_sqlite3_%d", pdc_server_rank_g); + sqlite3_open(sqlite3_path, &sqlite3_db_g); + + sqlite3_exec(sqlite3_db_g, + "CREATE TABLE objects (objid INTEGER, name TEXT, value_text TEXT, " + "value_int INTEGER, value_float REAL, value_double REAL, value_blob BLOB);", + 0, 0, &errMessage); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + + // Create indexes + sqlite3_exec(sqlite3_db_g, "CREATE INDEX index_name ON objects(name);", 0, 0, &errMessage); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + sqlite3_exec(sqlite3_db_g, "CREATE INDEX index_value_int ON objects(value_int);", 0, 0, &errMessage); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + sqlite3_exec(sqlite3_db_g, "CREATE INDEX index_value_text ON objects(value_text);", 0, 0, + &errMessage); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + sqlite3_exec(sqlite3_db_g, "CREATE INDEX index_value_float ON objects(value_float);", 0, 0, + &errMessage); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + sqlite3_exec(sqlite3_db_g, "CREATE INDEX index_value_double ON objects(value_double);", 0, 0, + &errMessage); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + } +#endif + #ifdef PDC_TIMING #ifdef ENABLE_MPI pdc_server_timings->PDCserver_start_total += MPI_Wtime() - start; @@ -2175,6 +2352,30 @@ server_run(int argc, char *argv[]) #endif done: +#ifdef ENABLE_ROCKSDB + if (use_rocksdb_g) { + char rocksdb_fname[ADDR_MAX]; + struct stat st; + snprintf(rocksdb_fname, ADDR_MAX, "/tmp/PDC_rocksdb_%d", pdc_server_rank_g); + stat(rocksdb_fname, &st); + printf("==PDC_SERVER[%d]: RocksDB file size %lu\n", pdc_server_rank_g, st.st_size); + + rocksdb_close(rocksdb_g); + } +#endif + +#ifdef ENABLE_SQLITE3 + if (use_sqlite3_g) { + char sqlite3_fname[ADDR_MAX]; + struct stat st; + snprintf(sqlite3_fname, ADDR_MAX, "/tmp/PDC_sqlite3_%d", pdc_server_rank_g); + stat(sqlite3_fname, &st); + printf("==PDC_SERVER[%d]: SQLite3 max memory usage: %llu, DB file size %lu\n", pdc_server_rank_g, + sqlite3_memory_highwater(0), st.st_size); + sqlite3_close(sqlite3_db_g); + } +#endif + #ifdef PDC_TIMING PDC_server_timing_report(); #endif @@ -2183,4 +2384,4 @@ server_run(int argc, char *argv[]) MPI_Finalize(); #endif return 0; -} \ No newline at end of file +} diff --git a/src/server/pdc_server_metadata.c b/src/server/pdc_server_metadata.c index b887d7685..1f5e58c3a 100644 --- a/src/server/pdc_server_metadata.c +++ b/src/server/pdc_server_metadata.c @@ -50,6 +50,7 @@ #include "pdc_server.h" #include "mercury_hash_table.h" #include "pdc_malloc.h" +#include "string_utils.h" #define BLOOM_TYPE_T counting_bloom_t #define BLOOM_NEW new_counting_bloom @@ -1622,9 +1623,191 @@ _is_matching_kvtag(pdc_kvtag_t *in, pdc_kvtag_t *kvtag) FUNC_LEAVE(ret_value); } -perr_t -PDC_Server_get_kvtag_query_result(pdc_kvtag_t *in /*FIXME: query input should be string-based*/, - uint32_t *n_meta, uint64_t **obj_ids) +#ifdef ENABLE_SQLITE3 +static int +sqlite_query_kvtag_callback(void *data, int argc, char **argv, char **colName) +{ + pdc_sqlite3_query_t *query_data = (pdc_sqlite3_query_t *)data; + + if (NULL != argv[0]) { + pdcid_t id = strtoull(argv[0], NULL, 10); + if (query_data->nobj >= query_data->nalloc) { + query_data->nalloc *= 2; + *query_data->obj_ids = realloc(*query_data->obj_ids, query_data->nalloc * sizeof(uint64_t)); + } + (*query_data->obj_ids)[query_data->nobj] = id; + query_data->nobj += 1; + /* printf("SQLite3 found %s = %llu\n", colName[0], id); */ + } + else { + printf("SQLite3 found nothing\n"); + return 0; + } + + return 0; +} +#endif + +static perr_t +PDC_Server_query_kvtag_rocksdb(pdc_kvtag_t *in, uint32_t *n_meta, uint64_t **obj_ids, uint64_t alloc_size) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_ROCKSDB + const char *rocksdb_key; + pdc_kvtag_t tmp; + uint64_t obj_id; + char name[TAG_LEN_MAX]; + size_t len; + uint32_t iter = 0; + + rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); + rocksdb_iterator_t * rocksdb_iter = rocksdb_create_iterator(rocksdb_g, readoptions); + rocksdb_iter_seek_to_first(rocksdb_iter); + + // Iterate over all rocksdb kv + while (rocksdb_iter_valid(rocksdb_iter)) { + rocksdb_key = rocksdb_iter_key(rocksdb_iter, &len); + /* sprintf(rocksdb_key, "%lu`%s", obj_id, in->kvtag.name); */ + sscanf(rocksdb_key, "%lu`%s", &obj_id, name); + tmp.name = name; + tmp.value = (void *)rocksdb_iter_value(rocksdb_iter, &len); + tmp.size = len; + tmp.type = in->type; + + if (_is_matching_kvtag(in, &tmp) == TRUE) { + if (iter >= alloc_size) { + alloc_size *= 2; + *obj_ids = (void *)realloc(*obj_ids, alloc_size * sizeof(uint64_t)); + } + (*obj_ids)[iter++] = obj_id; + } + + /* printf("==PDC_SERVER[%d]: rocksdb iter [%s] [%d], len %d\n", pdc_server_rank_g, tmp.name, + * *((int*)tmp.value), tmp.size); */ + rocksdb_iter_next(rocksdb_iter); + } + + *n_meta = iter; + // Debug + /* printf("==PDC_SERVER[%d]: rocksdb found %d objids \n", pdc_server_rank_g, iter); */ + + if (rocksdb_iter) + rocksdb_iter_destroy(rocksdb_iter); +#else + printf("==PDC_SERVER[%d]: enabled rocksdb but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; +#endif + + return ret_value; +} + +static perr_t +PDC_Server_query_kvtag_sqlite(pdc_kvtag_t *in, uint32_t *n_meta, uint64_t **obj_ids, uint64_t alloc_size) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_SQLITE3 + char sql[TAG_LEN_MAX]; + char * errMessage = NULL; + char * tmp_value, *tmp_name, *current_pos; + pdc_sqlite3_query_t query_data; + + // Check if there is * in tag name + if (NULL == strstr(in->name, "*")) { + // exact name match + if (in->type == PDC_STRING) { + // valut type is string + if (NULL == strstr((char *)in->value, "*")) { + // exact name and value string match + sprintf(sql, "SELECT objid FROM objects WHERE name = \'%s\' AND value_text = \'%s\';", + in->name, (char *)in->value); + } + else { + // value has * in it + tmp_value = strdup((char *)in->value); + // replace * with % for sqlite3 + current_pos = strchr(tmp_value, '*'); + while (current_pos) { + *current_pos = '%'; + current_pos = strchr(current_pos, '*'); + } + + sprintf(sql, "SELECT objid FROM objects WHERE name = \'%s\' AND value_text LIKE \'%s\';", + in->name, tmp_value); + if (tmp_value) + free(tmp_value); + } + } + else { + // Only check name for non string value type + sprintf(sql, "SELECT objid FROM objects WHERE name = \'%s\';", in->name); + } + } + else { + tmp_name = strdup(in->name); + // replace * with % for sqlite3 + current_pos = strchr(tmp_name, '*'); + while (current_pos) { + *current_pos = '%'; + current_pos = strchr(current_pos, '*'); + } + + sprintf(sql, "SELECT objid FROM objects WHERE name LIKE \'%s\';", tmp_name); + + if (in->type == PDC_STRING) { + // valut type is string + if (NULL == strstr((char *)in->value, "*")) { + // exact name and value string match + sprintf(sql, "SELECT objid FROM objects WHERE name LIKE \'%s\' AND value_text = \'%s\';", + tmp_name, (char *)in->value); + } + else { + // value has * in it + tmp_value = strdup((char *)in->value); + // replace * with % for sqlite3 + current_pos = strchr(tmp_value, '*'); + while (current_pos) { + *current_pos = '%'; + current_pos = strchr(current_pos, '*'); + } + + sprintf(sql, "SELECT objid FROM objects WHERE name LIKE \'%s\' AND value_text LIKE \'%s\';", + tmp_name, tmp_value); + if (tmp_value) + free(tmp_value); + } + } + else { + // Only check name for non string value type + sprintf(sql, "SELECT objid FROM objects WHERE name LIKE \'%s\';", tmp_name); + } + + if (tmp_name) + free(tmp_name); + } + + query_data.nobj = 0; + query_data.nalloc = alloc_size; + query_data.obj_ids = obj_ids; + + // debug + /* printf("==PDC_SERVER[%d]: constructed SQL [%s]\n", pdc_server_rank_g, sql); */ + + // Construct a SQL query + sqlite3_exec(sqlite3_db_g, sql, sqlite_query_kvtag_callback, &query_data, &errMessage); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + + *n_meta = query_data.nobj; +#else + printf("==PDC_SERVER[%d]: enabled SQLite3 but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; +#endif + + return ret_value; +} + +static perr_t +PDC_Server_query_kvtag_someta(pdc_kvtag_t *in, uint32_t *n_meta, uint64_t **obj_ids, uint64_t alloc_size) { perr_t ret_value = SUCCEED; uint32_t iter = 0; @@ -1634,13 +1817,6 @@ PDC_Server_get_kvtag_query_result(pdc_kvtag_t *in /*FIXME: query input should be HashTableIterator hash_table_iter; int n_entry, is_name_match, is_value_match; HashTablePair pair; - uint32_t alloc_size = 100; - - FUNC_ENTER(NULL); - - *n_meta = 0; - - *obj_ids = (void *)calloc(alloc_size, sizeof(uint64_t)); if (metadata_hash_table_g != NULL) { @@ -1652,14 +1828,14 @@ PDC_Server_get_kvtag_query_result(pdc_kvtag_t *in /*FIXME: query input should be head = pair.value; DL_FOREACH(head->metadata, elt) { - DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt) - { #ifdef PDC_DEBUG_OUTPUT - printf("==PDC_SERVER: Matching kvtag [\"%s\":\"%s\"] of object %s on condition in->key: " - "%s, in->value: %s ", - (char *)kvtag_list_elt->kvtag->name, (char *)kvtag_list_elt->kvtag->value, - elt->obj_name, in->name, in->value); + printf("==PDC_SERVER: Matching kvtag [\"%s\":\"%s\"] of object %s on condition in->key: " + "%s, in->value: %s ", + (char *)kvtag_list_elt->kvtag->name, (char *)kvtag_list_elt->kvtag->value, + elt->obj_name, in->name, in->value); #endif + DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt) + { if (_is_matching_kvtag(in, kvtag_list_elt->kvtag) == TRUE) { #ifdef PDC_DEBUG_OUTPUT println("[Found]"); @@ -1669,23 +1845,58 @@ PDC_Server_get_kvtag_query_result(pdc_kvtag_t *in /*FIXME: query input should be *obj_ids = (void *)realloc(*obj_ids, alloc_size * sizeof(uint64_t)); } (*obj_ids)[iter++] = elt->obj_id; - // break; // FIXME: shall we break here? or continue to check other kvtags? + break; } - else { + } // End for each kvtag in list + } // End for each metadata from hash table entry + } // End looping metadata hash table + *n_meta = iter; #ifdef PDC_DEBUG_OUTPUT - println("[NOT FOUND]"); + printf("==PDC_SERVER[%d]: found %d objids \n", pdc_server_rank_g, iter); #endif - } - - } // End for each kvtag - } // End for each metadata - } // End while - *n_meta = iter; } // if (metadata_hash_table_g != NULL) else { printf("==PDC_SERVER: metadata_hash_table_g not initialized!\n"); ret_value = FAIL; - goto done; + } + + return ret_value; +} + +perr_t +PDC_Server_get_kvtag_query_result(pdc_kvtag_t *in /*FIXME: query input should be string-based*/, + uint32_t *n_meta, uint64_t **obj_ids) +{ + perr_t ret_value = SUCCEED; + + uint32_t alloc_size = 128; + + FUNC_ENTER(NULL); + + *n_meta = 0; + *obj_ids = (void *)calloc(alloc_size, sizeof(uint64_t)); + + if (use_rocksdb_g == 1) { + ret_value = PDC_Server_query_kvtag_rocksdb(in, n_meta, obj_ids, alloc_size); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_query_kvtag_rocksdb!\n", pdc_server_rank_g); + goto done; + } + } + else if (use_sqlite3_g) { + ret_value = PDC_Server_query_kvtag_sqlite(in, n_meta, obj_ids, alloc_size); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_query_kvtag_sqlite!\n", pdc_server_rank_g); + goto done; + } + } // End if SQLite3 + else { + // SoMeta backend + ret_value = PDC_Server_query_kvtag_someta(in, n_meta, obj_ids, alloc_size); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_query_kvtag_someta!\n", pdc_server_rank_g); + goto done; + } } done: @@ -2558,43 +2769,95 @@ PDC_add_kvtag_to_list(pdc_kvtag_list_t **list_head, pdc_kvtag_t *tag) FUNC_LEAVE(ret_value); } -perr_t -PDC_Server_add_kvtag(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t *out) +static perr_t +PDC_Server_add_kvtag_rocksdb(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t *out) { - - perr_t ret_value = SUCCEED; - uint32_t hash_key; - uint64_t obj_id; -#ifdef ENABLE_MULTITHREAD - int unlocked; + perr_t ret_value = SUCCEED; +#ifdef ENABLE_ROCKSDB + rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); + char rocksdb_key[TAG_LEN_MAX] = {0}; + sprintf(rocksdb_key, "%lu`%s", in->obj_id, in->kvtag.name); + char *err = NULL; + // Debug + /* printf("Put [%s] [%d], len%lu\n", in->kvtag.name, *((int*)in->kvtag.value), in->kvtag.size); */ + rocksdb_put(rocksdb_g, writeoptions, rocksdb_key, strlen(rocksdb_key) + 1, in->kvtag.value, + in->kvtag.size, &err); + if (err != NULL) { + printf("==PDC_SERVER[%d]: error with rocksdb_put %s, [%s]!\n", pdc_server_rank_g, in->kvtag.name, + err); + ret_value = FAIL; + } + else + out->ret = 1; +#else + printf("==PDC_SERVER[%d]: enabled rocksdb but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; #endif - pdc_hash_table_entry_head * lookup_value; - pdc_cont_hash_table_entry_t *cont_lookup_value; - FUNC_ENTER(NULL); + return ret_value; +} -#ifdef ENABLE_TIMING - struct timeval pdc_timer_start; - struct timeval pdc_timer_end; - double ht_total_sec; - gettimeofday(&pdc_timer_start, 0); -#endif +static perr_t +PDC_Server_add_kvtag_sqlite3(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t *out) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_SQLITE3 + char sql[TAG_LEN_MAX] = {0}; + char *errMessage = NULL; - hash_key = in->hash_value; - obj_id = in->obj_id; + if (in->kvtag.type == PDC_STRING || in->kvtag.type == PDC_CHAR) { + sprintf(sql, "INSERT INTO objects (objid, name, value_text) VALUES (%llu, '%s', '%s');", in->obj_id, + in->kvtag.name, (char *)in->kvtag.value); + } + else if (in->kvtag.type == PDC_INT && in->kvtag.size == sizeof(int)) { + sprintf(sql, "INSERT INTO objects (objid, name, value_int) VALUES (%llu, '%s', '%d');", in->obj_id, + in->kvtag.name, *((int *)in->kvtag.value)); + } + else if (in->kvtag.type == PDC_FLOAT && in->kvtag.size == sizeof(float)) { + sprintf(sql, "INSERT INTO objects (objid, name, value_float) VALUES (%llu, '%s', '%f');", in->obj_id, + in->kvtag.name, *((float *)in->kvtag.value)); + } + else if (in->kvtag.type == PDC_DOUBLE && in->kvtag.size == sizeof(double)) { + sprintf(sql, "INSERT INTO objects (objid, name, value_double) VALUES (%llu, '%s', '%lf');", + in->obj_id, in->kvtag.name, *((double *)in->kvtag.value)); + } + else { + printf("==PDC_SERVER[%d]: datatype not supported %d!\n", pdc_server_rank_g, in->kvtag.type); + ret_value = FAIL; + goto done; + } - // printf("==SERVER[%d]: PDC_add_kvtag::in.obj_id = %llu \n ", pdc_server_rank_g, obj_id); + // debug + /* printf("==PDC_SERVER[%d]: constructed SQL [%s]\n", pdc_server_rank_g, sql); */ + sqlite3_exec(sqlite3_db_g, sql, NULL, 0, &errMessage); -#ifdef ENABLE_MULTITHREAD - // Obtain lock for hash table - unlocked = 0; - hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); + if (errMessage) + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + else + out->ret = 1; +#else + printf("==PDC_SERVER[%d]: enabled SQLite3 but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; #endif +done: + return ret_value; +} + +static perr_t +PDC_Server_add_kvtag_someta(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t *out) +{ + perr_t ret_value = SUCCEED; + pdc_hash_table_entry_head * lookup_value; + pdc_cont_hash_table_entry_t *cont_lookup_value; + uint32_t hash_key; + + hash_key = in->hash_value; + lookup_value = hash_table_lookup(metadata_hash_table_g, &hash_key); if (lookup_value != NULL) { pdc_metadata_t *target; - target = find_metadata_by_id_from_list(lookup_value->metadata, obj_id); + target = find_metadata_by_id_from_list(lookup_value->metadata, in->obj_id); if (target != NULL) { PDC_add_kvtag_to_list(&target->kvtag_list_head, &in->kvtag); out->ret = 1; @@ -2612,12 +2875,64 @@ PDC_Server_add_kvtag(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t *out) out->ret = 1; } else { - printf("==PDC_SERVER[%d]: add tag target %" PRIu64 " not found!\n", pdc_server_rank_g, obj_id); + printf("==PDC_SERVER[%d]: add tag target %" PRIu64 " not found!\n", pdc_server_rank_g, + in->obj_id); ret_value = FAIL; out->ret = -1; } } + return ret_value; +} + +perr_t +PDC_Server_add_kvtag(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t *out) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_MULTITHREAD + int unlocked; +#endif + FUNC_ENTER(NULL); + +#ifdef ENABLE_TIMING + struct timeval pdc_timer_start; + struct timeval pdc_timer_end; + double ht_total_sec; + gettimeofday(&pdc_timer_start, 0); +#endif + + out->ret = -1; + // printf("==SERVER[%d]: PDC_add_kvtag::in.obj_id = %llu \n ", pdc_server_rank_g, obj_id); + +#ifdef ENABLE_MULTITHREAD + // Obtain lock for hash table + unlocked = 0; + hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); +#endif + + if (use_rocksdb_g == 1) { + ret_value = PDC_Server_add_kvtag_rocksdb(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_add_kvtag_rocksdb!\n", pdc_server_rank_g); + goto done; + } + } // End if rocksdb + else if (use_sqlite3_g == 1) { + ret_value = PDC_Server_add_kvtag_sqlite3(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_add_kvtag_sqlite3!\n", pdc_server_rank_g); + goto done; + } + } // End if sqlite3 + else { + ret_value = PDC_Server_add_kvtag_someta(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_add_kvtag_someta!\n", pdc_server_rank_g); + goto done; + } + } + +done: #ifdef ENABLE_MULTITHREAD // ^ Release hash table lock hg_thread_mutex_unlock(&pdc_metadata_hash_table_mutex_g); @@ -2674,37 +2989,126 @@ PDC_get_kvtag_value_from_list(pdc_kvtag_list_t **list_head, char *key, metadata_ FUNC_LEAVE(ret_value); } -perr_t -PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) +#ifdef ENABLE_SQLITE3 +static int +sqlite_get_kvtag_callback(void *data, int argc, char **argv, char **colName) { + pdc_kvtag_t *out = (pdc_kvtag_t *)data; + + for (int i = 0; i < argc; i++) { + if (NULL != argv[i]) { + if (0 == strcmp(colName[i], "value_int")) { + int *int_tmp = (int *)malloc(sizeof(int)); + *int_tmp = atoi(argv[i]); + out->value = (void *)int_tmp; + out->size = sizeof(int); + /* printf("SQLite3 found %s = %d\n", colName[i], int_tmp); */ + break; + } + else if (0 == strcmp(colName[i], "value_real")) { + float *float_tmp = (float *)malloc(sizeof(float)); + *float_tmp = (float)atof(argv[i]); + out->value = (void *)float_tmp; + out->size = sizeof(float); + /* printf("SQLite3 found %s = %f\n", colName[i], float_tmp); */ + break; + } + else if (0 == strcmp(colName[i], "value_double")) { + double *double_tmp = (double *)malloc(sizeof(double)); + *double_tmp = atof(argv[i]); + out->value = (void *)double_tmp; + out->size = sizeof(double); + /* printf("SQLite3 found %s = %f\n", colName[i], double_tmp); */ + break; + } + else if (0 == strcmp(colName[i], "value_text")) { + out->value = strdup(argv[i]); + /* printf("SQLite3 found %s = %s\n", colName[i], argv[i]); */ + out->size = strlen(argv[i]) + 1; + break; + } + else { + out->value = NULL; + /* printf("SQLite3 found nothing\n"); */ + return 0; + } + } + } - perr_t ret_value = SUCCEED; - uint32_t hash_key; - uint64_t obj_id; -#ifdef ENABLE_MULTITHREAD - int unlocked; + return 0; +} #endif - pdc_hash_table_entry_head * lookup_value; - pdc_cont_hash_table_entry_t *cont_lookup_value; - FUNC_ENTER(NULL); +static perr_t +PDC_Server_get_kvtag_rocksdb(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) +{ + perr_t ret_value = SUCCEED; -#ifdef ENABLE_TIMING - struct timeval pdc_timer_start; - struct timeval pdc_timer_end; - double ht_total_sec; - gettimeofday(&pdc_timer_start, 0); +#ifdef ENABLE_ROCKSDB + rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); + char rocksdb_key[TAG_LEN_MAX] = {0}; + sprintf(rocksdb_key, "%lu`%s", in->obj_id, in->key); + char * err = NULL; + size_t len; + char * value = rocksdb_get(rocksdb_g, readoptions, rocksdb_key, strlen(rocksdb_key) + 1, &len, &err); + if (value == NULL) { + printf("==PDC_SERVER[%d]: error with rocksdb_get %s, [%s]!\n", pdc_server_rank_g, in->key, err); + ret_value = FAIL; + } + out->kvtag.name = in->key; + out->kvtag.size = len; + out->kvtag.value = value; + out->ret = 1; +#else + printf("==PDC_SERVER[%d]: enabled rocksdb but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; #endif - hash_key = in->hash_value; - obj_id = in->obj_id; + return ret_value; +} -#ifdef ENABLE_MULTITHREAD - // Obtain lock for hash table - unlocked = 0; - hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); +static perr_t +PDC_Server_get_kvtag_sqlite3(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_SQLITE3 + char sql[TAG_LEN_MAX]; + char *errMessage = NULL; + sprintf(sql, + "SELECT value_text, value_int, value_float, value_double, value_blob FROM objects WHERE " + "objid = %llu AND name = \'%s\';", + in->obj_id, in->key); + + /* printf("==PDC_SERVER[%d]: get kvtag [%s]!\n", pdc_server_rank_g, in->key); */ + sqlite3_exec(sqlite3_db_g, sql, sqlite_get_kvtag_callback, &out->kvtag, &errMessage); + if (errMessage) { + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + } + else { + // size and value is filled in sqlite_get_kvtag_callback + out->kvtag.name = in->key; + out->ret = 1; + } +#else + printf("==PDC_SERVER[%d]: enabled SQLite3 but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; #endif + return ret_value; +} + +static perr_t +PDC_Server_get_kvtag_someta(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) +{ + perr_t ret_value = SUCCEED; + uint32_t hash_key; + uint64_t obj_id; + pdc_hash_table_entry_head * lookup_value; + pdc_cont_hash_table_entry_t *cont_lookup_value; + + hash_key = in->hash_value; + obj_id = in->obj_id; + lookup_value = hash_table_lookup(metadata_hash_table_g, &hash_key); if (lookup_value != NULL) { pdc_metadata_t *target; @@ -2731,11 +3135,58 @@ PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) } } - if (ret_value != SUCCEED) { - printf("==PDC_SERVER[%d]: %s - error \n", pdc_server_rank_g, __func__); - goto done; + return ret_value; +} + +perr_t +PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_MULTITHREAD + int unlocked; +#endif + + FUNC_ENTER(NULL); + +#ifdef ENABLE_TIMING + struct timeval pdc_timer_start; + struct timeval pdc_timer_end; + double ht_total_sec; + gettimeofday(&pdc_timer_start, 0); +#endif + + out->ret = -1; + +#ifdef ENABLE_MULTITHREAD + // Obtain lock for hash table + unlocked = 0; + hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); +#endif + + if (use_rocksdb_g == 1) { + ret_value = PDC_Server_get_kvtag_rocksdb(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_get_kvtag_rocksdb!\n", pdc_server_rank_g); + goto done; + } + } + else if (use_sqlite3_g == 1) { + ret_value = PDC_Server_get_kvtag_sqlite3(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_get_kvtag_sqlite3!\n", pdc_server_rank_g); + goto done; + } + } + else { + // Someta + ret_value = PDC_Server_get_kvtag_someta(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_get_kvtag_someta!\n", pdc_server_rank_g); + goto done; + } } +done: #ifdef ENABLE_MULTITHREAD // ^ Release hash table lock hg_thread_mutex_unlock(&pdc_metadata_hash_table_mutex_g); @@ -2760,7 +3211,6 @@ PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) hg_thread_mutex_unlock(&pdc_time_mutex_g); #endif -done: #ifdef ENABLE_MULTITHREAD if (unlocked == 0) hg_thread_mutex_unlock(&pdc_metadata_hash_table_mutex_g); @@ -2795,36 +3245,68 @@ PDC_del_kvtag_value_from_list(pdc_kvtag_list_t **list_head, char *key) FUNC_LEAVE(ret_value); } -perr_t -PDC_Server_del_kvtag(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t *out) +static perr_t +PDC_Server_del_kvtag_rocksdb(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t *out) { - - perr_t ret_value = SUCCEED; - uint32_t hash_key; - uint64_t obj_id; -#ifdef ENABLE_MULTITHREAD - int unlocked; + perr_t ret_value = SUCCEED; +#ifdef ENABLE_ROCKSDB + char * err = NULL; + char rocksdb_key[TAG_LEN_MAX] = {0}; + rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); + + sprintf(rocksdb_key, "%lu`%s", in->obj_id, in->key); + rocksdb_delete(rocksdb_g, writeoptions, rocksdb_key, strlen(rocksdb_key) + 1, &err); + if (err != NULL) { + printf("==PDC_SERVER[%d]: error with rocksdb_delete [%s], [%s]!\n", pdc_server_rank_g, in->key, err); + ret_value = FAIL; + } + else + out->ret = 1; +#else + printf("==PDC_SERVER[%d]: enabled rocksdb but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; #endif - pdc_hash_table_entry_head * lookup_value; - pdc_cont_hash_table_entry_t *cont_lookup_value; - FUNC_ENTER(NULL); + return ret_value; +} -#ifdef ENABLE_TIMING - struct timeval pdc_timer_start; - struct timeval pdc_timer_end; - double ht_total_sec; - gettimeofday(&pdc_timer_start, 0); +static perr_t +PDC_Server_del_kvtag_sqlite3(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t *out) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_SQLITE3 + char sql[TAG_LEN_MAX]; + char *errMessage = NULL; + + sprintf(sql, "DELETE FROM objects WHERE objid = %llu AND name = \'%s\';", in->obj_id, in->key); + + sqlite3_exec(sqlite3_db_g, sql, NULL, 0, &errMessage); + if (errMessage) { + printf("==PDC_SERVER[%d]: error from SQLite %s!\n", pdc_server_rank_g, errMessage); + ret_value = FAIL; + } + else + out->ret = 1; +#else + printf("==PDC_SERVER[%d]: enabled SQLite3 but PDC is not compiled with it!\n", pdc_server_rank_g); + ret_value = FAIL; #endif + return ret_value; +} + +static perr_t +PDC_Server_del_kvtag_someta(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t *out) +{ + perr_t ret_value = SUCCEED; + uint32_t hash_key; + uint64_t obj_id; + pdc_hash_table_entry_head * lookup_value; + pdc_cont_hash_table_entry_t *cont_lookup_value; + hash_key = in->hash_value; obj_id = in->obj_id; -#ifdef ENABLE_MULTITHREAD - // Obtain lock for hash table - hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); -#endif - // Look obj tags first lookup_value = hash_table_lookup(metadata_hash_table_g, &hash_key); if (lookup_value != NULL) { @@ -2839,7 +3321,6 @@ PDC_Server_del_kvtag(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t *out) out->ret = -1; printf("==PDC_SERVER[%d]: %s - failed to find requested kvtag [%s]\n", pdc_server_rank_g, __func__, in->key); - goto done; } } else { @@ -2853,6 +3334,54 @@ PDC_Server_del_kvtag(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t *out) out->ret = -1; printf("==PDC_SERVER[%d]: %s - failed to find requested kvtag [%s]\n", pdc_server_rank_g, __func__, in->key); + } + } + + return ret_value; +} + +perr_t +PDC_Server_del_kvtag(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t *out) +{ + perr_t ret_value = SUCCEED; +#ifdef ENABLE_MULTITHREAD + int unlocked; +#endif + + FUNC_ENTER(NULL); + +#ifdef ENABLE_TIMING + struct timeval pdc_timer_start; + struct timeval pdc_timer_end; + double ht_total_sec; + gettimeofday(&pdc_timer_start, 0); +#endif + + out->ret = -1; + +#ifdef ENABLE_MULTITHREAD + // Obtain lock for hash table + hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); +#endif + + if (use_rocksdb_g) { + ret_value = PDC_Server_del_kvtag_rocksdb(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_del_kvtag_rocksdb!\n", pdc_server_rank_g); + goto done; + } + } + else if (use_sqlite3_g) { + ret_value = PDC_Server_del_kvtag_sqlite3(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_del_kvtag_sqlite3!\n", pdc_server_rank_g); + goto done; + } + } + else { + ret_value = PDC_Server_del_kvtag_someta(in, out); + if (ret_value != SUCCEED) { + printf("==PDC_SERVER[%d]: Error with PDC_Server_del_kvtag_someta!\n", pdc_server_rank_g); goto done; } } diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 0c070e486..13c2a5a31 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -95,7 +95,7 @@ set(PROGRAMS # data_server_meta_test kvtag_add_get # kvtag_get -# kvtag_query + kvtag_query kvtag_query_scale # obj_transformation region_transfer_query @@ -258,6 +258,8 @@ add_test(NAME obj_life WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTO #add_test(NAME obj_dim WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./obj_dim ) add_test(NAME obj_buf WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./obj_buf ) add_test(NAME obj_tags WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./obj_tags ) +add_test(NAME kvtag_add_get WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./kvtag_add_get) +add_test(NAME kvtag_query WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./kvtag_query 100 1 10 0) add_test(NAME obj_info WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./obj_info ) add_test(NAME obj_put_data WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./obj_put_data ) add_test(NAME obj_get_data WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./obj_get_data ) @@ -337,6 +339,8 @@ set_tests_properties(obj_life PROPERTIES LABELS serial ) #set_tests_properties(obj_dim PROPERTIES LABELS serial ) set_tests_properties(obj_buf PROPERTIES LABELS serial ) set_tests_properties(obj_tags PROPERTIES LABELS serial ) +set_tests_properties(kvtag_add_get PROPERTIES LABELS serial ) +set_tests_properties(kvtag_query PROPERTIES LABELS serial ) set_tests_properties(obj_info PROPERTIES LABELS serial ) set_tests_properties(obj_put_data PROPERTIES LABELS serial ) set_tests_properties(obj_get_data PROPERTIES LABELS serial ) diff --git a/src/tests/dart_attr_dist_test.c b/src/tests/dart_attr_dist_test.c index 82daae911..f333d76f2 100644 --- a/src/tests/dart_attr_dist_test.c +++ b/src/tests/dart_attr_dist_test.c @@ -88,7 +88,7 @@ main(int argc, char *argv[]) size_t total_num_attr = atoi(argv[2]); pdcid_t *obj_ids; int i, j, k, pct, q_repeat_count = 100; - double stime, total_time; + double stime, total_time = 0; int val; char pdc_context_name[40]; @@ -121,17 +121,23 @@ main(int argc, char *argv[]) // &arr_len); // broadcast the size from rank 0 to all other processes +#ifdef ENABLE_MPI MPI_Bcast(&arr_len, 1, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD); +#endif } else { // receive the size on all other ranks +#ifdef ENABLE_MPI MPI_Bcast(&arr_len, 1, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD); +#endif // allocate memory for the array attr_2_obj_array = (int64_t *)malloc(arr_len * sizeof(int64_t)); } // broadcast the array itself +#ifdef ENABLE_MPI MPI_Bcast(attr_2_obj_array, arr_len, MPI_LONG_LONG_INT, 0, MPI_COMM_WORLD); +#endif // print array. for (i = 0; i < arr_len; ++i) { @@ -374,4 +380,4 @@ main(int argc, char *argv[]) #endif return 0; -} \ No newline at end of file +} diff --git a/src/tests/kvtag_query.c b/src/tests/kvtag_query.c index cf1e80dcb..3a6038d3f 100644 --- a/src/tests/kvtag_query.c +++ b/src/tests/kvtag_query.c @@ -24,7 +24,6 @@ #include #include -#include #include #include #include @@ -32,160 +31,215 @@ #include "pdc_client_connect.h" int -main() +assign_work_to_rank(int rank, int size, int nwork, int *my_count, int *my_start) { + if (rank > size || my_count == NULL || my_start == NULL) { + printf("assign_work_to_rank(): Error with input!\n"); + return -1; + } + if (nwork < size) { + if (rank < nwork) + *my_count = 1; + else + *my_count = 0; + (*my_start) = rank * (*my_count); + } + else { + (*my_count) = nwork / size; + (*my_start) = rank * (*my_count); + + // Last few ranks may have extra work + if (rank >= size - nwork % size) { + (*my_count)++; + (*my_start) += (rank - (size - nwork % size)); + } + } + + return 1; +} - int i; - pdcid_t pdc, cont_prop, cont, obj_prop1, obj_prop2, obj1, obj2; - uint64_t * obj_ids = NULL, *obj_ids1 = NULL, *obj_ids2 = NULL; - int nobj; - pdc_kvtag_t kvtag1, kvtag2, kvtag3; - char * v1 = "value1"; - int v2 = 2; - double v3 = 3.45; +void +print_usage(char *name) +{ + printf("%s n_obj n_round n_selectivity is_using_dart\n", name); + printf("Summary: This test will create n_obj objects, and add n_selectivity tags to each object. Then it " + "will " + "perform n_round collective queries against the tags, each query from each client should get " + "a whole result set.\n"); + printf("Parameters:\n"); + printf(" n_obj: number of objects\n"); + printf(" n_round: number of rounds, it can be the total number of tags too, as each round will perform " + "one query against one tag\n"); + printf(" n_selectivity: selectivity, on a 100 scale. \n"); + printf(" is_using_dart: 1 for using dart, 0 for not using dart\n"); +} + +int +main(int argc, char *argv[]) +{ + pdcid_t pdc, cont_prop, cont, obj_prop; + pdcid_t * obj_ids; + int n_obj, n_add_tag, my_obj, my_obj_s, my_add_tag, my_add_tag_s; + int proc_num = 1, my_rank = 0, i, v, iter, round, selectivity, is_using_dart; + char obj_name[128]; + double stime, total_time; + pdc_kvtag_t kvtag; + uint64_t * pdc_ids; + int nres, ntotal; + +#ifdef ENABLE_MPI + MPI_Init(&argc, &argv); + MPI_Comm_size(MPI_COMM_WORLD, &proc_num); + MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); +#endif + + if (argc < 5) { + if (my_rank == 0) + print_usage(argv[0]); + goto done; + } + n_obj = atoi(argv[1]); + round = atoi(argv[2]); + selectivity = atoi(argv[3]); + is_using_dart = atoi(argv[4]); + n_add_tag = n_obj * selectivity / 100; // create a pdc pdc = PDCinit("pdc"); - printf("create a new pdc\n"); // create a container property cont_prop = PDCprop_create(PDC_CONT_CREATE, pdc); - if (cont_prop > 0) - printf("Create a container property\n"); - else + if (cont_prop <= 0) printf("Fail to create container property @ line %d!\n", __LINE__); // create a container cont = PDCcont_create("c1", cont_prop); - if (cont > 0) - printf("Create a container c1\n"); - else + if (cont <= 0) printf("Fail to create container @ line %d!\n", __LINE__); // create an object property - obj_prop1 = PDCprop_create(PDC_OBJ_CREATE, pdc); - if (obj_prop1 > 0) - printf("Create an object property\n"); - else - printf("Fail to create object property @ line %d!\n", __LINE__); - - obj_prop2 = PDCprop_create(PDC_OBJ_CREATE, pdc); - if (obj_prop2 > 0) - printf("Create an object property\n"); - else + obj_prop = PDCprop_create(PDC_OBJ_CREATE, pdc); + if (obj_prop <= 0) printf("Fail to create object property @ line %d!\n", __LINE__); - // create first object - obj1 = PDCobj_create(cont, "o1", obj_prop1); - if (obj1 > 0) - printf("Create an object o1\n"); - else - printf("Fail to create object @ line %d!\n", __LINE__); - - // create second object - obj2 = PDCobj_create(cont, "o2", obj_prop2); - if (obj2 > 0) - printf("Create an object o2\n"); - else - printf("Fail to create object @ line %d!\n", __LINE__); - - kvtag1.name = "key1string"; - kvtag1.value = (void *)v1; - kvtag1.type = PDC_STRING; - kvtag1.size = strlen(v1) + 1; - - kvtag2.name = "key2int"; - kvtag2.value = (void *)&v2; - kvtag2.type = PDC_INT; - kvtag2.size = sizeof(int); - - kvtag3.name = "key3double"; - kvtag3.value = (void *)&v3; - kvtag3.type = PDC_DOUBLE; - kvtag3.size = sizeof(double); - - if (PDCobj_put_tag(obj1, kvtag1.name, kvtag1.value, kvtag1.type, kvtag1.size) < 0) - printf("fail to add a kvtag to o1\n"); - else - printf("successfully added a kvtag to o1\n"); - - if (PDCobj_put_tag(obj1, kvtag2.name, kvtag2.value, kvtag2.type, kvtag2.size) < 0) - printf("fail to add a kvtag to o1\n"); - else - printf("successfully added a kvtag to o1\n"); - - if (PDCobj_put_tag(obj2, kvtag2.name, kvtag2.value, kvtag2.type, kvtag2.size) < 0) - printf("fail to add a kvtag to o2\n"); - else - printf("successfully added a kvtag to o2\n"); - - if (PDCobj_put_tag(obj2, kvtag3.name, kvtag3.value, kvtag3.type, kvtag3.size) < 0) - printf("fail to add a kvtag to o2\n"); - else - printf("successfully added a kvtag to o2\n"); - - if (PDC_Client_query_kvtag(&kvtag1, &nobj, &obj_ids) < 0) - printf("fail to query a kvtag\n"); - else { - printf("successfully queried a tag, nres=%d\n", nobj); - for (i = 0; i < nobj; i++) - printf("%" PRIu64 ", ", obj_ids[i]); - printf("\n\n"); + // Create a number of objects, add at least one tag to that object + assign_work_to_rank(my_rank, proc_num, n_obj, &my_obj, &my_obj_s); + if (my_rank == 0) + printf("I will create %d obj\n", my_obj); + + obj_ids = (pdcid_t *)calloc(my_obj, sizeof(pdcid_t)); + for (i = 0; i < my_obj; i++) { + sprintf(obj_name, "obj%d", my_obj_s + i); + obj_ids[i] = PDCobj_create(cont, obj_name, obj_prop); + if (obj_ids[i] <= 0) + printf("Fail to create object @ line %d!\n", __LINE__); } - if (obj_ids != NULL) - free(obj_ids); - if (PDC_Client_query_kvtag(&kvtag3, &nobj, &obj_ids2) < 0) - printf("fail to query a kvtag\n"); - else { - printf("successfully queried a tag, nres=%d\n", nobj); - for (i = 0; i < nobj; i++) - printf("%" PRIu64 ", ", obj_ids2[i]); - printf("\n\n"); + if (my_rank == 0) + printf("Created %d objects\n", n_obj); + fflush(stdout); + + char *attr_name_per_rank = gen_random_strings(1, 6, 8, 26)[0]; + // Add tags + kvtag.name = attr_name_per_rank; + kvtag.value = (void *)&v; + kvtag.type = PDC_INT; + kvtag.size = sizeof(int); + + char key[32]; + char value[32]; + char exact_query[48]; + + dart_object_ref_type_t ref_type = REF_PRIMARY_ID; + dart_hash_algo_t hash_algo = DART_HASH; + + assign_work_to_rank(my_rank, proc_num, n_add_tag, &my_add_tag, &my_add_tag_s); + + // This is for adding #rounds tags to the objects. + for (i = 0; i < my_add_tag; i++) { + for (iter = 0; iter < round; iter++) { + v = iter; + sprintf(value, "%d", v); + if (is_using_dart) { + if (PDC_Client_insert_obj_ref_into_dart(hash_algo, kvtag.name, value, ref_type, + (uint64_t)obj_ids[i]) < 0) { + printf("fail to add a kvtag to o%d\n", i + my_obj_s); + } + } + else { + /* println("Rank %d: [%s] [%d], len %d\n", my_rank, kvtag.name, v, kvtag.size); */ + if (PDCobj_put_tag(obj_ids[i], kvtag.name, kvtag.value, kvtag.type, kvtag.size) < 0) { + printf("fail to add a kvtag to o%d\n", i + my_obj_s); + } + } + } + if (my_rank == 0) + println("Rank %d: Added %d kvtag to the %d th object\n", my_rank, round, i); } - if (obj_ids2 != NULL) - free(obj_ids2); - if (PDC_Client_query_kvtag(&kvtag2, &nobj, &obj_ids1) < 0) - printf("fail to query a kvtag\n"); - else { - printf("successfully queried a tag, nres=%d\n", nobj); - for (i = 0; i < nobj; i++) - printf("%" PRIu64 ", ", obj_ids1[i]); - printf("\n\n"); +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif + + kvtag.name = attr_name_per_rank; + kvtag.value = (void *)&v; + kvtag.type = PDC_INT; + kvtag.size = sizeof(int); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); +#endif + + for (iter = 0; iter < round; iter++) { + v = iter; + if (is_using_dart) { + sprintf(value, "%ld", v); + sprintf(exact_query, "%s=%s", kvtag.name, value); +#ifdef ENABLE_MPI + PDC_Client_search_obj_ref_through_dart_mpi(hash_algo, exact_query, ref_type, &nres, &pdc_ids, + MPI_COMM_WORLD); +#else + PDC_Client_search_obj_ref_through_dart(hash_algo, exact_query, ref_type, &nres, &pdc_ids); +#endif + } + else { + /* println("Rank %d: round %d, query kvtag [%s] [%d]\n", my_rank, round, kvtag.name, + * *((int*)kvtag.value)); */ +#ifdef ENABLE_MPI + if (PDC_Client_query_kvtag_mpi(&kvtag, &nres, &pdc_ids, MPI_COMM_WORLD) < 0) { +#else + if (PDC_Client_query_kvtag(&kvtag, &nres, &pdc_ids) < 0) { +#endif + printf("fail to query kvtag [%s] with rank %d\n", kvtag.name, my_rank); + break; + } + } } - if (obj_ids1 != NULL) - free(obj_ids1); - - // close first object - if (PDCobj_close(obj1) < 0) - printf("fail to close object o1\n"); - else - printf("successfully close object o1\n"); - // close second object - if (PDCobj_close(obj2) < 0) - printf("fail to close object o2\n"); - else - printf("successfully close object o2\n"); +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + MPI_Reduce(&nres, &ntotal, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); + total_time = MPI_Wtime() - stime; + if (my_rank == 0) + println("Total time to query %d objects with tag: %.5f", ntotal, total_time); +#else + println("Query found %d objects", nres); +#endif // close a container if (PDCcont_close(cont) < 0) printf("fail to close container c1\n"); else printf("successfully close container c1\n"); - // close a container property - if (PDCprop_close(obj_prop1) < 0) - printf("Fail to close property @ line %d\n", __LINE__); - else - printf("successfully close object property\n"); - - if (PDCprop_close(obj_prop2) < 0) + // close an object property + if (PDCprop_close(obj_prop) < 0) printf("Fail to close property @ line %d\n", __LINE__); else printf("successfully close object property\n"); + // close a container property if (PDCprop_close(cont_prop) < 0) printf("Fail to close property @ line %d\n", __LINE__); else @@ -194,6 +248,10 @@ main() // close pdc if (PDCclose(pdc) < 0) printf("fail to close PDC\n"); +done: +#ifdef ENABLE_MPI + MPI_Finalize(); +#endif return 0; } diff --git a/src/tests/kvtag_query_scale_col.c b/src/tests/kvtag_query_scale_col.c index 78d4b26b5..a2a4b8405 100644 --- a/src/tests/kvtag_query_scale_col.c +++ b/src/tests/kvtag_query_scale_col.c @@ -30,6 +30,7 @@ #include #include "pdc.h" #include "pdc_client_connect.h" +#include "string_utils.h" int assign_work_to_rank(int rank, int size, int nwork, int *my_count, int *my_start) @@ -140,6 +141,8 @@ main(int argc, char *argv[]) pdc_kvtag_t kvtag; uint64_t * pdc_ids; int nres, ntotal; + int * my_cnt_round; + int * total_cnt_round; #ifdef ENABLE_MPI MPI_Init(&argc, &argv); @@ -181,6 +184,9 @@ main(int argc, char *argv[]) dart_object_ref_type_t ref_type = REF_PRIMARY_ID; dart_hash_algo_t hash_algo = DART_HASH; + my_cnt_round = (int *)calloc(round, sizeof(int)); + total_cnt_round = (int *)calloc(round, sizeof(int)); + MPI_Barrier(MPI_COMM_WORLD); stime = MPI_Wtime(); @@ -193,8 +199,8 @@ main(int argc, char *argv[]) for (iter = 0; iter < round; iter++) { char attr_name[64]; char tag_value[64]; - snprintf(attr_name, 63, "%d%dattr_name%d%d", iter, iter, iter, iter); - snprintf(tag_value, 63, "%d%dtag_value%d%d", iter, iter, iter, iter); + snprintf(attr_name, 63, "%03d%03dattr_name%03d%03d", iter, iter, iter, iter); + snprintf(tag_value, 63, "%03d%03dtag_value%03d%03d", iter, iter, iter, iter); kvtag.name = strdup(attr_name); kvtag.value = (void *)strdup(tag_value); kvtag.type = PDC_STRING; @@ -212,8 +218,9 @@ main(int argc, char *argv[]) } free(kvtag.name); free(kvtag.value); + my_cnt_round[iter]++; } - if (my_rank == 0) { + if (my_rank == 0 && n_obj > 1000) { println("Rank %d: Added %d kvtag to the %d / %d th object, I'm applying selectivity %d to %d " "objects.\n", my_rank, round, i + 1, my_obj_after_selectivity, selectivity, my_obj); @@ -229,6 +236,9 @@ main(int argc, char *argv[]) } #ifdef ENABLE_MPI + for (i = 0; i < round; i++) + MPI_Allreduce(&my_cnt_round[i], &total_cnt_round[i], 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); + MPI_Barrier(MPI_COMM_WORLD); #endif @@ -250,8 +260,8 @@ main(int argc, char *argv[]) #endif char attr_name[64]; char tag_value[64]; - snprintf(attr_name, 63, "%d%dattr_name%d%d", iter, iter, iter, iter); - snprintf(tag_value, 63, "%d%dtag_value%d%d", iter, iter, iter, iter); + snprintf(attr_name, 63, "%03d%03dattr_name%03d%03d", iter, iter, iter, iter); + snprintf(tag_value, 63, "%03d%03dtag_value%03d%03d", iter, iter, iter, iter); kvtag.name = strdup(attr_name); kvtag.value = (void *)strdup(tag_value); @@ -263,10 +273,11 @@ main(int argc, char *argv[]) input.base_tag = &kvtag; input.key_query_type = query_type; input.value_query_type = query_type; - input.affix_len = 4; + input.affix_len = 12; gen_query_key_value(&input, &output); + pdc_ids = NULL; if (is_using_dart) { char *query_string = gen_query_str(&output); ret_value = (comm_type == 0) @@ -278,7 +289,9 @@ main(int argc, char *argv[]) else { kvtag.name = output.key_query; kvtag.value = output.value_query; - ret_value = (comm_type == 0) + /* fprintf(stderr, " Rank %d: key [%s] value [%s]\n", my_rank, kvtag.name, + * kvtag.value); */ + ret_value = (comm_type == 0) ? PDC_Client_query_kvtag(&kvtag, &nres, &pdc_ids) : PDC_Client_query_kvtag_mpi(&kvtag, &nres, &pdc_ids, MPI_COMM_WORLD); } @@ -286,6 +299,13 @@ main(int argc, char *argv[]) printf("fail to query kvtag [%s] with rank %d\n", kvtag.name, my_rank); break; } + + if (iter >= 0) { + if (nres != total_cnt_round[iter]) + printf("Rank %d: query %d, comm %d, round %d - results %d do not match expected %d\n", + my_rank, query_type, comm_type, iter, nres, total_cnt_round[iter]); + } + round_total += nres; free(kvtag.name); free(kvtag.value); @@ -309,8 +329,8 @@ main(int argc, char *argv[]) is_using_dart == 0 ? " NO " : " DART ", round, round_total, total_time * 1000.0); } #endif - } - } + } // end query type + } // end comm type if (my_rank == 0) { println("Rank %d: All queries are done.", my_rank); @@ -326,8 +346,8 @@ main(int argc, char *argv[]) for (iter = 0; iter < round; iter++) { char attr_name[64]; char tag_value[64]; - snprintf(attr_name, 63, "%d%dattr_name%d%d", iter, iter, iter, iter); - snprintf(tag_value, 63, "%d%dtag_value%d%d", iter, iter, iter, iter); + snprintf(attr_name, 63, "%03d%03dattr_name%03d%03d", iter, iter, iter, iter); + snprintf(tag_value, 63, "%03d%03dtag_value%03d%03d", iter, iter, iter, iter); kvtag.name = strdup(attr_name); kvtag.value = (void *)strdup(tag_value); kvtag.type = PDC_STRING; @@ -340,6 +360,7 @@ main(int argc, char *argv[]) PDCobj_del_tag(obj_ids[i], kvtag.name); } free(kvtag.name); + free(kvtag.value); } }