From de6604c3d506940e79c1c0634dad58365c0d8a6e Mon Sep 17 00:00:00 2001 From: Aakash Arayambeth Date: Thu, 28 Sep 2023 16:31:49 -0400 Subject: [PATCH] consistent hash utility Signed-off-by: Aakash Arayambeth --- bbinc/consistent_hash.h | 101 +++++ tests/consistent_hash.test/Makefile | 12 + tests/consistent_hash.test/README | 1 + tests/consistent_hash.test/runit | 8 + tests/consistent_hash_bench.test/Makefile | 12 + tests/consistent_hash_bench.test/README | 5 + tests/consistent_hash_bench.test/runit | 11 + tests/tools/CMakeLists.txt | 5 + tests/tools/test_consistent_hash.c | 494 ++++++++++++++++++++ tests/tools/test_consistent_hash_bench.c | 269 +++++++++++ util/CMakeLists.txt | 2 + util/consistent_hash.c | 520 ++++++++++++++++++++++ 12 files changed, 1440 insertions(+) create mode 100644 bbinc/consistent_hash.h create mode 100644 tests/consistent_hash.test/Makefile create mode 100644 tests/consistent_hash.test/README create mode 100755 tests/consistent_hash.test/runit create mode 100644 tests/consistent_hash_bench.test/Makefile create mode 100644 tests/consistent_hash_bench.test/README create mode 100755 tests/consistent_hash_bench.test/runit create mode 100644 tests/tools/test_consistent_hash.c create mode 100644 tests/tools/test_consistent_hash_bench.c create mode 100644 util/consistent_hash.c diff --git a/bbinc/consistent_hash.h b/bbinc/consistent_hash.h new file mode 100644 index 0000000000..007212ce33 --- /dev/null +++ b/bbinc/consistent_hash.h @@ -0,0 +1,101 @@ +#ifndef CH_HASH_H +#define CH_HASH_H +#include +#include +#include +#include +typedef struct consistent_hash ch_hash_t; +typedef struct consistent_hash_node ch_hash_node_t; +typedef int (*hash_func)(uint8_t *, size_t, uint32_t *); +enum ch_err { + CH_NOERR = 0, + CH_ERR_PARAM = 1, + CH_ERR_MALLOC = 2, + CH_ERR_HASH = 3, + CH_ERR_DUP = 4 +}; + +struct consistent_hash_node { + uint8_t *data; + size_t data_len; + LINKC_T(struct consistent_hash_node) lnk; +}; + +typedef struct consistent_hash_keyhash { + uint64_t hash_val; + struct consistent_hash_node *node; +}ch_keyhash_t; + +struct consistent_hash { + LISTC_T(struct consistent_hash_node) nodes; /* List of nodes for key hashes to refer to */ + uint32_t num_nodes; + struct consistent_hash_keyhash **key_hashes; /* Array of key hashes of nodes and (optionally) + their copies. I went with an array over a hash + since we need this list to be sorted in order to look + up the next largest keyhash that has an associated node, + given a keyhash*/ + uint32_t num_keyhashes; + pthread_rwlock_t lock; + hash_func func; +}; + +/* + * Create an in-mem abstraction of a consistent hash. + * num_copies -> number of copies of node to be added to the hash + * Db servers, cache servers are examples of a 'Node' + * ch_func -> pointer to the hash function to be used + * It takes a byte array and it's length as the input + * The hash is available in the OUT parameter hash as a BIGNUM + * www.openssl.org/docs/man1.0.2/man3/bn.html + */ +ch_hash_t *ch_hash_create(uint64_t num_copies, hash_func func); + +/* + * Add a node to the consistent hash. + * name -> a byte array that represents the name of the node + * in the calling program. + * name_len -> length of above name + */ +int ch_hash_add_node(ch_hash_t *hash, uint8_t *name, size_t name_len, uint64_t hashval); + + +/* + * Add a replica for a node at hashval + * node_name -> node name for which replica is being added + * node_len -> size (in bytes) of node_name +*/ +int ch_hash_add_replica(ch_hash_t *hash, uint8_t *node_name, size_t node_len, uint64_t hashval); + + +/* + * Remove a node from the consistent hash. + * name -> a byte array that represents the name of the node + * in the calling program. + * name_len -> length of above name + */ +int ch_hash_remove_node(ch_hash_t *hash, uint8_t *name, size_t name_len); + +/* + * Given a key and it's length, return the node on the consistent hash + * that the inputs hash onto + */ +ch_hash_node_t *ch_hash_find_node(ch_hash_t *hash, uint8_t *key, size_t key_len); + +/* + * Free all memory associated with a consistent hash + */ +void ch_hash_free(ch_hash_t *hash); + +/* + * SHA256 and MD5 based hash funcs + */ +int ch_hash_sha(uint8_t *buf, size_t buf_len, uint32_t *hash); +int ch_hash_md5(uint8_t *buf, size_t buf_len, uint32_t *hash); + + +/* + * TEST HELPERS + */ +uint8_t *get_node_data(ch_hash_node_t *ch); +size_t get_node_data_len(ch_hash_node_t *ch); +#endif diff --git a/tests/consistent_hash.test/Makefile b/tests/consistent_hash.test/Makefile new file mode 100644 index 0000000000..62834065ae --- /dev/null +++ b/tests/consistent_hash.test/Makefile @@ -0,0 +1,12 @@ +ifeq ($(TESTSROOTDIR),) + include ../testcase.mk +else + include $(TESTSROOTDIR)/testcase.mk +endif +ifeq ($(TEST_TIMEOUT),) + export TEST_TIMEOUT=1m +endif + +# this is a local test, don't need cluster +unexport CLUSTER +export COMDB2_UNITTEST=1 diff --git a/tests/consistent_hash.test/README b/tests/consistent_hash.test/README new file mode 100644 index 0000000000..e70d7125cc --- /dev/null +++ b/tests/consistent_hash.test/README @@ -0,0 +1 @@ +This test exercises the consistent hash utility diff --git a/tests/consistent_hash.test/runit b/tests/consistent_hash.test/runit new file mode 100755 index 0000000000..1e75405820 --- /dev/null +++ b/tests/consistent_hash.test/runit @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +set -e +set -x + +echo run executable that tests the consistent hash utility +${TESTSBUILDDIR}/test_consistent_hash + diff --git a/tests/consistent_hash_bench.test/Makefile b/tests/consistent_hash_bench.test/Makefile new file mode 100644 index 0000000000..62834065ae --- /dev/null +++ b/tests/consistent_hash_bench.test/Makefile @@ -0,0 +1,12 @@ +ifeq ($(TESTSROOTDIR),) + include ../testcase.mk +else + include $(TESTSROOTDIR)/testcase.mk +endif +ifeq ($(TEST_TIMEOUT),) + export TEST_TIMEOUT=1m +endif + +# this is a local test, don't need cluster +unexport CLUSTER +export COMDB2_UNITTEST=1 diff --git a/tests/consistent_hash_bench.test/README b/tests/consistent_hash_bench.test/README new file mode 100644 index 0000000000..a87509641b --- /dev/null +++ b/tests/consistent_hash_bench.test/README @@ -0,0 +1,5 @@ +This test runs record distribution benchmark against the consistent hash utility +It creates 5 tables ( to simulate 5 shards) and inserts a million records into these 5 tables. +The table in which a record goes is decided by the consistent hash and the value of the field in the record +being inserted. Next, one of the tables(shards) is dropped, which causes a certain number of records to be +re-distributed. This test checks the distribution before dropping a table and after dropping a table diff --git a/tests/consistent_hash_bench.test/runit b/tests/consistent_hash_bench.test/runit new file mode 100755 index 0000000000..b59f38ceaa --- /dev/null +++ b/tests/consistent_hash_bench.test/runit @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +set -e +set -x + +echo run executable that tests the consistent hash utility +# s -> number of shards +# c -> number of copies(replicas) for each shard in the consistent hash +# r -> number of records to be inserted +${TESTSBUILDDIR}/test_consistent_hash_bench -s 5 -c 512 -r 1000000 + diff --git a/tests/tools/CMakeLists.txt b/tests/tools/CMakeLists.txt index 0369e8edd9..45de58463e 100644 --- a/tests/tools/CMakeLists.txt +++ b/tests/tools/CMakeLists.txt @@ -70,6 +70,8 @@ add_exe(sqlite_clnt sqlite_clnt.c) add_exe(ssl_multi_certs_one_process ssl_multi_certs_one_process.c) add_exe(stepper stepper.c stepper_client.c) add_exe(test_threadpool test_threadpool.c) +add_exe(test_consistent_hash test_consistent_hash.c) +add_exe(test_consistent_hash_bench test_consistent_hash_bench.c) add_exe(updater updater.c testutil.c) add_exe(utf8 utf8.c) add_exe(verify_atomics_work verify_atomics_work.c) @@ -77,6 +79,8 @@ add_exe(verify_atomics_work verify_atomics_work.c) target_link_libraries(cson_test cson) target_link_libraries(stepper util mem dlmalloc util) target_link_libraries(test_threadpool util mem dlmalloc util) +target_link_libraries(test_consistent_hash util mem dlmalloc util) +target_link_libraries(test_consistent_hash_bench util mem dlmalloc util) list(APPEND common-deps ${READLINE_LIBRARIES} @@ -87,6 +91,7 @@ list(APPEND common-deps ${UNWIND_LIBRARY} ${CMAKE_DL_LIBS} m + crc32c ) foreach(executable ${test-tools}) diff --git a/tests/tools/test_consistent_hash.c b/tests/tools/test_consistent_hash.c new file mode 100644 index 0000000000..4b44eb79fa --- /dev/null +++ b/tests/tools/test_consistent_hash.c @@ -0,0 +1,494 @@ +#include +#include +#include +#include +#include "consistent_hash.h" +#include +enum ch_hash_func_type { + CH_HASH_SHA = 1, + CH_HASH_MD5 = 2, + CH_HASH_CRC = 3 +}; + +void test_add_and_find_one_node(hash_func func, int func_type) { + ch_hash_t *ch = ch_hash_create(1, func); + if (!ch) { + goto fail; + } + + if (ch_hash_add_node(ch, (uint8_t *)"SHARD1", 6, ch->key_hashes[0]->hash_val)) { + goto fail; + } + + /* Use same key */ + ch_hash_node_t *node = ch_hash_find_node(ch, (uint8_t *)"KEY1", 4); + if (node == NULL) { + goto fail; + } + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + + /* Use different key -> should still map to same node + * since num_nodes = 1 + */ + + node = ch_hash_find_node(ch, (uint8_t *)"KEY2", 4); + if (node == NULL) { + goto fail; + } + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + + return; +fail: + if (ch) { + ch_hash_free(ch); + } + abort(); +} + +void test_add_and_find_multiple_nodes(hash_func func, int func_type) { + ch_hash_t *ch = ch_hash_create(1, func); + if (!ch) { + goto fail; + } + + + if (ch_hash_add_node(ch, (uint8_t *)"SHARD1", 6, ch->key_hashes[0]->hash_val)) { + goto fail; + } + + ch_hash_node_t *node = ch_hash_find_node(ch, (uint8_t *)"KEY1", 4); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + } + + node = ch_hash_find_node(ch, (uint8_t *)"KEY2", 4); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + + node = ch_hash_find_node(ch, (uint8_t *)"KEY15", 5); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + + /* Now add another node */ + if (ch_hash_add_node(ch, (uint8_t *)"SHARD2", 6, 2863311529)) { + goto fail; + } + + node = ch_hash_find_node(ch, (uint8_t *)"KEY1", 4); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + + node = ch_hash_find_node(ch, (uint8_t *)"KEY2", 4); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + + node = ch_hash_find_node(ch, (uint8_t *)"KEY15", 5); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + return ; +fail: + if (ch) { + ch_hash_free(ch); + } + abort(); +} + +void test_replica(hash_func func, int func_type) { + int rc = 0; + ch_hash_t *ch = ch_hash_create(2, func); + if (!ch) { + goto fail; + } + + if (ch_hash_add_node(ch, (uint8_t *)"SHARD1", 6, ch->key_hashes[0]->hash_val)) { + goto fail; + } + if (ch_hash_add_node(ch, (uint8_t *)"SHARD2", 6, ch->key_hashes[1]->hash_val)) { + goto fail; + } + + ch_hash_node_t *node = ch_hash_find_node(ch, (uint8_t *)"KEY15", 5); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + + /* + * Add a replica between the two ranges + */ + switch (func_type) { + case CH_HASH_SHA: + rc = ch_hash_add_replica(ch, (uint8_t *)"SHARD2", 6, 4180816000); + break; + case CH_HASH_MD5: + rc = ch_hash_add_replica(ch, (uint8_t *)"SHARD1", 6, 49952000); + break; + default: + goto fail; + } + + node = ch_hash_find_node(ch, (uint8_t *)"KEY15", 5); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + return; +fail: + if (ch) { + ch_hash_free(ch); + } + abort(); +} + + +void test_remove_node(hash_func func, int func_type) { + ch_hash_t *ch = ch_hash_create(2, func); + if (!ch) { + goto fail; + } + + if (ch_hash_add_node(ch, (uint8_t *)"SHARD1", 6, ch->key_hashes[0]->hash_val)) { + goto fail; + } + if (ch_hash_add_node(ch, (uint8_t *)"SHARD2", 6, ch->key_hashes[1]->hash_val)) { + goto fail; + } + + ch_hash_node_t *node = ch_hash_find_node(ch, (uint8_t *)"KEY15", 5); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + + switch (func_type) { + case CH_HASH_SHA: + if (ch_hash_remove_node(ch, (uint8_t *)"SHARD1", 6)) { + goto fail; + } + break; + case CH_HASH_MD5: + if (ch_hash_remove_node(ch, (uint8_t *)"SHARD2", 6)) { + goto fail; + } + break; + default: + goto fail; + } + + node = ch_hash_find_node(ch, (uint8_t *)"KEY15", 5); + if (node == NULL) { + goto fail; + } + switch (func_type) { + case CH_HASH_SHA: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD2", get_node_data(node), 6) != 0) { + goto fail; + } + break; + case CH_HASH_MD5: + if (6 != get_node_data_len(node)) { + goto fail; + } + if (memcmp("SHARD1", get_node_data(node), 6) != 0) { + goto fail; + } + break; + default: + goto fail; + + } + return; +fail: + if (ch) { + ch_hash_free(ch); + } + abort(); +} + + +void test_find_with_zero_nodes(hash_func func, int func_type){ + ch_hash_t *ch = ch_hash_create(0, func); + if (!ch) { + goto fail; + } + ch_hash_node_t *node = ch_hash_find_node(ch, (uint8_t *)"KEY15", 5); + if (node != NULL) { + goto fail; + } + return; +fail: + if (ch) { + ch_hash_free(ch); + } + abort(); +} + +void print_distribution(int num_nodes, hash_func func) { + ch_hash_t *ch = ch_hash_create(num_nodes,func); + unsigned char bytes[4]; + int shard1=0, shard2=0, shard3=0; + if (!ch) { + goto fail; + } + if (ch_hash_add_node(ch, (uint8_t *)"SHARD1", 6, ch->key_hashes[0]->hash_val)) { + goto fail; + } + if (ch_hash_add_node(ch, (uint8_t *)"SHARD2", 6, ch->key_hashes[1]->hash_val)) { + goto fail; + } + if (ch_hash_add_node(ch, (uint8_t *)"SHARD3", 6, ch->key_hashes[2]->hash_val)) { + goto fail; + } + + for (int i=1;i<=2000000;i++) { + bytes[0] = (i>>24) & 0xFF; + bytes[1] = (i>>16) & 0xFF; + bytes[2] = (i>>8) & 0xFF; + bytes[3] = (i) & 0xFF; + ch_hash_node_t *node = ch_hash_find_node(ch, (uint8_t *)bytes, sizeof(int)); + if (node==NULL) { + goto fail; + } + + if (memcmp("SHARD1", get_node_data(node), 6) == 0) { + shard1++; + } else if (memcmp("SHARD2", get_node_data(node), 6) == 0) { + shard2++; + } else if (memcmp("SHARD3", get_node_data(node), 6) == 0) { + shard3++; + } + } + + printf("The key distribution is: \n"); + printf("Shard1 : %d\n", shard1); + printf("Shard2 : %d\n", shard2); + printf("Shard3 : %d\n", shard3); + return; +fail: + if (ch) { + ch_hash_free(ch); + } + abort(); +} + +void run_tests(hash_func func, int func_type) { + test_add_and_find_one_node(func, func_type); + test_add_and_find_multiple_nodes(func, func_type); + test_remove_node(func, func_type); + test_replica(func, func_type); +} + +int main() +{ + run_tests(ch_hash_sha, CH_HASH_SHA); + run_tests(ch_hash_md5, CH_HASH_MD5); + + int num_nodes = 3; + print_distribution(num_nodes, ch_hash_sha); + print_distribution(num_nodes, ch_hash_md5); + printf("SUCCESS\n"); + return 0; +} diff --git a/tests/tools/test_consistent_hash_bench.c b/tests/tools/test_consistent_hash_bench.c new file mode 100644 index 0000000000..07b94db620 --- /dev/null +++ b/tests/tools/test_consistent_hash_bench.c @@ -0,0 +1,269 @@ +#include +#include +#include +#include +#include "consistent_hash.h" +#include +#include +#include +#include + +ch_hash_t *ch = NULL; + +int print_query_result(void *a_param, int argc, char *argv[], char **column) { + for(int i=0;i ceiling || distribution[i] < floor) { + printf("FAILED\n"); + goto fail; + } + } + + free(distribution); + return 0; +fail: + free(distribution); + return -1; +} + +int redistribute_record(void *a_param, int argc, char *argv[], char **column) { + // unsigned char bytes[4]; + sqlite3 *db = (sqlite3 *)a_param; + long temp_int = 0; + char *ptr; + for(int j=0;jkey_hashes[i-1]->hash_val)) { + printf("Failed to add node for table %s\n", shard); + } + sqlite3_free(shard); + } + return 0; +} + + +void populate_records(int limit, sqlite3 *db) { + unsigned char bytes[4]; + + for(int i=0;i>24) & 0xFF; + bytes[1] = (i>>16) & 0xFF; + bytes[2] = (i>>8) & 0xFF; + bytes[3] = (i) & 0xFF; + ch_hash_node_t *node = ch_hash_find_node(ch, (uint8_t *)bytes, sizeof(int)); + if (node==NULL) { + return; + } + insert_record((char *)get_node_data(node), i, db); + } +} +void usage(const char *p, const char *err) { + fprintf(stderr, "%s\n", err); + fprintf(stderr, "Usage %s --numshards NUMSHARDS --numrecords NUMRECORDS\n", p); + exit(1); +} + +int main(int argc, char *argv[]) { + if (argc < 3) { + usage(argv[0], "Required parameters were NOT provided"); + } + int numshards = 0, numrecords = 0; + clock_t start, end; + double time_used; + static struct option long_options[] = + { + {"numshards", required_argument, NULL, 's'}, + {"numrecords", required_argument, NULL, 'r'}, + {NULL, 0, NULL, 0} + }; + int c; + int index; + while ((c = getopt_long(argc, argv, "s:r:?", long_options, &index)) != -1) { + //printf("c '%c' %d index %d optarg '%s'\n", c, c, index, optarg); + switch(c) { + case 's': numshards = atoi(optarg); break; + case 'r': numrecords = atoi(optarg); break; + case '?': break; + default: break; + } + } + + printf("numshards: %d, numrecords: %d\n", numshards, numrecords); + + sqlite3 *db; + char *err_msg = NULL; + int rc = 0; + if (create_consistent_hash(numshards, ch_hash_sha)) { + printf("Failed to create consistent hash\n"); + return -1; + } + rc = sqlite3_open("test.db", &db); + + if (rc != SQLITE_OK) { + printf("Cannot open database : %s\n", sqlite3_errmsg(db)); + sqlite3_close(db); + + return -1; + } + printf("Successfully created a database \n"); + + char *sql = "", *temp = NULL; + for(int i=1;i<=numshards;i++) { + temp = sqlite3_mprintf("%s\nCREATE TABLE SHARD%d(id INT);", sql, i); + sql = temp; + } + /*char *sql = "CREATE TABLE SHARD1(id INT);" + "CREATE TABLE SHARD2(id INT);" + "CREATE TABLE SHARD3(id INT);";*/ + + char *tune_sqlite = "PRAGMA journal_mode = OFF;" + "PRAGMA synchronous = 0;" + "PRAGMA cache_size = 1000000;" + "PRAGMA locking_mode = EXCLUSIVE;" + "PRAGMA temp_store = MEMORY;"; + rc = sqlite3_exec(db, tune_sqlite, 0, 0, &err_msg); + if (rc != SQLITE_OK) { + printf("SQL ERROR: %s\n", err_msg); + sqlite3_free(err_msg); + sqlite3_close(db); + + return -1; + } + rc = sqlite3_exec(db, sql, 0, 0, &err_msg); + if (rc != SQLITE_OK) { + printf("SQL ERROR: %s\n", err_msg); + sqlite3_free(err_msg); + sqlite3_close(db); + + return -1; + } + + printf("Successfully created tables\n"); + start = clock(); + populate_records(numrecords, db); + end = clock(); + if(check_distribution(db, numshards, numrecords)) { + return -1; + } + printf("It took %f seconds to insert %d records into %d shards.\n", ((double)(end-start))/CLOCKS_PER_SEC, numrecords, numshards); + /*printf("Dropping SHARD5\n"); + drop_shard(db, 5); + if (check_distribution(db, numshards-1, numrecords)) { + return -1; + }*/ + + sqlite3_close(db); + return 0; +} diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt index 8c36b73ef0..52a5897c9c 100644 --- a/util/CMakeLists.txt +++ b/util/CMakeLists.txt @@ -12,6 +12,7 @@ set(src comdb2_pthread_create.c comdb2file.c compat.c + consistent_hash.c ctrace.c debug_switches.c flibc.c @@ -79,6 +80,7 @@ include_directories( ${PROJECT_BINARY_DIR}/bbinc ${PROJECT_SOURCE_DIR}/berkdb ${PROJECT_SOURCE_DIR}/cdb2api + ${PROJECT_SOURCE_DIR}/crc32c ${PROJECT_SOURCE_DIR}/db ${PROJECT_SOURCE_DIR}/dlmalloc ${PROJECT_SOURCE_DIR}/mem diff --git a/util/consistent_hash.c b/util/consistent_hash.c new file mode 100644 index 0000000000..2d7719ce4c --- /dev/null +++ b/util/consistent_hash.c @@ -0,0 +1,520 @@ +#include "consistent_hash.h" +#include +#include "logmsg.h" +#include +#include +#include +#include +#include +#include +#include +#include +#define MAX_COPIES 8 +#define HASH_VAL_COUNT 4294967296 // 2^32 hash values + + +static int debug_ch = 0; + +uint8_t *get_node_data(ch_hash_node_t *ch_node) { + return ch_node->data; +} + +size_t get_node_data_len(ch_hash_node_t *ch_node) { + return ch_node->data_len; +} + +ch_keyhash_t **get_key_hashes(ch_hash_t *ch_node) { + return ch_node->key_hashes; +} + +uint32_t get_num_keyhashes(ch_hash_t *ch_node) { + return ch_node->num_keyhashes; +} + +int ch_hash_sha(uint8_t *buf, size_t buf_len, uint32_t *hash){ + uint8_t sha_hash[32]; + SHA256_CTX sha256; + + SHA256_Init(&sha256); + SHA256_Update(&sha256, buf, buf_len); + SHA256_Final(sha_hash, &sha256); + uint32_t lowbits = 0; + lowbits = sha_hash[3] << 24 | sha_hash[2] << 16 | sha_hash[1] << 8 | sha_hash[0]; + *hash = lowbits; + return CH_NOERR; +} + +int ch_hash_md5(uint8_t *buf, size_t buf_len, uint32_t *hash) { + uint8_t md5_hash[16]; + MD5_CTX md5; + + MD5_Init(&md5); + MD5_Update(&md5, buf, buf_len); + MD5_Final(md5_hash, &md5); + uint32_t lowbits = 0; + lowbits = md5_hash[3] << 24 | md5_hash[2] << 16 | md5_hash[1] << 8 | md5_hash[0]; + *hash = lowbits; + return CH_NOERR; +} + +int ch_hash_crc32(uint8_t *buf, size_t buf_len, uint32_t *hash) { + *hash = crc32c(buf, buf_len); + return CH_NOERR; +} + +static int keyhash_cmp(const void *key1, const void *key2) { + ch_keyhash_t *h1 = *(ch_keyhash_t **)key1; + ch_keyhash_t *h2 = *(ch_keyhash_t **)key2; + return h1->hash_val > h2->hash_val ? 1 : h1->hash_val < h2->hash_val ? -1 : 0; +} + +static int validate_input(ch_hash_t *ch, uint8_t *data, size_t data_len) { + if (!ch) { + logmsg(LOGMSG_ERROR, "%s:%d invalid input : NULL hash\n", __func__, __LINE__); + return CH_ERR_PARAM; + } + + if (!data) { + logmsg(LOGMSG_ERROR, "%s:%d invalid input : NULL data\n", __func__, __LINE__); + return CH_ERR_PARAM; + } + + if (data_len <= 0) { + logmsg(LOGMSG_ERROR, "%s:%d invalid input : data_len should be atleast 1\n", __func__, __LINE__); + return CH_ERR_PARAM; + } + return CH_NOERR; +} + +static void print_key_hashes(ch_hash_t *ch) { + for (int i=0; i< ch->num_keyhashes; i++) { + printf("%ld\n", ch->key_hashes[i]->hash_val); + } +} + +static int ch_hash_remove_node_locked(ch_hash_t *ch, uint8_t *data, size_t data_len) { + struct consistent_hash_node *item, *tmp; + + LISTC_FOR_EACH_SAFE(&ch->nodes, item, tmp, lnk) { + if (item->data_len == data_len && memcmp(item->data, data, data_len)==0) { + /* remove all key_hashes for this node */ + int i=0; + while(inum_keyhashes){ + if (ch->key_hashes[i] != NULL && ch->key_hashes[i]->node == item) { + ch->key_hashes[i]->node = NULL; + free(ch->key_hashes[i]); + /*swap with last key_hash and set current to NULL*/ + ch->key_hashes[i] = ch->key_hashes[ch->num_keyhashes-1]; + ch->key_hashes[ch->num_keyhashes-1] = NULL; + ch->num_keyhashes--; + continue; + } + i++; + } + /* remove node */ + listc_rfl(&ch->nodes, item); + free(item->data); + free(item); + } + } + qsort(ch->key_hashes, ch->num_keyhashes, sizeof(ch_keyhash_t *), keyhash_cmp); + return CH_NOERR; +} +/* + * Find if a given hashval exists + */ +ch_keyhash_t* ch_hash_find_hashval_locked(ch_hash_t *ch, uint64_t hashval) { + if (!ch) { + logmsg(LOGMSG_ERROR, "%s:%d Consistent hash cannot be NULL!\n", __func__, __LINE__); + return NULL; + } + + if (hashval<0 || hashval > HASH_VAL_COUNT) { + logmsg(LOGMSG_ERROR, "%s:%d Invalid hash value :%ld\n",__func__, __LINE__, hashval); + return NULL; + } + if (ch->num_keyhashes==0) { + logmsg(LOGMSG_ERROR, "%s:%d No keyhashes in the consistent hash\n",__func__, __LINE__); + return NULL; + } + int32_t low = 0, mid = 0; + uint64_t midval = 0; + int32_t high = ch->num_keyhashes-1; + while(low<=high) { + mid = low + (high-low)/2; + //logmsg(LOGMSG_USER, "l: %d, mid: %d, r: %d\n",low, mid, high); + midval = ch->key_hashes[mid]->hash_val; + //logmsg(LOGMSG_USER, "midval is %ld\n", midval); + if (midval == hashval) { + return ch->key_hashes[mid]; + } else if (midval < hashval) { + low = mid+1; + } else { + high = mid-1; + } + } + //logmsg(LOGMSG_USER, "%s:%d Hashval %ld was not found\n",__func__, __LINE__, hashval); + return NULL; +} + +/* + * Add a node at hashval hash value + */ +static int ch_hash_add_node_to_hashval_locked(ch_hash_t *ch, ch_hash_node_t *node, uint64_t hashval) { + ch_keyhash_t **temp = NULL, *keyhash=NULL; + + keyhash = ch_hash_find_hashval_locked(ch, hashval); + if (keyhash) { + if (debug_ch) { + logmsg(LOGMSG_USER, "%s:%d keyhash %ld already exists\n",__func__, __LINE__, hashval); + } + if (keyhash->node) { + logmsg(LOGMSG_ERROR, "%s:%d keyhash %ld has a node associated with it. Not adding new node\n", __func__, __LINE__, hashval); + return CH_ERR_DUP; + } + } else { + /* Hashval doesn't exist. Add a new one */ + keyhash = (ch_keyhash_t *)malloc(sizeof(ch_keyhash_t)); + if (!keyhash) { + logmsg(LOGMSG_ERROR, "%s:%d malloc error\n", __func__, __LINE__); + goto cleanup; + } + keyhash->hash_val = hashval; + ch->num_keyhashes++; + temp = (ch_keyhash_t **)realloc(ch->key_hashes, sizeof(ch_keyhash_t *) * ch->num_keyhashes); + if (!temp) { + logmsg(LOGMSG_ERROR, ":%s:%d realloc failed!\n", __func__, __LINE__); + goto cleanup; + } + temp[ch->num_keyhashes-1] = keyhash; + ch->key_hashes = temp; + temp = NULL; + } + if (debug_ch) { + logmsg(LOGMSG_USER, "Adding node to hashval %ld\n", hashval); + } + keyhash->node = node; + if (ch->num_keyhashes > 1) { + qsort((void *)ch->key_hashes, ch->num_keyhashes, sizeof(ch_keyhash_t *), keyhash_cmp); + } + return CH_NOERR; + +cleanup: + if (keyhash) { + free(keyhash); + } + + if (temp) { + free(temp); + } + + return CH_ERR_MALLOC; +} + +static ch_hash_node_t *ch_hash_lookup_node(ch_hash_t *ch, uint8_t *bytes, size_t len) { + ch_hash_node_t *node = NULL; + LISTC_FOR_EACH(&ch->nodes, node, lnk) { + if (memcmp(bytes, node->data, len) == 0) { + return node; + } + } + return NULL; +} +static ch_keyhash_t* ch_keyhash_upper_bound(ch_hash_t *ch, uint64_t hash) { + if (ch==NULL) { + if (ch==NULL) { + logmsg(LOGMSG_ERROR, "CH is NULL\n"); + } + return NULL; + } + if (debug_ch) { + logmsg(LOGMSG_USER, "Looking for hash %ld\n",hash); + } + int l=0, mid=0, r=ch->num_keyhashes-1, ans=0; + while (l<=r) { + if (debug_ch) { + logmsg(LOGMSG_USER, "l: %d, mid: %d, r: %d\n",l, mid, r); + } + mid = l + (r-l) / 2; + if (ch->key_hashes[mid]->hash_val <= hash) { + l = mid + 1; + } else { + ans = mid; + r = mid-1; + } + } + if (debug_ch) { + logmsg(LOGMSG_USER, "Found the hash val at index %d\n", ans); + } + return ch->key_hashes[ans]; +} +static ch_keyhash_t* ch_keyhash_lower_bound(ch_hash_t *ch, uint64_t hash) { + if (ch==NULL) { + if (ch==NULL) { + logmsg(LOGMSG_ERROR, "CH is NULL\n"); + } + return NULL; + } + + if (debug_ch) { + logmsg(LOGMSG_USER, "Looking for hash %ld\n",hash); + } + int l=0, mid=0, r=ch->num_keyhashes-1, ans=0; + while (l<=r) { + if (debug_ch) { + logmsg(LOGMSG_USER, "l: %d, mid: %d, r: %d\n",l, mid, r); + } + mid = l + (r-l)/ 2; + if (ch->key_hashes[mid]->hash_val >= hash) { + r = mid - 1; + } else { + ans = mid; + l = mid + 1; + } + } + + if (debug_ch) { + logmsg(LOGMSG_USER, "Found the hash val at index %d\n", ans); + } + return ch->key_hashes[ans]; +} + +static int ch_hash_add_node_locked(ch_hash_t *ch, uint8_t *data, size_t data_len, uint64_t hashval) { + int rc = 0; + + ch_hash_node_t *node = NULL; + node = ch_hash_lookup_node(ch, data, data_len); + if (node!=NULL) { + logmsg(LOGMSG_ERROR, "%s:%d Node already exists. Try adding a replica instead.\n",__func__, __LINE__); + return CH_ERR_HASH; + } + node = (ch_hash_node_t *)malloc(sizeof(ch_hash_node_t)); + if (!node) { + goto oom; + } + + node->data = (uint8_t *)malloc(sizeof(uint8_t) * data_len); + if (!node->data) { + goto oom; + } + memset(node->data, 0, data_len); + node->data_len = data_len; + memcpy(node->data, data, data_len); + listc_abl(&ch->nodes, node); + + rc = ch_hash_add_node_to_hashval_locked(ch, node, hashval); + + if (rc) { + if (rc==CH_ERR_DUP) { + return CH_NOERR; + } + ch_hash_remove_node_locked(ch, node->data, node->data_len); + return rc; + } + return CH_NOERR; + +oom: + if (node) { + if (node->data) { + free(node->data); + } + free(node); + } + return CH_ERR_MALLOC; +} + +ch_hash_node_t *ch_hash_find_node_locked(ch_hash_t *ch, uint8_t *key, size_t key_len) { + int rc = 0; + uint32_t hash_key = 0; + ch_keyhash_t *next_biggest = NULL; + + if (ch->num_keyhashes == 0) { + logmsg(LOGMSG_ERROR, "Hash is empty!\n"); + return NULL; + } + rc = ch->func(key, key_len, &hash_key); + if (rc) { + logmsg(LOGMSG_ERROR, "%s:%d hash failure. rc: %d\n", __func__, __LINE__, rc); + return NULL; + } + + next_biggest = ch_keyhash_upper_bound(ch, hash_key); + assert(next_biggest!=NULL); + return next_biggest->node; +} + + +/* + * Input : num_nodes -> number of nodes (ex: database servers) + * func -> 32 bit hash func + * Output: object of type ch_hash_t with hash value ranges assigned + * for each node + */ +ch_hash_t *ch_hash_create(uint64_t num_nodes, hash_func func) { + uint64_t hash_range_increment = 0; + if (num_nodes < 1) { + logmsg(LOGMSG_ERROR, "num_nodes should atleast be 1\n"); + return NULL; + } + + if (!func) { + logmsg(LOGMSG_ERROR, "hash function must be provided!\n"); + return NULL; + } + + ch_hash_t *ch = (ch_hash_t *)malloc(sizeof(ch_hash_t)); + if(!ch){ + logmsg(LOGMSG_ERROR, "%s:%d malloc error\n", __func__, __LINE__); + return NULL; + } + + ch->num_nodes = num_nodes; + hash_range_increment = HASH_VAL_COUNT / num_nodes; + + if (debug_ch) { + logmsg(LOGMSG_USER, "the hash_range_increment is %ld\n", hash_range_increment); + } + listc_init(&ch->nodes, offsetof(struct consistent_hash_node, lnk)); + /* + * Create num_nodes hash ranges, one for each node in the consistent hash + */ + ch->num_keyhashes = num_nodes; + ch->key_hashes = (ch_keyhash_t**)malloc(sizeof(ch_keyhash_t*) * ch->num_keyhashes); + if (!ch->key_hashes) { + logmsg(LOGMSG_ERROR, "%s:%d malloc error\n", __func__, __LINE__); + goto cleanup; + } + int64_t curHashVal = 0; + for(int i=0;inum_keyhashes;i++){ + ch->key_hashes[i] = (ch_keyhash_t *)malloc(sizeof(ch_keyhash_t)); + if (!ch->key_hashes[i]) { + logmsg(LOGMSG_ERROR,"%s:%d malloc error\n", __func__, __LINE__); + goto cleanup; + } + ch->key_hashes[i]->node = NULL; + ch->key_hashes[i]->hash_val = curHashVal; + curHashVal += hash_range_increment; + + if (debug_ch) { + logmsg(LOGMSG_USER, "assigning hash val %ld\n",ch->key_hashes[i]->hash_val); + } + } + ch->func = func; + Pthread_rwlock_init(&(ch->lock), NULL); + return ch; + +cleanup: + if (ch) { + if (ch->key_hashes) { + for(int i=0;inum_keyhashes;i++) { + if (ch->key_hashes[i]) { + free(ch->key_hashes[i]); + ch->key_hashes[i] = NULL; + } + } + free(ch->key_hashes); + } + free(ch); + } + return NULL; +} + +/* + * Add a replica on the hash ring for an existing node. + * If node doesn't exists return error. + */ +int ch_hash_add_replica(ch_hash_t *hash, uint8_t *data, size_t data_len, uint64_t hashval) { + int rc = 0; + + if (hashval < 0 || hashval > HASH_VAL_COUNT) { + logmsg(LOGMSG_ERROR, "Cannot add node at %ld. Invalid hash value\n", hashval); + return -1; + } + + rc = validate_input(hash, data, data_len); + Pthread_rwlock_wrlock(&hash->lock); + ch_hash_node_t *node = ch_hash_lookup_node(hash, data, data_len); + if (node==NULL) { + logmsg(LOGMSG_ERROR, "%s:%d Node not found. Cannot add replica for non-existing node\n",__func__, __LINE__); + Pthread_rwlock_unlock(&hash->lock); + return CH_ERR_HASH; + } + + rc = ch_hash_add_node_to_hashval_locked(hash, node, hashval); + if (rc) { + Pthread_rwlock_unlock(&hash->lock); + logmsg(LOGMSG_ERROR, "%s:%d failed to add replica\n", __func__, __LINE__); + return rc; + } + Pthread_rwlock_unlock(&hash->lock); + return CH_NOERR; +} + +int ch_hash_add_node(ch_hash_t *ch, uint8_t *data, size_t data_len, uint64_t hashval) { + int rc = 0; + + if (hashval < 0 || hashval > HASH_VAL_COUNT) { + logmsg(LOGMSG_ERROR, "Cannot add node at %ld. Invalid hash value\n", hashval); + return -1; + } + rc = validate_input(ch, data, data_len); + + if (rc) { + return rc; + } + Pthread_rwlock_wrlock(&ch->lock); + rc = ch_hash_add_node_locked(ch, data, data_len, hashval); + Pthread_rwlock_unlock(&ch->lock); + return rc; +} + +ch_hash_node_t *ch_hash_find_node(ch_hash_t *ch, uint8_t *data, size_t data_len) { + int rc = 0; + ch_hash_node_t *node = NULL; + rc = validate_input(ch, data, data_len); + + if (rc) { + return node; + } + Pthread_rwlock_rdlock(&ch->lock); + node = ch_hash_find_node_locked(ch, data, data_len); + Pthread_rwlock_unlock(&ch->lock); + return node; +} + +int ch_hash_remove_node(ch_hash_t *ch, uint8_t *data, size_t data_len) { + int rc = 0; + rc = validate_input(ch, data, data_len); + + if (rc) { + return rc; + } + Pthread_rwlock_wrlock(&ch->lock); + rc = ch_hash_remove_node_locked(ch, data, data_len); + Pthread_rwlock_unlock(&ch->lock); + return rc; +} + +void ch_hash_free(ch_hash_t *ch) { + struct consistent_hash_node *item, *tmp; + if (ch) { + Pthread_rwlock_wrlock(&ch->lock); + for (int i=0; inum_keyhashes; i++) { + if (ch->key_hashes[i]) { + free(ch->key_hashes[i]); + } + } + if (ch->key_hashes) { + free(ch->key_hashes); + } + + LISTC_FOR_EACH_SAFE(&ch->nodes, item, tmp, lnk) { + listc_rfl(&ch->nodes, item); + free(item->data); + free(item); + } + Pthread_rwlock_unlock(&ch->lock); + Pthread_rwlock_destroy(&ch->lock); + free(ch); + } +}