diff --git a/accelerator/BUILD b/accelerator/BUILD index 3bfa16d1..5d0869ba 100644 --- a/accelerator/BUILD +++ b/accelerator/BUILD @@ -22,6 +22,7 @@ cc_binary( ":ta_config", ":ta_cli", "//connectivity/http", + "//accelerator/core:periodical_task", "@org_iota_common//utils/handles:signal", ] + select({ ":mqtt_enable": [ diff --git a/accelerator/cli_info.h b/accelerator/cli_info.h index 12e583a8..e70f006f 100644 --- a/accelerator/cli_info.h +++ b/accelerator/cli_info.h @@ -53,7 +53,7 @@ typedef enum ta_cli_arg_value_e { HEALTH_TRACK_PERIOD, NO_GTTA, BUFFER_LIST, - DONE_LIST, + COMPLETE_LIST, HTTP_THREADS_CLI, CACHE_CAPACITY, @@ -95,7 +95,7 @@ static struct ta_cli_argument_s { "The period for checking IOTA full node host connection status"}, {"no-gtta", no_argument, NULL, NO_GTTA, "Disable getTransactionToConfirm (gTTA) when sending transaction"}, {"buffer_list", required_argument, NULL, BUFFER_LIST, "Set the value of `buffer_list_name`"}, - {"done_list", required_argument, NULL, DONE_LIST, "Set the value of `done_list_name`"}, + {"complete_list", required_argument, NULL, COMPLETE_LIST, "Specify the name of complete list`"}, {"cache_capacity", required_argument, NULL, CACHE_CAPACITY, "Set the maximum capacity of caching server"}, {"quiet", no_argument, NULL, QUIET, "Disable logger"}, {"runtime_cli", no_argument, NULL, RUNTIME_CLI, "Enable runtime command line"}, diff --git a/accelerator/config.c b/accelerator/config.c index 5c3536d1..54888892 100644 --- a/accelerator/config.c +++ b/accelerator/config.c @@ -221,7 +221,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) { ta_log_error("Malformed input character\n"); } break; - case CACHE_MAX_CAPACITY: + case CACHE_CAPACITY: strtol_temp = strtol(value, NULL, 10); if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) { if (strtol_temp <= 0) { @@ -265,8 +265,8 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) { case BUFFER_LIST: cache->buffer_list_name = value; break; - case DONE_LIST: - cache->done_list_name = value; + case COMPLETE_LIST: + cache->complete_list_name = value; break; // Command line options configuration @@ -342,8 +342,9 @@ status_t ta_core_default_init(ta_core_t* const core) { cache->host = REDIS_HOST; cache->port = REDIS_PORT; cache->state = false; - cache->buffer_list_name = BUFFER_LIST_NAME; - cache->done_list_name = DONE_LIST_NAME; + cache->complete_list_name = COMPLETE_LIST_NAME; + cache->mam_buffer_list_name = MAM_BUFFER_LIST_NAME; + cache->mam_complete_list_name = MAM_COMPLETE_LIST_NAME; cache->capacity = CACHE_MAX_CAPACITY; ta_log_info("Initializing IOTA full node configuration\n"); diff --git a/accelerator/config.h b/accelerator/config.h index 1eb5fb40..96c76f2f 100644 --- a/accelerator/config.h +++ b/accelerator/config.h @@ -71,7 +71,9 @@ extern "C" { #define DB_HOST "localhost" #define MAM_FILE_PREFIX "/tmp/mam_bin_XXXXXX" #define BUFFER_LIST_NAME "txn_buff_list" -#define DONE_LIST_NAME "done_txn_buff_list" +#define COMPLETE_LIST_NAME "complete_txn_buff_list" +#define MAM_BUFFER_LIST_NAME "mam_buff_list" +#define MAM_COMPLETE_LIST_NAME "complete_mam_buff_list" #define CACHE_MAX_CAPACITY \ 170 * 1024 * 1024 /**< Default cache server maximum capacity. It is set to 170MB by default. */ #define HEALTH_TRACK_PERIOD 1800 /**< Check every half hour in default */ @@ -113,14 +115,16 @@ typedef struct iota_config_s { /** struct type of accelerator cache */ typedef struct ta_cache_s { - char* host; /**< Binding address of redis server */ - uint64_t timeout; /**< Timeout for keys in cache server */ - char* buffer_list_name; /**< Name of the list to buffer transactions */ - char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */ - uint16_t port; /**< Binding port of redis server */ - bool state; /**< Set it true to turn on cache server */ - long int capacity; /**< The maximum capacity of cache server */ - pthread_rwlock_t* rwlock; /**< Read/Write lock to avoid data racing in buffering */ + char* host; /**< Binding address of redis server */ + uint64_t timeout; /**< Timeout for keys in cache server */ + char* buffer_list_name; /**< Name of the list to buffer transactions */ + char* complete_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */ + char* mam_buffer_list_name; /**< Name of the list to buffer MAM requests UUID */ + char* mam_complete_list_name; /**< Name of the list to successfullay published MAM requests */ + uint16_t port; /**< Binding port of redis server */ + bool state; /**< Set it true to turn on cache server */ + long int capacity; /**< The maximum capacity of cache server */ + pthread_rwlock_t* rwlock; /**< Read/Write lock to avoid data racing in buffering */ } ta_cache_t; /** struct type of accelerator core */ diff --git a/accelerator/core/BUILD b/accelerator/core/BUILD index 155ca3de..80427e16 100644 --- a/accelerator/core/BUILD +++ b/accelerator/core/BUILD @@ -87,3 +87,19 @@ cc_library( "@org_iota_common//utils:time", ], ) + +cc_library( + name = "periodical_task", + srcs = ["periodical_task.c"], + hdrs = ["periodical_task.h"], + linkopts = [ + "-lpthread", + ], + visibility = ["//visibility:public"], + deps = [ + ":core", + ":mam_core", + "//common:ta_errors", + "//common:ta_logger", + ], +) diff --git a/accelerator/core/apis.c b/accelerator/core/apis.c index e0165c6e..cf2d3088 100644 --- a/accelerator/core/apis.c +++ b/accelerator/core/apis.c @@ -214,32 +214,49 @@ status_t api_recv_mam_message(const iota_config_t* const iconf, const iota_clien return ret; } -status_t api_send_mam_message(const ta_config_t* const info, const iota_config_t* const iconf, - const iota_client_service_t* const service, char const* const obj, char** json_result) { +status_t api_send_mam_message(const ta_cache_t* const cache, char const* const obj, char** json_result) { status_t ret = SC_OK; ta_send_mam_req_t* req = send_mam_req_new(); - ta_send_mam_res_t* res = send_mam_res_new(); - if (send_mam_message_req_deserialize(obj, req)) { - ret = SC_MAM_FAILED_INIT; + ret = send_mam_message_req_deserialize(obj, req); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // Generate UUID + char uuid[UUID_STR_LEN]; + uuid_t bin_uuid; + uuid_generate_random(bin_uuid); + uuid_unparse(bin_uuid, uuid); + if (!uuid[0]) { + ta_log_error("%s\n", "Failed to generate UUID"); + goto done; + } + + // TODO Generate the address that TA can quickly generate from the SEED with given parameters. + + // Buffer send_mam_req_t object for publishing it later + ret = cache_set(uuid, UUID_STR_LEN - 1, obj, strlen(obj), cache->timeout); + if (ret != SC_OK) { ta_log_error("%s\n", ta_error_to_string(ret)); goto done; } - ret = ta_send_mam_message(info, iconf, service, req, res); + // Buffer the UUID to the list + ret = cache_list_push(cache->mam_buffer_list_name, strlen(cache->mam_buffer_list_name), uuid, UUID_STR_LEN - 1); if (ret != SC_OK) { ta_log_error("%s\n", ta_error_to_string(ret)); goto done; } - ret = send_mam_message_res_serialize(res, json_result); + ret = send_mam_message_res_serialize(NULL, uuid, json_result); if (ret != SC_OK) { ta_log_error("%s\n", ta_error_to_string(ret)); } done: send_mam_req_free(&req); - send_mam_res_free(&res); return ret; } diff --git a/accelerator/core/apis.h b/accelerator/core/apis.h index a9091afa..5535fb89 100644 --- a/accelerator/core/apis.h +++ b/accelerator/core/apis.h @@ -75,8 +75,7 @@ status_t api_recv_mam_message(const iota_config_t* const iconf, const iota_clien * - SC_OK on success * - non-zero on error */ -status_t api_send_mam_message(const ta_config_t* const info, const iota_config_t* const iconf, - const iota_client_service_t* const service, char const* const obj, char** json_result); +status_t api_send_mam_message(const ta_cache_t* const cache, char const* const obj, char** json_result); /** * @brief Send transfer to tangle. diff --git a/accelerator/core/core.c b/accelerator/core/core.c index 257c8ae4..fa15339b 100644 --- a/accelerator/core/core.c +++ b/accelerator/core/core.c @@ -296,9 +296,8 @@ status_t ta_find_transaction_objects(const iota_client_service_t* const service, goto done; } char txn_hash[NUM_TRYTES_HASH + 1] = {0}; - char cache_value[NUM_TRYTES_SERIALIZED_TRANSACTION + 1] = {0}; + char* cache_value = NULL; txn_hash[NUM_TRYTES_HASH] = '\0'; - cache_value[NUM_TRYTES_SERIALIZED_TRANSACTION] = '\0'; // append transaction object which is already cached to transaction_array_t // if not, append uncached to request object of `iota_client_find_transaction_objects` @@ -306,7 +305,7 @@ status_t ta_find_transaction_objects(const iota_client_service_t* const service, CDL_FOREACH(req->hashes, q_iter) { flex_trits_to_trytes((tryte_t*)txn_hash, NUM_TRYTES_HASH, q_iter->hash, NUM_TRITS_HASH, NUM_TRITS_HASH); - ret = cache_get(txn_hash, cache_value); + ret = cache_get(txn_hash, &cache_value); if (ret == SC_OK) { flex_trits_from_trytes(tx_trits, NUM_TRITS_SERIALIZED_TRANSACTION, (const tryte_t*)cache_value, NUM_TRYTES_SERIALIZED_TRANSACTION, NUM_TRYTES_SERIALIZED_TRANSACTION); @@ -376,6 +375,7 @@ status_t ta_find_transaction_objects(const iota_client_service_t* const service, get_trytes_req_free(&req_get_trytes); transaction_array_free(uncached_txn_array); free(temp_txn_trits); + free(cache_value); return ret; } @@ -575,29 +575,6 @@ status_t ta_get_node_status(const iota_client_service_t* const service) { return ret; } -status_t ta_update_node_connection(ta_config_t* const info, iota_client_service_t* const service) { - status_t ret = SC_OK; - for (int i = 0; i < MAX_NODE_LIST_ELEMENTS && info->iota_host_list[i]; i++) { - // update new IOTA full node - strncpy(service->http.host, info->iota_host_list[i], HOST_MAX_LEN - 1); - service->http.host[HOST_MAX_LEN - 1] = '\0'; - service->http.port = info->iota_port_list[i]; - ta_log_info("Try to connect to %s:%d\n", service->http.host, service->http.port); - - // Run from the first one until found a good one. - if (ta_get_node_status(service) == SC_OK) { - ta_log_info("Connect to %s:%d\n", service->http.host, service->http.port); - goto done; - } - } - if (ret) { - ta_log_error("All IOTA full node on priority list failed.\n"); - } - -done: - return ret; -} - status_t push_txn_to_buffer(const ta_cache_t* const cache, hash8019_array_p raw_txn_flex_trit_array, char* uuid) { status_t ret = SC_OK; if (!uuid) { @@ -636,158 +613,6 @@ status_t push_txn_to_buffer(const ta_cache_t* const cache, hash8019_array_p raw_ return ret; } -status_t broadcast_buffered_txn(const ta_core_t* const core) { - status_t ret = SC_OK; - int uuid_list_len = 0; - hash8019_array_p txn_trytes_array = hash8019_array_new(); - - /* - *There are 4 data structures used here. - * 1. List: A list of unsent uuid which can be used to identify an unsent transaction object - * 2. Key-value: UUID to unsent transaction object in `flex_trit_t` - * 3. List: Store all the UUID of sent transaction objects. (We could use set after investigation in the future) - * 4. Key-value: UUID to sent transaction object in `flex_trit_t` - * - * 'push_txn_to_buffer()': - * Push UUID to the unsent UUID list. - * Store UUID as key and unsent transaction `flex_trit_t` as value. - * - * 'broadcast_buffered_txn()': - * Pop an unsent UUID in the unsent UUID list. - * Delete UUID-unsent_transaction pair from key value storage - * Store UUID as key and sent transaction object in `flex_trit_t` as value. - * Push UUID of sent transaction into sent transaction list. - * - * 'ta_fetch_txn_with_uuid()': - * Fetch transaction object with UUID in key-value storage. - * Delete UUID from sent transaction list - * Delete UUID-sent_transaction pair from key-value storage - */ - - get_trytes_req_t* req = NULL; - get_trytes_res_t* res = NULL; - do { - char uuid[UUID_STR_LEN]; - - ret = cache_list_size(core->cache.buffer_list_name, &uuid_list_len); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - if (uuid_list_len == 0) { - ta_log_debug("No buffered requests\n"); - goto done; - } - - ret = cache_list_peek(core->cache.buffer_list_name, UUID_STR_LEN, uuid); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - // TODO Now we assume every time we call `cache_get()`, we would get a transaction object. However, in the future, - // the returned result may be a bundle. - int trytes_array_len = 0; - ret = cache_list_size(uuid, &trytes_array_len); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - for (int i = 0; i < trytes_array_len; ++i) { - flex_trit_t req_txn_flex_trits[NUM_FLEX_TRITS_SERIALIZED_TRANSACTION + 1] = {}; - ret = cache_list_at(uuid, i, NUM_FLEX_TRITS_SERIALIZED_TRANSACTION, (char*)req_txn_flex_trits); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - hash_array_push(txn_trytes_array, req_txn_flex_trits); - } - - ret = ta_send_trytes(&core->ta_conf, &core->iota_conf, &core->iota_service, txn_trytes_array); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - // TODO Update the transaction object saved in redis, which allows `ta_fetch_txn_with_uuid()` to return the - // transaction object much faster. - req = get_trytes_req_new(); - res = get_trytes_res_new(); - iota_transaction_t txn; - for (int i = 0; i < trytes_array_len; ++i) { - transaction_deserialize_from_trits(&txn, hash_array_at(txn_trytes_array, i), true); - flex_trit_t* hash = transaction_hash(&txn); - - ret = hash243_queue_push(&req->hashes, hash); - if (ret) { - ret = SC_CCLIENT_HASH; - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - } - utarray_done(txn_trytes_array); - - ret = iota_client_get_trytes(&core->iota_service, req, res); - if (ret) { - ret = SC_CCLIENT_FAILED_RESPONSE; - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - // Delete the old transaction object - ret = cache_del(uuid); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - trytes_array_len = hash8019_queue_count(res->trytes); - for (int i = 0; i < trytes_array_len; ++i) { - ret = cache_list_push(uuid, UUID_STR_LEN - 1, hash8019_queue_at(res->trytes, i), - NUM_FLEX_TRITS_SERIALIZED_TRANSACTION); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - } - - if (pthread_rwlock_trywrlock(core->cache.rwlock)) { - ret = SC_CACHE_LOCK_FAILURE; - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - // Pop transaction from buffered list - ret = cache_list_pop(core->cache.buffer_list_name, (char*)uuid); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - // Transfer the transaction to another list in where we store all the successfully broadcasted transactions. - ret = cache_list_push(core->cache.done_list_name, strlen(core->cache.done_list_name), uuid, UUID_STR_LEN - 1); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - if (pthread_rwlock_unlock(core->cache.rwlock)) { - ret = SC_CACHE_LOCK_FAILURE; - ta_log_error("%s\n", ta_error_to_string(ret)); - goto done; - } - - get_trytes_req_free(&req); - get_trytes_res_free(&res); - } while (!uuid_list_len); - -done: - hash_array_free(txn_trytes_array); - get_trytes_req_free(&req); - get_trytes_res_free(&res); - return ret; -} - status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const uuid, ta_fetch_txn_with_uuid_res_t* res) { status_t ret = SC_OK; @@ -814,7 +639,7 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const goto done; } - ret = cache_list_exist(cache->done_list_name, uuid, UUID_STR_LEN - 1, &exist); + ret = cache_list_exist(cache->complete_list_name, uuid, UUID_STR_LEN - 1, &exist); if (ret) { ta_log_error("%s\n", ta_error_to_string(ret)); goto done; @@ -857,7 +682,7 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const } char pop_uuid[UUID_STR_LEN]; - ret = cache_list_pop(cache->done_list_name, pop_uuid); + ret = cache_list_pop(cache->complete_list_name, pop_uuid); if (ret) { ta_log_error("%s\n", ta_error_to_string(ret)); goto done; diff --git a/accelerator/core/core.h b/accelerator/core/core.h index 698dee85..136ce362 100644 --- a/accelerator/core/core.h +++ b/accelerator/core/core.h @@ -218,21 +218,6 @@ status_t ta_get_bundles_by_addr(const iota_client_service_t* const service, tryt */ status_t ta_get_node_status(const iota_client_service_t* const service); -/** - * @brief Update the binding IOTA full node to another valid host on priority list - * - * ta_update_node_connection would check the connection status of all the IOTA full node on priority list iteratively. - * Once it connects to one of the IOTA full node on the priority list, it would return SC_OK. - * - * @param[in] info Tangle-accelerator configuration variables - * @param[in] service service IOTA full node end point service - * - * @return - * - SC_OK on success - * - non-zero on error - */ -status_t ta_update_node_connection(ta_config_t* const info, iota_client_service_t* const service); - /** * @brief Push failed transactions in raw trytes into transaction buffer * @@ -250,20 +235,6 @@ status_t ta_update_node_connection(ta_config_t* const info, iota_client_service_ */ status_t push_txn_to_buffer(const ta_cache_t* const cache, hash8019_array_p raw_txn_flex_trit_array, char* uuid); -/** - * @brief Broadcast transactions in transaction buffer - * - * Failed transactions would be stored in transaction buffer. Once tangle-accelerator retrieve the connetion with - * Tangle, then tangle-accelerator will start to broadcast these failed transaction trytes. - * - * @param[in] core Pointer to Tangle-accelerator core configuration structure - * - * @return - * - SC_OK on success - * - non-zero on error - */ -status_t broadcast_buffered_txn(const ta_core_t* const core); - /** * @brief Return the transaction object status according to the given UUID * diff --git a/accelerator/core/periodical_task.c b/accelerator/core/periodical_task.c new file mode 100644 index 00000000..10112f01 --- /dev/null +++ b/accelerator/core/periodical_task.c @@ -0,0 +1,384 @@ +/* + * Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors + * All Rights Reserved. + * This is free software; you can redistribute it and/or modify it under the + * terms of the MIT license. A copy of the license can be found in the file + * "LICENSE" at the root of this distribution. + */ + +#include "periodical_task.h" + +#define BK_LOGGER "core" + +static logger_id_t logger_id; + +void bk_logger_init() { logger_id = logger_helper_enable(BK_LOGGER, LOGGER_DEBUG, true); } + +int bk_logger_release() { + logger_helper_release(logger_id); + if (logger_helper_destroy() != RC_OK) { + ta_log_error("Destroying logger failed %s.\n", BK_LOGGER); + return EXIT_FAILURE; + } + + return 0; +} + +/** + * @brief Update the binding IOTA full node to another valid host on priority list + * + * ta_update_full_node_connection would check the connection status of all the IOTA full node on priority list + * iteratively. Once it connects to one of the IOTA full node on the priority list, it would return SC_OK. + * + * @param[in] ta_conf Tangle-accelerator configuration variables + * @param[in] service service IOTA full node endpoint service + * + * @return + * - SC_OK on success + * - non-zero on error + */ +static status_t ta_update_full_node_connection(ta_config_t* const ta_conf, iota_client_service_t* const service) { + status_t ret = SC_OK; + for (int i = 0; i < MAX_NODE_LIST_ELEMENTS && ta_conf->iota_host_list[i]; i++) { + // update new IOTA full node + + strncpy(service->http.host, ta_conf->iota_host_list[i], HOST_MAX_LEN - 1); + service->http.port = ta_conf->iota_port_list[i]; + ta_log_info("Try to connect to %s:%d\n", service->http.host, service->http.port); + + // Run from the first one until found a good one. + if (ta_get_node_status(service) == SC_OK) { + ta_log_info("Connect to %s:%d\n", service->http.host, service->http.port); + goto done; + } + } + if (ret) { + ta_log_error("All IOTA full node on priority list failed.\n"); + } + +done: + return ret; +} + +status_t broadcast_buffered_txn(const ta_core_t* const core) { + status_t ret = SC_OK; + int uuid_list_len = 0; + hash8019_array_p txn_trytes_array = NULL; + + /* + *There are 4 data structures used here. + * 1. List: A list of unsent uuid which can be used to identify an unsent transaction object + * 2. Key-value: UUID to unsent transaction object in `flex_trit_t` + * 3. List: Store all the UUID of sent transaction objects. (We could use set after investigation in the future) + * 4. Key-value: UUID to sent transaction object in `flex_trit_t` + * + * 'push_txn_to_buffer()': + * Push UUID to the unsent UUID list. + * Store UUID as key and unsent transaction `flex_trit_t` as value. + * + * 'broadcast_buffered_txn()': + * Pop an unsent UUID in the unsent UUID list. + * Delete UUID-unsent_transaction pair from key value storage + * Store UUID as key and sent transaction object in `flex_trit_t` as value. + * Push UUID of sent transaction into sent transaction list. + * + * 'ta_fetch_buffered_request_status()': + * Fetch transaction object with UUID in key-value storage. + * Delete UUID from sent transaction list + * Delete UUID-sent_transaction pair from key-value storage + */ + + get_trytes_req_t* req = NULL; + get_trytes_res_t* res = NULL; + do { + char uuid[UUID_STR_LEN]; + txn_trytes_array = hash8019_array_new(); + + ret = cache_list_size(core->cache.buffer_list_name, &uuid_list_len); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + if (uuid_list_len == 0) { + ta_log_debug("No buffered requests\n"); + goto done; + } + + ret = cache_list_peek(core->cache.buffer_list_name, UUID_STR_LEN, uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // TODO Now we assume every time we call `cache_get()`, we would get a transaction object. However, in the future, + // the returned result may be a bundle. + int trytes_array_len = 0; + ret = cache_list_size(uuid, &trytes_array_len); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + for (int i = 0; i < trytes_array_len; ++i) { + flex_trit_t req_txn_flex_trits[NUM_FLEX_TRITS_SERIALIZED_TRANSACTION + 1] = {}; + ret = cache_list_at(uuid, i, NUM_FLEX_TRITS_SERIALIZED_TRANSACTION, (char*)req_txn_flex_trits); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + hash_array_push(txn_trytes_array, req_txn_flex_trits); + } + + ret = ta_send_trytes(&core->ta_conf, &core->iota_conf, &core->iota_service, txn_trytes_array); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // TODO Update the transaction object saved in redis, which allows `ta_fetch_buffered_request_status()` to + // return the transaction object much faster. + req = get_trytes_req_new(); + res = get_trytes_res_new(); + iota_transaction_t txn; + for (int i = 0; i < trytes_array_len; ++i) { + transaction_deserialize_from_trits(&txn, hash_array_at(txn_trytes_array, i), true); + flex_trit_t* hash = transaction_hash(&txn); + + ret = hash243_queue_push(&req->hashes, hash); + if (ret) { + ret = SC_CCLIENT_HASH; + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + } + hash_array_free(txn_trytes_array); + txn_trytes_array = NULL; + + ret = iota_client_get_trytes(&core->iota_service, req, res); + if (ret) { + ret = SC_CCLIENT_FAILED_RESPONSE; + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // Delete the old transaction object + ret = cache_del(uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + trytes_array_len = hash8019_queue_count(res->trytes); + for (int i = 0; i < trytes_array_len; ++i) { + ret = cache_list_push(uuid, UUID_STR_LEN - 1, hash8019_queue_at(res->trytes, i), + NUM_FLEX_TRITS_SERIALIZED_TRANSACTION); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + } + + if (pthread_rwlock_trywrlock(core->cache.rwlock)) { + ret = SC_CACHE_LOCK_FAILURE; + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + // Pop transaction from buffered list + ret = cache_list_pop(core->cache.buffer_list_name, (char*)uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // Transfer the transaction to another list in where we store all the successfully broadcasted transactions. + ret = + cache_list_push(core->cache.complete_list_name, strlen(core->cache.complete_list_name), uuid, UUID_STR_LEN - 1); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + if (pthread_rwlock_unlock(core->cache.rwlock)) { + ret = SC_CACHE_LOCK_FAILURE; + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + get_trytes_req_free(&req); + get_trytes_res_free(&res); + } while (uuid_list_len); + +done: + hash_array_free(txn_trytes_array); + get_trytes_req_free(&req); + get_trytes_res_free(&res); + return ret; +} + +status_t broadcast_buffered_send_mam_request(const ta_core_t* const core) { + status_t ret = SC_OK; + int uuid_list_len = 0; + char* json = NULL; + + ta_send_mam_req_t* req = NULL; + ta_send_mam_res_t* res = NULL; + do { + char uuid[UUID_STR_LEN]; + + ret = cache_list_size(core->cache.mam_buffer_list_name, &uuid_list_len); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + if (uuid_list_len == 0) { + ta_log_debug("No buffered requests\n"); + goto done; + } + + ret = cache_list_peek(core->cache.mam_buffer_list_name, UUID_STR_LEN, uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + ret = cache_get(uuid, &json); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + req = send_mam_req_new(); + res = send_mam_res_new(); + if (!req || !res) { + ret = SC_OOM; + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + ret = send_mam_message_req_deserialize(json, req); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + free(json); + json = NULL; + + ret = ta_send_mam_message(&core->ta_conf, &core->iota_conf, &core->iota_service, req, res); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + ret = send_mam_message_res_serialize(res, NULL, &json); + if (ret != SC_OK) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + if (pthread_rwlock_trywrlock(core->cache.rwlock)) { + ret = SC_CACHE_LOCK_FAILURE; + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // Delete the old transaction object + ret = cache_del(uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + ret = cache_set(uuid, UUID_STR_LEN - 1, json, strlen(json), core->cache.timeout); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // Pop transaction from buffered list + ret = cache_list_pop(core->cache.mam_buffer_list_name, (char*)uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + // Transfer the transaction to another list in where we store all the successfully broadcasted transactions. + ret = cache_list_push(core->cache.mam_complete_list_name, strlen(core->cache.mam_complete_list_name), uuid, + UUID_STR_LEN - 1); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + if (pthread_rwlock_unlock(core->cache.rwlock)) { + ret = SC_CACHE_LOCK_FAILURE; + ta_log_error("%s\n", ta_error_to_string(ret)); + goto done; + } + + send_mam_req_free(&req); + send_mam_res_free(&res); + free(json); + json = NULL; + } while (uuid_list_len); + +done: + send_mam_req_free(&req); + send_mam_res_free(&res); + free(json); + return ret; +} + +void* health_track(void* arg) { + ta_core_t* core = (ta_core_t*)arg; + char uuid[UUID_STR_LEN] = {}; + + while (core->cache.state) { + status_t ret = ta_get_node_status(&core->iota_service); + if (ret == SC_CORE_NODE_UNSYNC || ret == SC_CCLIENT_FAILED_RESPONSE) { + ta_log_error("IOTA full node status error %d. Try to connect to another IOTA full node on priority list\n", ret); + ret = ta_update_full_node_connection(&core->ta_conf, &core->iota_service); + if (ret) { + ta_log_error("Update IOTA full node failed: %d\n", ret); + } + } + + // Broadcast buffered transactions + if (ret == SC_OK) { + ret = broadcast_buffered_txn(core); + if (ret) { + ta_log_error("Broadcast buffered transactions failed. %s\n", ta_error_to_string(ret)); + } + } + + if (ret == SC_OK) { + ret = broadcast_buffered_send_mam_request(core); + if (ret) { + ta_log_error("Broadcast buffered send MAM message requests failed. %s\n", ta_error_to_string(ret)); + } + } + + // The usage exceeds the maximum redis capacity + while (core->cache.capacity < cache_occupied_space()) { + if (pthread_rwlock_trywrlock(core->cache.rwlock)) { + ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE)); + break; + } + + ret = cache_list_pop(core->cache.complete_list_name, uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + } + ret = cache_del(uuid); + if (ret) { + ta_log_error("%s\n", ta_error_to_string(ret)); + } + + if (pthread_rwlock_unlock(core->cache.rwlock)) { + ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE)); + } + } + + sleep(core->ta_conf.health_track_period); + } + return ((void*)NULL); +} diff --git a/accelerator/core/periodical_task.h b/accelerator/core/periodical_task.h new file mode 100644 index 00000000..8ade2615 --- /dev/null +++ b/accelerator/core/periodical_task.h @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors + * All Rights Reserved. + * This is free software; you can redistribute it and/or modify it under the + * terms of the MIT license. A copy of the license can be found in the file + * "LICENSE" at the root of this distribution. + */ + +#ifndef CORE_PERIODICAL_TASK_H_ +#define CORE_PERIODICAL_TASK_H_ + +#include +#include "accelerator/config.h" +#include "accelerator/core/core.h" +#include "accelerator/core/mam_core.h" +#include "common/logger.h" +#include "common/ta_errors.h" +#include "pthread.h" +#include "time.h" +#include "uuid/uuid.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @file accelerator/core/periodical_task.h + */ + +void* health_track(void* arg); + +/** + * @brief Broadcast transactions in transaction buffer + * + * Failed transactions would be stored in transaction buffer. Once tangle-accelerator retrieve the connetion with + * Tangle, then tangle-accelerator will start to broadcast these failed transaction trytes. + * + * @param[in] core Pointer to Tangle-accelerator core configuration structure + * + * @return + * - SC_OK on success + * - non-zero on error + */ +status_t broadcast_buffered_txn(const ta_core_t* const core); + +/** + * @brief Broadcast buffered MAM requests + * + * @param[in] core Pointer to Tangle-accelerator core configuration structure + * + * @return + * - SC_OK on success + * - non-zero on error + */ +status_t broadcast_buffered_send_mam_request(const ta_core_t* const core); + +#ifdef __cplusplus +} +#endif + +#endif // CORE_PERIODICAL_TASK_H_ \ No newline at end of file diff --git a/accelerator/core/serializer/ser_mam.c b/accelerator/core/serializer/ser_mam.c index cd8a6c55..35c01332 100644 --- a/accelerator/core/serializer/ser_mam.c +++ b/accelerator/core/serializer/ser_mam.c @@ -321,9 +321,9 @@ status_t send_mam_message_res_deserialize(const char* const obj, ta_send_mam_res return ret; } -status_t send_mam_message_res_serialize(const ta_send_mam_res_t* const res, char** obj) { +status_t send_mam_message_res_serialize(const ta_send_mam_res_t* const res, char const* const uuid, char** obj) { status_t ret = SC_OK; - if (!res || !obj) { + if ((!res && !uuid) || !obj) { ret = SC_SERIALIZER_NULL; ta_log_error("%s\n", ta_error_to_string(ret)); return ret; @@ -336,20 +336,23 @@ status_t send_mam_message_res_serialize(const ta_send_mam_res_t* const res, char goto done; } - cJSON_AddStringToObject(json_root, "bundle_hash", res->bundle_hash); + if (uuid) { + cJSON_AddStringToObject(json_root, "uuid", uuid); + } else { + cJSON_AddStringToObject(json_root, "bundle_hash", res->bundle_hash); - cJSON_AddStringToObject(json_root, "chid", res->chid); + cJSON_AddStringToObject(json_root, "chid", res->chid); - cJSON_AddStringToObject(json_root, "msg_id", res->msg_id); + cJSON_AddStringToObject(json_root, "msg_id", res->msg_id); - if (res->announcement_bundle_hash[0]) { - cJSON_AddStringToObject(json_root, "announcement_bundle_hash", res->announcement_bundle_hash); - } + if (res->announcement_bundle_hash[0]) { + cJSON_AddStringToObject(json_root, "announcement_bundle_hash", res->announcement_bundle_hash); + } - if (res->chid1[0]) { - cJSON_AddStringToObject(json_root, "chid1", res->chid1); + if (res->chid1[0]) { + cJSON_AddStringToObject(json_root, "chid1", res->chid1); + } } - *obj = cJSON_PrintUnformatted(json_root); if (*obj == NULL) { ret = SC_SERIALIZER_JSON_PARSE; diff --git a/accelerator/core/serializer/ser_mam.h b/accelerator/core/serializer/ser_mam.h index c6f73074..0edf751b 100644 --- a/accelerator/core/serializer/ser_mam.h +++ b/accelerator/core/serializer/ser_mam.h @@ -55,7 +55,7 @@ status_t send_mam_message_res_deserialize(const char* const obj, ta_send_mam_res * - SC_OK on success * - non-zero on error */ -status_t send_mam_message_res_serialize(const ta_send_mam_res_t* const res, char** obj); +status_t send_mam_message_res_serialize(const ta_send_mam_res_t* const res, char const* const uuid, char** obj); /** * @brief Deserialize request of recv_mam_message diff --git a/accelerator/main.c b/accelerator/main.c index 1ca1d527..06496051 100644 --- a/accelerator/main.c +++ b/accelerator/main.c @@ -1,5 +1,6 @@ #include +#include "accelerator/core/periodical_task.h" #include "common/logger.h" #include "common/ta_errors.h" #include "connectivity/common.h" @@ -21,54 +22,6 @@ static void ta_stop(int signal) { } } -static void* health_track(void* arg) { - ta_core_t* core = (ta_core_t*)arg; - while (core->cache.state) { - status_t ret = ta_get_node_status(&core->iota_service); - if (ret == SC_CORE_NODE_UNSYNC || ret == SC_CCLIENT_FAILED_RESPONSE) { - ta_log_error("IOTA full node status error %d. Try to connect to another IOTA full node host on priority list\n", - ret); - ret = ta_update_node_connection(&core->ta_conf, &core->iota_service); - if (ret) { - ta_log_error("Update IOTA full node host failed: %d\n", ret); - } - } - - // Broadcast buffered transactions - if (ret == SC_OK) { - ret = broadcast_buffered_txn(core); - if (ret) { - ta_log_error("Broadcast buffered transactions failed. %s\n", ta_error_to_string(ret)); - } - } - - char uuid[UUID_STR_LEN] = {}; - // The usage exceeds the maximum redis capacity - while (core->cache.capacity < cache_occupied_space()) { - if (pthread_rwlock_trywrlock(core->cache.rwlock)) { - ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE)); - break; - } - - ret = cache_list_pop(core->cache.done_list_name, uuid); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - } - ret = cache_del(uuid); - if (ret) { - ta_log_error("%s\n", ta_error_to_string(ret)); - } - - if (pthread_rwlock_unlock(core->cache.rwlock)) { - ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE)); - } - } - - sleep(core->ta_conf.health_track_period); - } - return ((void*)NULL); -} - int main(int argc, char* argv[]) { if (signal_handle_register(SIGINT, ta_stop) == SIG_ERR || signal_handle_register(SIGTERM, ta_stop) == SIG_ERR) { return EXIT_FAILURE; diff --git a/connectivity/http/http.c b/connectivity/http/http.c index 1c4a9268..fe0115cf 100644 --- a/connectivity/http/http.c +++ b/connectivity/http/http.c @@ -169,10 +169,9 @@ static inline int process_find_transaction_by_id_request(ta_http_t *const http, } #endif -static inline int process_send_mam_msg_request(ta_http_t *const http, iota_client_service_t *const iota_service, - char const *const payload, char **const out) { +static inline int process_send_mam_msg_request(ta_http_t *const http, char const *const payload, char **const out) { status_t ret; - ret = api_send_mam_message(&http->core->ta_conf, &http->core->iota_conf, iota_service, payload, out); + ret = api_send_mam_message(&http->core->cache, payload, out); return set_response_content(ret, out); } @@ -234,7 +233,7 @@ static int ta_http_process_request(ta_http_t *const http, iota_client_service_t if (api_path_matcher(url, ".*/recv.*") == SC_OK) { return process_recv_mam_msg_request(http, iota_service, payload, out); } else { - return process_send_mam_msg_request(http, iota_service, payload, out); + return process_send_mam_msg_request(http, payload, out); } } else if (api_path_matcher(url, "/transaction/[A-Z9]{81}[/]?") == SC_OK) { return process_find_txn_obj_single_request(iota_service, url, out); diff --git a/tests/api/BUILD b/tests/api/BUILD index 1bdb8af0..ad318b45 100644 --- a/tests/api/BUILD +++ b/tests/api/BUILD @@ -33,12 +33,14 @@ cc_binary( ) cc_test( - name = "driver_core", + name = "test_periodical_task", srcs = [ - "driver_core.c", + "test_periodical_task.c", ], deps = [ "//accelerator/core", + "//accelerator/core:apis", + "//accelerator/core:periodical_task", "//tests:common", "//tests:logger_lib", "//tests:test_define", diff --git a/tests/api/mam_test.c b/tests/api/mam_test.c index c410ec21..5b414c8c 100644 --- a/tests/api/mam_test.c +++ b/tests/api/mam_test.c @@ -36,15 +36,19 @@ void test_send_mam_message(void) { char* json = (char*)malloc(sizeof(char) * len); snprintf(json, len, json_template, seed); double sum = 0; + test_time_start(&start_time); for (size_t count = 0; count < TEST_COUNT; count++) { - test_time_start(&start_time); + ta_send_mam_req_t* req = send_mam_req_new(); + TEST_ASSERT_EQUAL_INT32(SC_OK, send_mam_message_req_deserialize(json, req)); TEST_ASSERT_EQUAL_INT32( - SC_OK, api_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, json, &json_result)); + SC_OK, ta_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, req, &res)); send_mam_message_res_deserialize(json_result, &res); - test_time_end(&start_time, &end_time, &sum); + free(json_result); + send_mam_req_free(&req); } free(json); + test_time_end(&start_time, &end_time, &sum); printf("Average time of receive_mam_message: %lf\n", sum / TEST_COUNT); } @@ -79,15 +83,16 @@ void test_write_until_next_channel(void) { double sum = 0; test_time_start(&start_time); for (int i = 0; i < msg_num; i++) { - char* json_result; mam_res_array[i] = send_mam_res_new(); char* json = (char*)malloc(sizeof(char) * len); snprintf(json, len, json_template_send, seed, payload, i); + + ta_send_mam_req_t* req = send_mam_req_new(); + TEST_ASSERT_EQUAL_INT32(SC_OK, send_mam_message_req_deserialize(json, req)); TEST_ASSERT_EQUAL_INT32( - SC_OK, api_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, json, &json_result)); - send_mam_message_res_deserialize(json_result, mam_res_array[i]); + SC_OK, ta_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, req, mam_res_array[i])); + send_mam_req_free(&req); free(json); - free(json_result); } // The current chid1 should be equal to the chid of next channel. Element with index `channel_leaf_msg_num` is the @@ -146,6 +151,7 @@ void test_write_with_chid(void) { const int len = strlen(json_template_send) + NUM_TRYTES_ADDRESS + strlen(payload) + 2; gen_rand_trytes(NUM_TRYTES_ADDRESS, (tryte_t*)seed); double sum = 0; + ta_send_mam_req_t* req; ta_send_mam_res_t* res; test_time_start(&start_time); char* json_result = NULL; @@ -153,13 +159,16 @@ void test_write_with_chid(void) { res = send_mam_res_new(); char* json = (char*)malloc(sizeof(char) * len); snprintf(json, len, json_template_send, seed, payload, i); - TEST_ASSERT_EQUAL_INT32( - SC_OK, api_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, json, &json_result)); + req = send_mam_req_new(); + TEST_ASSERT_EQUAL_INT32(SC_OK, send_mam_message_req_deserialize(json, req)); + TEST_ASSERT_EQUAL_INT32(SC_OK, + ta_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, req, res)); free(json); if (i != beginning_msg_num - 1) { free(json_result); send_mam_res_free(&res); } + send_mam_req_free(&req); } send_mam_message_res_deserialize(json_result, res); free(json_result); @@ -171,9 +180,15 @@ void test_write_with_chid(void) { const int len_send_chid = strlen(json_template_send_chid) + NUM_TRYTES_ADDRESS * 2 + strlen(payload); char* json_send_chid = (char*)malloc(sizeof(char) * (len_send_chid + 1)); snprintf(json_send_chid, len_send_chid, json_template_send_chid, seed, res->chid1, payload); - TEST_ASSERT_EQUAL_INT32(SC_OK, api_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, - json_send_chid, &json_result)); + TEST_ASSERT_EQUAL_INT32(SC_OK, api_send_mam_message(&ta_core.cache, json_send_chid, &json_result)); + + req = send_mam_req_new(); + TEST_ASSERT_EQUAL_INT32(SC_OK, send_mam_message_req_deserialize(json_send_chid, req)); + TEST_ASSERT_EQUAL_INT32(SC_OK, + ta_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, req, res)); + test_time_end(&start_time, &end_time, &sum); + send_mam_req_free(&req); send_mam_res_free(&res); free(json_send_chid); free(json_result); @@ -193,9 +208,11 @@ void test_encrypt_decrypt_psk(void) { char* json = (char*)malloc(sizeof(char) * len_send); snprintf(json, len_send, json_template_send, seed); ta_send_mam_res_t* send_res = send_mam_res_new(); + ta_send_mam_req_t* req = send_mam_req_new(); + TEST_ASSERT_EQUAL_INT32(SC_OK, send_mam_message_req_deserialize(json, req)); TEST_ASSERT_EQUAL_INT32( - SC_OK, api_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, json, &json_result)); - send_mam_message_res_deserialize(json_result, send_res); + SC_OK, ta_send_mam_message(&ta_core.ta_conf, &ta_core.iota_conf, &ta_core.iota_service, req, send_res)); + send_mam_req_free(&req); free(json_result); free(json); diff --git a/tests/api/driver_core.c b/tests/api/test_periodical_task.c similarity index 80% rename from tests/api/driver_core.c rename to tests/api/test_periodical_task.c index 7a9ad6e5..8be46007 100644 --- a/tests/api/driver_core.c +++ b/tests/api/test_periodical_task.c @@ -6,12 +6,19 @@ * "LICENSE" at the root of this distribution. */ +#include "accelerator/core/apis.h" #include "accelerator/core/core.h" +#include "accelerator/core/periodical_task.h" #include "tests/common.h" #include "tests/test_define.h" -static ta_core_t ta_core; #define TXN_NUM_IN_BUNDLE 2 +#define MAM_REQ_NUM 2 +#define ORDERED_PAYLOAD "test payload number" + +static ta_core_t ta_core; +static char* response_uuid[MAM_REQ_NUM]; + status_t prepare_transfer(const iota_config_t* const iconf, const iota_client_service_t* const service, ta_send_transfer_req_t** req, const int req_txn_num, hash8019_array_p raw_txn_array) { status_t ret = SC_OK; @@ -207,6 +214,39 @@ void test_fetch_txn_with_uuid(void) { ta_fetch_txn_with_uuid_res_free(&res); } +void test_broadcast_buffered_mam(void) { + const char* json_template = "{\"x-api-key\":\"" TEST_TOKEN "\",\"data\":{\"seed\":\"%s\",\"ch_mss_depth\":" STR( + TEST_CH_DEPTH) ",\"message\":\"" ORDERED_PAYLOAD ":%d\"}, \"protocol\":\"MAM_V1\"}"; + char seed[NUM_TRYTES_ADDRESS + 1] = {}; + gen_rand_trytes(NUM_TRYTES_ADDRESS, (tryte_t*)seed); + const int len = strlen(json_template) + NUM_TRYTES_ADDRESS; + int list_len = -1; + + TEST_ASSERT_EQUAL_INT32(SC_OK, cache_list_size(ta_core.cache.mam_buffer_list_name, &list_len)); + const int init_list_len = list_len; + + for (int i = 0; i < MAM_REQ_NUM; i++) { + char* json_result = NULL; + char* json = (char*)malloc(sizeof(char) * len); + snprintf(json, len, json_template, seed, i); + ta_send_mam_req_t* req = send_mam_req_new(); + TEST_ASSERT_EQUAL_INT32(SC_OK, api_send_mam_message(&ta_core.cache, json, &json_result)); + response_uuid[i] = json_result; + free(json); + send_mam_req_free(&req); + } + + TEST_ASSERT_EQUAL_INT32(SC_OK, cache_list_size(ta_core.cache.mam_buffer_list_name, &list_len)); + TEST_ASSERT_EQUAL_INT32(init_list_len + MAM_REQ_NUM, list_len); + + // Take action to broadcast transactions in buffer + TEST_ASSERT_EQUAL_INT32(SC_OK, broadcast_buffered_send_mam_request(&ta_core)); + + // The length of the buffer list should be zero, since all the transaction objects have been popped out. + TEST_ASSERT_EQUAL_INT32(SC_OK, cache_list_size(ta_core.cache.mam_buffer_list_name, &list_len)); + TEST_ASSERT_EQUAL_INT32(init_list_len, list_len); +} + int main(int argc, char* argv[]) { rand_trytes_init(); @@ -220,9 +260,18 @@ int main(int argc, char* argv[]) { ta_core_set(&ta_core); ta_logger_switch(false, true, &ta_core.ta_conf); + const int list_name_len = 15; + ta_core.cache.mam_buffer_list_name = (char*)malloc(list_name_len); + gen_rand_trytes(list_name_len - 1, (tryte_t*)ta_core.cache.mam_buffer_list_name); + ta_core.cache.mam_buffer_list_name[list_name_len - 1] = 0; + ta_core.cache.mam_complete_list_name = (char*)malloc(list_name_len); + gen_rand_trytes(list_name_len - 1, (tryte_t*)ta_core.cache.mam_complete_list_name); + ta_core.cache.mam_complete_list_name[list_name_len - 1] = 0; + UNITY_BEGIN(); RUN_TEST(test_broadcast_buffered_txn); RUN_TEST(test_fetch_txn_with_uuid); + RUN_TEST(test_broadcast_buffered_mam); ta_core_destroy(&ta_core); return UNITY_END(); } diff --git a/tests/unit-test/test_cache.c b/tests/unit-test/test_cache.c index 3d630d4e..41f8e146 100644 --- a/tests/unit-test/test_cache.c +++ b/tests/unit-test/test_cache.c @@ -20,10 +20,11 @@ void test_cache_del(void) { void test_cache_get(void) { char* key = test_uuid; - char res[strlen(CACHE_VALUE) + 1]; + char* res = NULL; - TEST_ASSERT_EQUAL_INT(SC_OK, cache_get(key, res)); + TEST_ASSERT_EQUAL_INT(SC_OK, cache_get(key, &res)); TEST_ASSERT_EQUAL_STRING(CACHE_VALUE, res); + free(res); } void test_cache_set(void) { @@ -34,12 +35,15 @@ void test_cache_set(void) { void test_cache_timeout(void) { char* key = test_uuid; - char res[strlen(CACHE_VALUE) + 1]; + char* res = NULL; const int timeout = 2; TEST_ASSERT_EQUAL_INT(SC_OK, cache_set(key, strlen(key), CACHE_VALUE, strlen(CACHE_VALUE), timeout)); - TEST_ASSERT_EQUAL_INT(SC_OK, cache_get(key, res)); + TEST_ASSERT_EQUAL_INT(SC_OK, cache_get(key, &res)); + free(res); + res = NULL; sleep(timeout + 1); - TEST_ASSERT_EQUAL_INT(SC_CACHE_FAILED_RESPONSE, cache_get(key, res)); + TEST_ASSERT_EQUAL_INT(SC_CACHE_FAILED_RESPONSE, cache_get(key, &res)); + free(res); } void test_generate_uuid(void) { diff --git a/tests/unit-test/test_serializer.c b/tests/unit-test/test_serializer.c index 6187f4b8..711084f0 100644 --- a/tests/unit-test/test_serializer.c +++ b/tests/unit-test/test_serializer.c @@ -392,7 +392,7 @@ void test_send_mam_message_response_serialize(void) { send_mam_res_set_announce_bundle_hash(res, (tryte_t*)ADDRESS_1); send_mam_res_set_chid1(res, (tryte_t*)ADDRESS_2); - send_mam_message_res_serialize(res, &json_result); + send_mam_message_res_serialize(res, NULL, &json_result); TEST_ASSERT_EQUAL_STRING(json, json_result); free(json_result); diff --git a/utils/cache/backend_redis.c b/utils/cache/backend_redis.c index d7e7324f..faba6b09 100644 --- a/utils/cache/backend_redis.c +++ b/utils/cache/backend_redis.c @@ -52,7 +52,7 @@ static status_t redis_del(redisContext* c, const char* const key) { return ret; } -static status_t redis_get(redisContext* c, const char* const key, char* res) { +static status_t redis_get(redisContext* c, const char* const key, char** res) { status_t ret = SC_OK; if (key == NULL) { ta_log_error("%s\n", ta_error_to_string(SC_NULL)); @@ -61,8 +61,7 @@ static status_t redis_get(redisContext* c, const char* const key, char* res) { redisReply* reply = redisCommand(c, "GET %s", key); if (reply->type == REDIS_REPLY_STRING) { - strncpy(res, reply->str, reply->len); - res[reply->len] = 0; + *res = strdup(reply->str); } else { ret = SC_CACHE_FAILED_RESPONSE; ta_log_error("%s\n", ta_error_to_string(ret)); @@ -309,7 +308,7 @@ status_t cache_del(const char* const key) { return redis_del(CONN(cache)->rc, key); } -status_t cache_get(const char* const key, char* res) { +status_t cache_get(const char* const key, char** res) { if (!state) { ta_log_debug("%s\n", ta_error_to_string(SC_CACHE_OFF)); return SC_CACHE_OFF; diff --git a/utils/cache/cache.h b/utils/cache/cache.h index 3828088b..ac0b03a5 100644 --- a/utils/cache/cache.h +++ b/utils/cache/cache.h @@ -75,7 +75,7 @@ status_t cache_del(const char* const key); * - SC_OK on success * - non-zero on error */ -status_t cache_get(const char* const key, char* res); +status_t cache_get(const char* const key, char** res); /** * @brief Set key-value storage in in-memory cache