From 84e3dc64a2cff568487468d485fcaf6b7e7c281a Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Fri, 19 May 2023 16:50:09 +0400 Subject: [PATCH 01/15] Make: Redis build --- .vscode/launch.json | 18 ++++++++++++++++ .vscode/tasks.json | 6 ++++++ CMakeLists.txt | 13 ++++++++++++ cmake/hiredis.cmake | 51 +++++++++++++++++++++++++++++++++++++++++++++ cmake/redis.cmake | 51 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 139 insertions(+) create mode 100644 cmake/hiredis.cmake create mode 100644 cmake/redis.cmake diff --git a/.vscode/launch.json b/.vscode/launch.json index 5a8f72852..60e156638 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -314,6 +314,24 @@ "MIMode": "gdb", "miDebuggerPath": "/usr/bin/gdb" }, + { + "name": "C++: Test Redis Client", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/build_debug/build/bin/test_units_ustore_redis_client", + "args": [ + "--gtest_break_on_failure", + "--gtest_catch_exceptions=0" + ], + "cwd": "${workspaceFolder}", + "environment": [], + "showDisplayString": true, + "stopAtEntry": false, + "externalConsole": false, + "preLaunchTask": "Build Debug Redis Client", + "MIMode": "gdb", + "miDebuggerPath": "/usr/bin/gdb" + }, { "name": "C++: Test Arrow Server", "type": "cppdbg", diff --git a/.vscode/tasks.json b/.vscode/tasks.json index c10ed39ed..b36384482 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -91,6 +91,12 @@ "args": [], "type": "shell" }, + { + "label": "Build Debug Redis Client", + "command": "cmake -DCMAKE_BUILD_TYPE=Debug -DUSTORE_BUILD_API_REDIS_CLIENT=1 -B ./build_debug && make test_units_ustore_redis_client -j --silent -C ./build_debug && sleep 5", + "args": [], + "type": "shell" + }, { "label": "Build Debug Arrow Server", "command": "cmake -DCMAKE_BUILD_TYPE=Debug -DUSTORE_BUILD_API_FLIGHT_SERVER=1 -B ./build_debug && make ustore_flight_server_ucset -j --silent -C ./build_debug", diff --git a/CMakeLists.txt b/CMakeLists.txt index 90a72c2f9..0e457dc0f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,6 +21,7 @@ option(USTORE_BUILD_TOOLS "Building tools for UStore") option(USTORE_BUILD_BUNDLES "Building bundles libraries for GoLang and Java SDKs") option(USTORE_BUILD_SDK_PYTHON "Building Python bidings for all SDKs") option(USTORE_BUILD_API_FLIGHT "Building Apache Arrow Flight RPC server and client for all backends") +option(USTORE_BUILD_API_REDIS_CLIENT "Building Redis client for all backends") option(USTORE_BUILD_API_FLIGHT_CLIENT "Building Apache Arrow Flight RPC client for all backends") option(USTORE_BUILD_API_FLIGHT_SERVER "Building Apache Arrow Flight RPC server for all backends") option(USTORE_BUILD_API_REST_SERVER "Building REST API server for all backends") @@ -180,6 +181,10 @@ if(${USTORE_BUILD_API_FLIGHT_CLIENT} OR ${USTORE_BUILD_API_FLIGHT_SERVER}) include("${CMAKE_CURRENT_SOURCE_DIR}/cmake/openssl.cmake") endif() +if(${USTORE_BUILD_API_REDIS_CLIENT}) + include("${CMAKE_CURRENT_SOURCE_DIR}/cmake/redis.cmake") +endif() + if(${USTORE_BUILD_API_FLIGHT_SERVER}) include("${CMAKE_CURRENT_SOURCE_DIR}/cmake/clipp.cmake") endif() @@ -261,6 +266,14 @@ if(${USTORE_BUILD_API_FLIGHT_CLIENT}) list(APPEND USTORE_CLIENT_LIBS "ustore_flight_client") endif() +if(${USTORE_BUILD_API_REDIS_CLIENT}) + add_library(ustore_redis_client src/redis_client.cpp src/modality_docs.cpp src/modality_graph.cpp src/modality_vectors.cpp) + target_link_libraries(ustore_redis_client pthread yyjson simdjson bson redis) + target_compile_definitions(ustore_redis_client INTERFACE USTORE_REDIS_CLIENT=TRUE) + list(APPEND USTORE_CLIENT_NAMES "redis_client") + list(APPEND USTORE_CLIENT_LIBS "ustore_redis_client") +endif() + if(${USTORE_BUILD_API_FLIGHT_SERVER}) foreach(engine_name IN ITEMS ${USTORE_ENGINE_NAMES}) string(CONCAT embedded_lib_name "ustore_embedded_" ${engine_name}) diff --git a/cmake/hiredis.cmake b/cmake/hiredis.cmake new file mode 100644 index 000000000..8559249a5 --- /dev/null +++ b/cmake/hiredis.cmake @@ -0,0 +1,51 @@ +include(ExternalProject) + +set(PREFIX_DIR ${CMAKE_BINARY_DIR}/_deps) +ExternalProject_Add( + hiredis_external + + GIT_REPOSITORY "https://github.com/redis/hiredis.git" + GIT_TAG v1.1.0 + GIT_SHALLOW 1 + GIT_PROGRESS 0 + + PREFIX "${PREFIX_DIR}" + DOWNLOAD_DIR "${PREFIX_DIR}/hiredis-src" + LOG_DIR "${PREFIX_DIR}/hiredis-log" + STAMP_DIR "${PREFIX_DIR}/hiredis-stamp" + TMP_DIR "${PREFIX_DIR}/hiredis-tmp" + SOURCE_DIR "${PREFIX_DIR}/hiredis-src" + INSTALL_DIR "${PREFIX_DIR}/hiredis-install" + BINARY_DIR "${PREFIX_DIR}/hiredis-build" + + BUILD_ALWAYS 0 + UPDATE_COMMAND "" + + CMAKE_ARGS + -DCMAKE_INSTALL_PREFIX:PATH=${PREFIX_DIR}/hiredis-install + -DCMAKE_INSTALL_LIBDIR=lib + -DCMAKE_INSTALL_RPATH:PATH=/lib + -DCMAKE_BUILD_TYPE:STRING=${CMAKE_BUILD_TYPE} + -DENABLE_STATIC:STRING=ON + -DENABLE_CPPSUITE:BOOL=OFF + -DCMAKE_C_FLAGS=-Wno-maybe-uninitialized + -DCMAKE_CXX_FLAGS=-Wno-unused-variable + -DENABLE_SSL:BOOL=OFF + -DDISABLE_TESTS:BOOL=ON + -DENABLE_SSL_TESTS:BOOL=OFF + -DENABLE_ASYNC_TESTS:BOOL=OFF +) + +set(hiredis_INCLUDE_DIR ${PREFIX_DIR}/hiredis-install/include) +if(CMAKE_BUILD_TYPE MATCHES "Debug") + set(hiredis_LIBRARY_PATH ${PREFIX_DIR}/hiredis-install/lib/libhiredisd.a) +else() + set(hiredis_LIBRARY_PATH ${PREFIX_DIR}/hiredis-install/lib/libhiredis.a) +endif() + +file(MAKE_DIRECTORY ${hiredis_INCLUDE_DIR}) +add_library(hiredis STATIC IMPORTED) +set_property(TARGET hiredis PROPERTY IMPORTED_LOCATION ${hiredis_LIBRARY_PATH}) +set_property(TARGET hiredis APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${hiredis_INCLUDE_DIR}) +include_directories(${hiredis_INCLUDE_DIR}) +add_dependencies(hiredis hiredis_external) \ No newline at end of file diff --git a/cmake/redis.cmake b/cmake/redis.cmake new file mode 100644 index 000000000..d7dfffd30 --- /dev/null +++ b/cmake/redis.cmake @@ -0,0 +1,51 @@ +include(ExternalProject) + +include("${CMAKE_MODULE_PATH}/hiredis.cmake") +set(PREFIX_DIR ${CMAKE_BINARY_DIR}/_deps) + +ExternalProject_Add( + redis_external + + GIT_REPOSITORY "https://github.com/sewenew/redis-plus-plus.git" + GIT_TAG 1.3.7 + GIT_SHALLOW 1 + GIT_PROGRESS 0 + + PREFIX "${PREFIX_DIR}" + DOWNLOAD_DIR "${PREFIX_DIR}/redis-src" + LOG_DIR "${PREFIX_DIR}/redis-log" + STAMP_DIR "${PREFIX_DIR}/redis-stamp" + TMP_DIR "${PREFIX_DIR}/redis-tmp" + SOURCE_DIR "${PREFIX_DIR}/redis-src" + INSTALL_DIR "${PREFIX_DIR}/redis-install" + BINARY_DIR "${PREFIX_DIR}/redis-build" + + BUILD_ALWAYS 0 + UPDATE_COMMAND "" + + CMAKE_ARGS + -DCMAKE_PREFIX_PATH:PATH=${PREFIX_DIR}/hiredis-install + -DCMAKE_INSTALL_PREFIX:PATH=${PREFIX_DIR}/redis-install + -DCMAKE_INSTALL_LIBDIR=lib + -DCMAKE_INSTALL_RPATH:PATH=/lib + -DCMAKE_BUILD_TYPE:STRING=${CMAKE_BUILD_TYPE} + -DENABLE_STATIC:STRING=ON + -DENABLE_CPPSUITE:BOOL=OFF + -DCMAKE_C_FLAGS=-Wno-maybe-uninitialized + -DCMAKE_CXX_FLAGS=-Wno-unused-variable + -DREDIS_PLUS_PLUS_BUILD_TEST:BOOL=OFF +) + +set(redis_INCLUDE_DIR ${PREFIX_DIR}/redis-install/include) +set(redis_LIBRARY_PATH ${PREFIX_DIR}/redis-install/lib/libredis++.a) + +file(MAKE_DIRECTORY ${redis_INCLUDE_DIR}) +add_library(redis STATIC IMPORTED) +set_target_properties(redis PROPERTIES IMPORTED_LINK_INTERFACE_LIBRARIES hiredis) + +set_property(TARGET redis PROPERTY IMPORTED_LOCATION ${redis_LIBRARY_PATH}) +set_property(TARGET redis APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${redis_INCLUDE_DIR}) + +include_directories(${redis_INCLUDE_DIR}) +add_dependencies(redis_external hiredis) +add_dependencies(redis redis_external) \ No newline at end of file From 33f96fbf0a95169ff442ed66f21647b0da8ad3ad Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Fri, 19 May 2023 16:50:53 +0400 Subject: [PATCH 02/15] Add: Redis Client --- src/redis_client.cpp | 339 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 src/redis_client.cpp diff --git a/src/redis_client.cpp b/src/redis_client.cpp new file mode 100644 index 000000000..e76129a39 --- /dev/null +++ b/src/redis_client.cpp @@ -0,0 +1,339 @@ +#include + +#include + +#include "ustore/db.h" +#include "ustore/cpp/types.hpp" +#include "helpers/linked_memory.hpp" +#include "helpers/linked_array.hpp" +#include "ustore/cpp/ranges_args.hpp" // `places_arg_t` + +/*********************************************************/ +/***************** Structures & Consts ****************/ +/*********************************************************/ + +ustore_collection_t const ustore_collection_main_k = 0; +ustore_length_t const ustore_length_missing_k = std::numeric_limits::max(); +ustore_key_t const ustore_key_unknown_k = std::numeric_limits::max(); +bool const ustore_supports_transactions_k = false; +bool const ustore_supports_named_collections_k = true; +bool const ustore_supports_snapshots_k = false; +static const char kDefaultCollectionName[] = "default"; + +/*********************************************************/ +/***************** C++ Implementation ****************/ +/*********************************************************/ + +using namespace unum::ustore; +using namespace unum; +using namespace sw::redis; + +struct redis_client_t { + std::unique_ptr native; + std::vector collections; +}; + +inline StringView to_string_view(byte_t const* p, size_t size_bytes) noexcept { + return {reinterpret_cast(p), size_bytes}; +} + +inline StringView to_string_view(ustore_key_t const& k) noexcept { + return {reinterpret_cast(&k), sizeof(ustore_key_t)}; +} + +StringView redis_collection(ustore_collection_t collection) { + return collection == ustore_collection_main_k ? kDefaultCollectionName : reinterpret_cast(collection); +} + +/*********************************************************/ +/***************** C Interface ****************/ +/*********************************************************/ + +void ustore_database_init(ustore_database_init_t* c_ptr) { + ustore_database_init_t& c = *c_ptr; + + safe_section("Starting client", c.error, [&] { + ConnectionOptions connection_options; + connection_options.host = "127.0.0.1"; + connection_options.port = 6379; + + redis_client_t* db_ptr = new redis_client_t; + db_ptr->native = std::make_unique(connection_options); + db_ptr->native->keys("*", std::back_inserter(db_ptr->collections)); + auto it = std::find(db_ptr->collections.begin(), db_ptr->collections.end(), kDefaultCollectionName); + if (it != db_ptr->collections.end()) + db_ptr->collections.erase(it); + *c.db = db_ptr; + }); +} + +void ustore_read(ustore_read_t* c_ptr) { + + ustore_read_t& c = *c_ptr; + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + redis_client_t& db = *reinterpret_cast(c.db); + + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); + + strided_iterator_gt keys {c.keys, c.keys_stride}; + strided_iterator_gt collections {c.collections, c.collections_stride}; + places_arg_t places {collections, keys, {}, c.tasks_count}; + validate_read(c.transaction, places, c.options, c.error); + return_if_error_m(c.error); + + // 1. Allocate a tape for all the values to be pulled + auto offs = arena.alloc_or_dummy(places.count + 1, c.error, c.offsets); + return_if_error_m(c.error); + auto lens = arena.alloc_or_dummy(places.count, c.error, c.lengths); + return_if_error_m(c.error); + auto presences = arena.alloc_or_dummy(places.count, c.error, c.presences); + return_if_error_m(c.error); + bool const needs_export = c.values != nullptr; + + safe_section("Reading values", c.error, [&] { + uninitialized_array_gt contents(arena); + for (std::size_t i = 0; i != places.size(); ++i) { + place_t place = places[i]; + auto value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); + presences[i] = bool(value); + lens[i] = value ? value->size() : ustore_length_missing_k; + offs[i] = contents.size(); + if (needs_export && value) + contents.insert(contents.size(), + (byte_t*)value->data(), + (byte_t*)(value->data() + value->size()), + c.error); + } + offs[places.count] = contents.size(); + if (needs_export) + *c.values = reinterpret_cast(contents.begin()); + }); +} + +void ustore_write(ustore_write_t* c_ptr) { + ustore_write_t& c = *c_ptr; + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + + redis_client_t& db = *reinterpret_cast(c.db); + strided_iterator_gt keys {c.keys, c.keys_stride}; + strided_iterator_gt collections {c.collections, c.collections_stride}; + strided_iterator_gt vals {c.values, c.values_stride}; + strided_iterator_gt offs {c.offsets, c.offsets_stride}; + strided_iterator_gt lens {c.lengths, c.lengths_stride}; + bits_view_t presences {c.presences}; + + places_arg_t places {collections, keys, {}, c.tasks_count}; + contents_arg_t contents {presences, offs, lens, vals, c.tasks_count}; + + validate_write(c.transaction, places, contents, c.options, c.error); + return_if_error_m(c.error); + + safe_section("Writing values", c.error, [&] { + for (std::size_t i = 0; i != places.size(); ++i) { + auto place = places[i]; + auto content = contents[i]; + auto collection = redis_collection(place.collection); + if (content) + db.native->hset(collection, to_string_view(place.key), to_string_view(content.data(), content.size())); + else + db.native->hdel(collection, to_string_view(place.key)); + } + }); +} + +void ustore_paths_write(ustore_paths_write_t* c_ptr) { +} + +void ustore_paths_match(ustore_paths_match_t* c_ptr) { +} + +void ustore_paths_read(ustore_paths_read_t* c_ptr) { +} + +void ustore_scan(ustore_scan_t* c_ptr) { + ustore_scan_t& c = *c_ptr; + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); + + redis_client_t& db = *reinterpret_cast(c.db); + strided_iterator_gt start_keys {c.start_keys, c.start_keys_stride}; + strided_iterator_gt limits {c.count_limits, c.count_limits_stride}; + strided_iterator_gt collections {c.collections, c.collections_stride}; + scans_arg_t scans {collections, start_keys, limits, c.tasks_count}; + + // 1. Allocate a tape for all the values to be fetched + auto offsets = arena.alloc_or_dummy(scans.count + 1, c.error, c.offsets); + return_if_error_m(c.error); + auto counts = arena.alloc_or_dummy(scans.count, c.error, c.counts); + return_if_error_m(c.error); + + auto total_keys = reduce_n(scans.limits, scans.count, 0ul); + auto keys_output = *c.keys = arena.alloc(total_keys, c.error).begin(); + return_if_error_m(c.error); + + safe_section("Scanning keys", c.error, [&] { + std::vector keys; + for (ustore_size_t i = 0; i != c.tasks_count; ++i) { + auto scan = scans[i]; + offsets[i] = keys_output - *c.keys; + db.native->hkeys(redis_collection(scan.collection), std::inserter(keys, keys.begin())); + + for (std::size_t i = 0; i != keys.size(); ++i) { + *keys_output = *reinterpret_cast(keys[i].data()); + ++keys_output; + } + counts[i] = keys.size(); + keys.clear(); + } + }); + + offsets[scans.size()] = keys_output - *c.keys; +} + +void ustore_sample(ustore_sample_t* c_ptr) { +} + +void ustore_measure(ustore_measure_t* c_ptr) { +} + +/*********************************************************/ +/***************** Collections Management ****************/ +/*********************************************************/ + +void ustore_collection_create(ustore_collection_create_t* c_ptr) { + ustore_collection_create_t& c = *c_ptr; + auto name_len = c.name ? std::strlen(c.name) : 0; + return_error_if_m(name_len, c.error, args_wrong_k, "Default collection is always present"); + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + + redis_client_t& db = *reinterpret_cast(c.db); + for (auto& collection : db.collections) + return_error_if_m(collection != c.name, c.error, args_wrong_k, "Such collection already exists!"); + + db.collections.push_back(c.name); + *c.id = reinterpret_cast(db.collections.back().data()); +} + +void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { + ustore_collection_drop_t& c = *c_ptr; + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + bool invalidate = c.mode == ustore_drop_keys_vals_handle_k; + return_error_if_m(c.id != ustore_collection_main_k || !invalidate, + c.error, + args_combo_k, + "Default collection can't be invalidated."); + + redis_client_t& db = *reinterpret_cast(c.db); + auto collection = redis_collection(c.id); + + safe_section("Dropping collection", c.error, [&] { + if (c.mode == ustore_drop_keys_vals_handle_k) { + db.native->del(collection); + auto it = std::find(db.collections.begin(), db.collections.end(), collection); + if (it != db.collections.end()) + db.collections.erase(it); + } + else { + std::vector keys; + db.native->hkeys(collection, std::back_inserter(keys)); + for (const auto& key : keys) + c.mode == ustore_drop_keys_vals_k ? db.native->hdel(collection, key) + : db.native->hset(collection, key, ""); + } + }); +} + +void ustore_collection_list(ustore_collection_list_t* c_ptr) { + ustore_collection_list_t& c = *c_ptr; + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + return_error_if_m(c.count && c.names, c.error, args_combo_k, "Need names and outputs!"); + + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); + + redis_client_t& db = *reinterpret_cast(c.db); + std::size_t collections_count = db.collections.size(); + *c.count = static_cast(collections_count); + + // Every string will be null-terminated + std::size_t strings_length = 0; + for (auto const& collection : db.collections) + strings_length += collection.size() + 1; + + auto names = arena.alloc(strings_length, c.error).begin(); + return_if_error_m(c.error); + *c.names = names; + + // For every collection we also need to export IDs and offsets + auto ids = arena.alloc_or_dummy(collections_count, c.error, c.ids); + return_if_error_m(c.error); + auto offs = arena.alloc_or_dummy(collections_count + 1, c.error, c.offsets); + return_if_error_m(c.error); + + std::size_t i = 0; + for (auto const& collection : db.collections) { + auto len = collection.size(); + std::memcpy(names, collection.data(), len); + names[len] = '\0'; + ids[i] = reinterpret_cast(collection.c_str()); + offs[i] = static_cast(names - *c.names); + names += len + 1; + ++i; + } + offs[i] = static_cast(names - *c.names); +} + +void ustore_database_control(ustore_database_control_t* c_ptr) { +} + +/*********************************************************/ +/***************** Snapshots ****************/ +/*********************************************************/ +void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { + ustore_snapshot_list_t& c = *c_ptr; + *c.error = "Snapshots not supported by Redis!"; +} + +void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { + ustore_snapshot_create_t& c = *c_ptr; + *c.error = "Snapshots not supported by Redis!"; +} + +void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { + ustore_snapshot_drop_t& c = *c_ptr; + *c.error = "Snapshots not supported by Redis!"; +} + +/*********************************************************/ +/***************** Transactions ****************/ +/*********************************************************/ + +void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { +} + +void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { +} + +/*********************************************************/ +/***************** Memory Management ****************/ +/*********************************************************/ + +void ustore_arena_free(ustore_arena_t c_arena) { + clear_linked_memory(c_arena); +} + +void ustore_transaction_free(ustore_transaction_t const c_transaction) { +} + +void ustore_database_free(ustore_database_t c_db) { + if (!c_db) + return; + redis_client_t& db = *reinterpret_cast(c_db); + delete &db; +} + +void ustore_error_free(ustore_error_t) { +} From 0ec649bf6690ab7cc6a9189075a9c1480d3507f5 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Fri, 19 May 2023 16:53:24 +0400 Subject: [PATCH 03/15] Add: Cleaning Redis --- tests/test_units.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/test_units.cpp b/tests/test_units.cpp index 9de8878f4..77bcced9f 100644 --- a/tests/test_units.cpp +++ b/tests/test_units.cpp @@ -107,6 +107,25 @@ void clear_environment() { usleep(100000); // 0.1 sec #endif +#if defined(USTORE_REDIS_CLIENT) + int originalStdout = dup(STDOUT_FILENO); + int devNull = open("/dev/null", O_WRONLY); + EXPECT_NE(devNull, -1) << "open"; + EXPECT_NE(dup2(devNull, STDOUT_FILENO), -1) << "dup2"; + close(devNull); + + pid_t pid = fork(); + if (pid == -1) + EXPECT_TRUE(false) << "Failed To Clear Redis"; + else if (pid == 0) + EXPECT_NE(execl("/usr/bin/redis-cli", "redis-cli", "FLUSHALL", (char*)(NULL)), -1) << "Failed To Clear Redis"; + else + wait(NULL); + + EXPECT_NE(dup2(originalStdout, STDOUT_FILENO), -1) << "dup2"; + close(originalStdout); +#endif + namespace stdfs = std::filesystem; auto directory_str = path() ? std::string_view(path()) : ""; if (!directory_str.empty()) { From b043715e0ace9420840db42b50442598d1cc4dbe Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Fri, 19 May 2023 17:07:59 +0400 Subject: [PATCH 04/15] Refactor: Redis client --- src/redis_client.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index e76129a39..1a0410787 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -41,7 +41,7 @@ inline StringView to_string_view(ustore_key_t const& k) noexcept { return {reinterpret_cast(&k), sizeof(ustore_key_t)}; } -StringView redis_collection(ustore_collection_t collection) { +inline StringView redis_collection(ustore_collection_t collection) { return collection == ustore_collection_main_k ? kDefaultCollectionName : reinterpret_cast(collection); } From 34c675a02c74f1088069c3b2a1bf47f3c0815f08 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Fri, 19 May 2023 17:08:25 +0400 Subject: [PATCH 05/15] Add: Snapshot export --- src/redis_client.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 1a0410787..aa7db14c1 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -307,6 +307,11 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { *c.error = "Snapshots not supported by Redis!"; } +void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { + ustore_snapshot_export_t& c = *c_ptr; + *c.error = "Snapshots not supported by Redis!"; +} + /*********************************************************/ /***************** Transactions ****************/ /*********************************************************/ From b8f920408e08e7363baf28f54016e258fb94c61a Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Mon, 22 May 2023 10:41:47 +0400 Subject: [PATCH 06/15] Make: Redis client modality paths --- CMakeLists.txt | 4 ++-- src/redis_client.cpp | 9 --------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e457dc0f..4344f10b1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -267,8 +267,8 @@ if(${USTORE_BUILD_API_FLIGHT_CLIENT}) endif() if(${USTORE_BUILD_API_REDIS_CLIENT}) - add_library(ustore_redis_client src/redis_client.cpp src/modality_docs.cpp src/modality_graph.cpp src/modality_vectors.cpp) - target_link_libraries(ustore_redis_client pthread yyjson simdjson bson redis) + add_library(ustore_redis_client src/redis_client.cpp src/modality_paths.cpp src/modality_docs.cpp src/modality_graph.cpp src/modality_vectors.cpp) + target_link_libraries(ustore_redis_client pthread yyjson simdjson bson pcre2 redis) target_compile_definitions(ustore_redis_client INTERFACE USTORE_REDIS_CLIENT=TRUE) list(APPEND USTORE_CLIENT_NAMES "redis_client") list(APPEND USTORE_CLIENT_LIBS "ustore_redis_client") diff --git a/src/redis_client.cpp b/src/redis_client.cpp index aa7db14c1..3b75e7495 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -142,15 +142,6 @@ void ustore_write(ustore_write_t* c_ptr) { }); } -void ustore_paths_write(ustore_paths_write_t* c_ptr) { -} - -void ustore_paths_match(ustore_paths_match_t* c_ptr) { -} - -void ustore_paths_read(ustore_paths_read_t* c_ptr) { -} - void ustore_scan(ustore_scan_t* c_ptr) { ustore_scan_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); From 709de76cf477d753a853707e21af829a014bc336 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Tue, 23 May 2023 13:36:06 +0400 Subject: [PATCH 07/15] Add: Redis transaction --- src/redis_client.cpp | 114 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 94 insertions(+), 20 deletions(-) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 3b75e7495..f817b4286 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -15,7 +15,7 @@ ustore_collection_t const ustore_collection_main_k = 0; ustore_length_t const ustore_length_missing_k = std::numeric_limits::max(); ustore_key_t const ustore_key_unknown_k = std::numeric_limits::max(); -bool const ustore_supports_transactions_k = false; +bool const ustore_supports_transactions_k = true; bool const ustore_supports_named_collections_k = true; bool const ustore_supports_snapshots_k = false; static const char kDefaultCollectionName[] = "default"; @@ -26,25 +26,50 @@ static const char kDefaultCollectionName[] = "default"; using namespace unum::ustore; using namespace unum; -using namespace sw::redis; -struct redis_client_t { - std::unique_ptr native; - std::vector collections; -}; - -inline StringView to_string_view(byte_t const* p, size_t size_bytes) noexcept { +inline sw::redis::StringView to_string_view(byte_t const* p, size_t size_bytes) noexcept { return {reinterpret_cast(p), size_bytes}; } -inline StringView to_string_view(ustore_key_t const& k) noexcept { +inline sw::redis::StringView to_string_view(ustore_key_t const& k) noexcept { return {reinterpret_cast(&k), sizeof(ustore_key_t)}; } -inline StringView redis_collection(ustore_collection_t collection) { +inline sw::redis::StringView redis_collection(ustore_collection_t collection) { return collection == ustore_collection_main_k ? kDefaultCollectionName : reinterpret_cast(collection); } +struct redis_txn_t { + std::unique_ptr native; + std::map uncommited; + + sw::redis::OptionalString get_uncommited(ustore_collection_t collection, ustore_key_t key) { + auto it = uncommited.find({collection, key}); + if (it != uncommited.end()) + return it->second; + } + + void set(ustore_collection_t collection, ustore_key_t key, value_view_t value) { + native->hset(redis_collection(collection), to_string_view(key), to_string_view(value.data(), value.size())); + uncommited[{collection, key}] = std::string((const char*)value.data(), value.size()); + } + + void del(ustore_collection_t collection, ustore_key_t key) { + native->hdel(redis_collection(collection), to_string_view(key)); + uncommited.erase({collection, key}); + } + + void exec() { + native->exec(); + uncommited.clear(); + } +}; + +struct redis_client_t { + std::unique_ptr native; + std::vector collections; +}; + /*********************************************************/ /***************** C Interface ****************/ /*********************************************************/ @@ -53,12 +78,13 @@ void ustore_database_init(ustore_database_init_t* c_ptr) { ustore_database_init_t& c = *c_ptr; safe_section("Starting client", c.error, [&] { - ConnectionOptions connection_options; + sw::redis::ConnectionOptions connection_options; connection_options.host = "127.0.0.1"; connection_options.port = 6379; redis_client_t* db_ptr = new redis_client_t; - db_ptr->native = std::make_unique(connection_options); + db_ptr->native = std::make_unique(connection_options); + db_ptr->native->keys("*", std::back_inserter(db_ptr->collections)); auto it = std::find(db_ptr->collections.begin(), db_ptr->collections.end(), kDefaultCollectionName); if (it != db_ptr->collections.end()) @@ -72,6 +98,7 @@ void ustore_read(ustore_read_t* c_ptr) { ustore_read_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); redis_client_t& db = *reinterpret_cast(c.db); + redis_txn_t& txn = *reinterpret_cast(c.transaction); linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); @@ -95,15 +122,27 @@ void ustore_read(ustore_read_t* c_ptr) { uninitialized_array_gt contents(arena); for (std::size_t i = 0; i != places.size(); ++i) { place_t place = places[i]; - auto value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); + sw::redis::OptionalString value; + + if (c.transaction) { + value = txn.get_uncommited(place.collection, place.key); + if (!value) + value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); + txn.native->hget(redis_collection(place.collection), to_string_view(place.key)); + } + else + value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); + + offs[i] = contents.size(); presences[i] = bool(value); lens[i] = value ? value->size() : ustore_length_missing_k; - offs[i] = contents.size(); - if (needs_export && value) + if (needs_export && value) { contents.insert(contents.size(), (byte_t*)value->data(), (byte_t*)(value->data() + value->size()), c.error); + return_if_error_m(c.error); + } } offs[places.count] = contents.size(); if (needs_export) @@ -116,6 +155,7 @@ void ustore_write(ustore_write_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); redis_client_t& db = *reinterpret_cast(c.db); + redis_txn_t& txn = *reinterpret_cast(c.transaction); strided_iterator_gt keys {c.keys, c.keys_stride}; strided_iterator_gt collections {c.collections, c.collections_stride}; strided_iterator_gt vals {c.values, c.values_stride}; @@ -133,11 +173,20 @@ void ustore_write(ustore_write_t* c_ptr) { for (std::size_t i = 0; i != places.size(); ++i) { auto place = places[i]; auto content = contents[i]; - auto collection = redis_collection(place.collection); - if (content) - db.native->hset(collection, to_string_view(place.key), to_string_view(content.data(), content.size())); - else - db.native->hdel(collection, to_string_view(place.key)); + if (content) { + if (c.transaction) + txn.set(place.collection, place.key, content); + else + db.native->hset(redis_collection(place.collection), + to_string_view(place.key), + to_string_view(content.data(), content.size())); + } + else { + if (c.transaction) + txn.del(place.collection, place.key); + else + db.native->hdel(redis_collection(place.collection), to_string_view(place.key)); + } } }); } @@ -308,9 +357,31 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { /*********************************************************/ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { + ustore_transaction_init_t& c = *c_ptr; + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + validate_transaction_begin(c.transaction, c.options, c.error); + return_if_error_m(c.error); + + redis_client_t& db = *reinterpret_cast(c.db); + safe_section("Initializing Transaction", c.error, [&] { + auto txn_ptr = new redis_txn_t; + txn_ptr->native = std::make_unique(db.native->transaction()); + *c.transaction = txn_ptr; + }); } void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { + ustore_transaction_commit_t& c = *c_ptr; + if (!c.transaction) + return; + + validate_transaction_commit(c.transaction, c.options, c.error); + return_if_error_m(c.error); + + redis_client_t& db = *reinterpret_cast(c.db); + redis_txn_t& txn = *reinterpret_cast(c.transaction); + safe_section("Commiting Transaction", c.error, [&] { txn.exec(); }); + return_if_error_m(c.error); } /*********************************************************/ @@ -322,6 +393,9 @@ void ustore_arena_free(ustore_arena_t c_arena) { } void ustore_transaction_free(ustore_transaction_t const c_transaction) { + if (!c_transaction) + return; + delete reinterpret_cast(c_transaction); } void ustore_database_free(ustore_database_t c_db) { From a4af4cb4a8bb21ad81100569a15e6bf8e2ea3221 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Tue, 23 May 2023 15:13:22 +0400 Subject: [PATCH 08/15] Refactor: Redis client --- src/redis_client.cpp | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index f817b4286..1e537043f 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -26,24 +26,26 @@ static const char kDefaultCollectionName[] = "default"; using namespace unum::ustore; using namespace unum; +namespace redis = sw::redis; -inline sw::redis::StringView to_string_view(byte_t const* p, size_t size_bytes) noexcept { +inline redis::StringView to_string_view(byte_t const* p, size_t size_bytes) noexcept { return {reinterpret_cast(p), size_bytes}; } -inline sw::redis::StringView to_string_view(ustore_key_t const& k) noexcept { +inline redis::StringView to_string_view(ustore_key_t const& k) noexcept { return {reinterpret_cast(&k), sizeof(ustore_key_t)}; } -inline sw::redis::StringView redis_collection(ustore_collection_t collection) { +inline redis::StringView redis_collection(ustore_collection_t collection) { return collection == ustore_collection_main_k ? kDefaultCollectionName : reinterpret_cast(collection); } struct redis_txn_t { - std::unique_ptr native; - std::map uncommited; + std::unique_ptr native; + std::unordered_map uncommited; - sw::redis::OptionalString get_uncommited(ustore_collection_t collection, ustore_key_t key) { + redis::OptionalString get_uncommited(ustore_collection_t collection, ustore_key_t key) { + native->hget(redis_collection(collection), to_string_view(key)); auto it = uncommited.find({collection, key}); if (it != uncommited.end()) return it->second; @@ -66,7 +68,7 @@ struct redis_txn_t { }; struct redis_client_t { - std::unique_ptr native; + std::unique_ptr native; std::vector collections; }; @@ -78,12 +80,12 @@ void ustore_database_init(ustore_database_init_t* c_ptr) { ustore_database_init_t& c = *c_ptr; safe_section("Starting client", c.error, [&] { - sw::redis::ConnectionOptions connection_options; + redis::ConnectionOptions connection_options; connection_options.host = "127.0.0.1"; connection_options.port = 6379; redis_client_t* db_ptr = new redis_client_t; - db_ptr->native = std::make_unique(connection_options); + db_ptr->native = std::make_unique(connection_options); db_ptr->native->keys("*", std::back_inserter(db_ptr->collections)); auto it = std::find(db_ptr->collections.begin(), db_ptr->collections.end(), kDefaultCollectionName); @@ -122,13 +124,12 @@ void ustore_read(ustore_read_t* c_ptr) { uninitialized_array_gt contents(arena); for (std::size_t i = 0; i != places.size(); ++i) { place_t place = places[i]; - sw::redis::OptionalString value; + redis::OptionalString value; if (c.transaction) { value = txn.get_uncommited(place.collection, place.key); if (!value) value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); - txn.native->hget(redis_collection(place.collection), to_string_view(place.key)); } else value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); @@ -365,7 +366,7 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { redis_client_t& db = *reinterpret_cast(c.db); safe_section("Initializing Transaction", c.error, [&] { auto txn_ptr = new redis_txn_t; - txn_ptr->native = std::make_unique(db.native->transaction()); + txn_ptr->native = std::make_unique(db.native->transaction()); *c.transaction = txn_ptr; }); } From f309b5d831174ab39d0162d68a84b2649a76b643 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Tue, 23 May 2023 17:08:29 +0400 Subject: [PATCH 09/15] Refactor: Running Redis --- tests/test_units.cpp | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/tests/test_units.cpp b/tests/test_units.cpp index 77bcced9f..e0f3e13c3 100644 --- a/tests/test_units.cpp +++ b/tests/test_units.cpp @@ -86,6 +86,20 @@ static std::string config() { return fmt::format(R"({{"version": "1.0", "directory": "{}"}})", dir); } +template +void without_printing(function_at&& function) { + int originalStdout = dup(STDOUT_FILENO); + int devNull = open("/dev/null", O_WRONLY); + EXPECT_NE(devNull, -1) << "open"; + EXPECT_NE(dup2(devNull, STDOUT_FILENO), -1) << "dup2"; + close(devNull); + + function(); + + EXPECT_NE(dup2(originalStdout, STDOUT_FILENO), -1) << "dup2"; + close(originalStdout); +} + #if defined(USTORE_FLIGHT_CLIENT) static pid_t srv_id = -1; static std::string srv_path; @@ -105,25 +119,15 @@ void clear_environment() { exit(0); } usleep(100000); // 0.1 sec -#endif - -#if defined(USTORE_REDIS_CLIENT) - int originalStdout = dup(STDOUT_FILENO); - int devNull = open("/dev/null", O_WRONLY); - EXPECT_NE(devNull, -1) << "open"; - EXPECT_NE(dup2(devNull, STDOUT_FILENO), -1) << "dup2"; - close(devNull); +#elif defined(USTORE_REDIS_CLIENT) pid_t pid = fork(); if (pid == -1) EXPECT_TRUE(false) << "Failed To Clear Redis"; else if (pid == 0) - EXPECT_NE(execl("/usr/bin/redis-cli", "redis-cli", "FLUSHALL", (char*)(NULL)), -1) << "Failed To Clear Redis"; + without_printing([](){EXPECT_NE(execl("/usr/local/bin/redis-cli", "redis-cli", "FLUSHALL", (char*)(NULL)), -1) << "Failed To Clear Redis";}); else wait(NULL); - - EXPECT_NE(dup2(originalStdout, STDOUT_FILENO), -1) << "dup2"; - close(originalStdout); #endif namespace stdfs = std::filesystem; @@ -2333,6 +2337,14 @@ int main(int argc, char** argv) { #if defined(USTORE_FLIGHT_CLIENT) srv_path = argv[0]; srv_path = srv_path.substr(0, srv_path.find_last_of("/") + 1) + "ustore_flight_server_ucset"; +#elif defined(USTORE_REDIS_CLIENT) + pid_t srv_id = fork(); + if (srv_id == 0) { + usleep(1); // TODO Any statement is requiered to be run for successful `execl` run... + without_printing([](){execl("/usr/local/bin/redis-server", "redis-server", (char*)(NULL));}); + exit(0); + } + usleep(100000); // 0.1 sec #endif auto directory_str = path() ? std::string_view(path()) : ""; From e494e9eb80a94156f7b8a1084c014fd36caf6019 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Wed, 24 May 2023 13:28:53 +0400 Subject: [PATCH 10/15] Add: Redis sample --- src/redis_client.cpp | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 1e537043f..b75a230a7 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -235,6 +235,48 @@ void ustore_scan(ustore_scan_t* c_ptr) { } void ustore_sample(ustore_sample_t* c_ptr) { + ustore_sample_t& c = *c_ptr; + return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); + if (!c.tasks_count) + return; + + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); + + redis_client_t& db = *reinterpret_cast(c.db); + redis_txn_t& txn = *reinterpret_cast(c.transaction); + strided_iterator_gt collections {c.collections, c.collections_stride}; + strided_iterator_gt lens {c.count_limits, c.count_limits_stride}; + sample_args_t samples {collections, lens, c.tasks_count}; + + // 1. Allocate a tape for all the values to be fetched + auto offsets = arena.alloc_or_dummy(samples.count + 1, c.error, c.offsets); + return_if_error_m(c.error); + auto counts = arena.alloc_or_dummy(samples.count, c.error, c.counts); + return_if_error_m(c.error); + + auto total_keys = reduce_n(samples.limits, samples.count, 0ul); + auto keys_output = *c.keys = arena.alloc(total_keys, c.error).begin(); + return_if_error_m(c.error); + std::vector keys; + + for (std::size_t task_idx = 0; task_idx != samples.count; ++task_idx) { + sample_arg_t task = samples[task_idx]; + auto collection = redis_collection(task.collection); + offsets[task_idx] = keys_output - *c.keys; + + safe_section("Sampling", c.error, [&] { + db.native->command("HRANDFIELD", collection, task.limit, std::back_inserter(keys)); + }); + + for (std::size_t i = 0; i != keys.size(); ++i) { + *keys_output = *reinterpret_cast(keys[i].data()); + ++keys_output; + } + counts[task_idx] = keys.size(); + keys.clear(); + } + offsets[samples.count] = keys_output - *c.keys; } void ustore_measure(ustore_measure_t* c_ptr) { From 7e65ca00b79c68d27d28ba211ff0bf8fbe05e785 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Wed, 24 May 2023 13:29:51 +0400 Subject: [PATCH 11/15] Add: Simple sample test --- tests/test_units.cpp | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/tests/test_units.cpp b/tests/test_units.cpp index e0f3e13c3..dd6a48a78 100644 --- a/tests/test_units.cpp +++ b/tests/test_units.cpp @@ -93,7 +93,7 @@ void without_printing(function_at&& function) { EXPECT_NE(devNull, -1) << "open"; EXPECT_NE(dup2(devNull, STDOUT_FILENO), -1) << "dup2"; close(devNull); - + function(); EXPECT_NE(dup2(originalStdout, STDOUT_FILENO), -1) << "dup2"; @@ -125,7 +125,10 @@ void clear_environment() { if (pid == -1) EXPECT_TRUE(false) << "Failed To Clear Redis"; else if (pid == 0) - without_printing([](){EXPECT_NE(execl("/usr/local/bin/redis-cli", "redis-cli", "FLUSHALL", (char*)(NULL)), -1) << "Failed To Clear Redis";}); + without_printing([]() { + EXPECT_NE(execl("/usr/local/bin/redis-cli", "redis-cli", "FLUSHALL", (char*)(NULL)), -1) + << "Failed To Clear Redis"; + }); else wait(NULL); #endif @@ -592,6 +595,29 @@ TEST(db, presences) { } } +TEST(db, sample) { + clear_environment(); + database_t db; + EXPECT_TRUE(db.open(config().c_str())); + EXPECT_TRUE(db.clear()); + auto collection = db.main(); + + auto keys_count = 100; + std::vector keys(keys_count); + std::iota(keys.begin(), keys.end(), 0); + collection[keys] = "value"; + + auto sample_count = 10; + for (std::size_t i = 0; i != keys_count; ++i) { + auto samples = collection.keys().sample(sample_count, collection.member_arena()).throw_or_release(); + EXPECT_EQ(samples.size(), sample_count); + for (std::size_t i = 0; i != sample_count; ++i) { + EXPECT_GE(samples[i], 0); + EXPECT_LT(samples[i], 100); + } + } +} + TEST(db, scan) { clear_environment(); database_t db; @@ -2341,7 +2367,7 @@ int main(int argc, char** argv) { pid_t srv_id = fork(); if (srv_id == 0) { usleep(1); // TODO Any statement is requiered to be run for successful `execl` run... - without_printing([](){execl("/usr/local/bin/redis-server", "redis-server", (char*)(NULL));}); + without_printing([]() { execl("/usr/local/bin/redis-server", "redis-server", (char*)(NULL)); }); exit(0); } usleep(100000); // 0.1 sec From 1a8241f1f0c335568e5bc52e27bcf5c11278ea13 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Wed, 24 May 2023 13:42:42 +0400 Subject: [PATCH 12/15] Fix: Redis open close --- tests/test_units.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/test_units.cpp b/tests/test_units.cpp index dd6a48a78..368191259 100644 --- a/tests/test_units.cpp +++ b/tests/test_units.cpp @@ -70,7 +70,7 @@ static char const* path() { if (path) return std::strlen(path) ? path : nullptr; -#if defined(USTORE_FLIGHT_CLIENT) +#if defined(USTORE_FLIGHT_CLIENT) or defined(USTORE_REDIS_CLIENT) return nullptr; #elif defined(USTORE_TEST_PATH) return USTORE_TEST_PATH; @@ -100,7 +100,7 @@ void without_printing(function_at&& function) { close(originalStdout); } -#if defined(USTORE_FLIGHT_CLIENT) +#if defined(USTORE_FLIGHT_CLIENT) or defined(USTORE_REDIS_CLIENT) static pid_t srv_id = -1; static std::string srv_path; #endif @@ -2364,10 +2364,11 @@ int main(int argc, char** argv) { srv_path = argv[0]; srv_path = srv_path.substr(0, srv_path.find_last_of("/") + 1) + "ustore_flight_server_ucset"; #elif defined(USTORE_REDIS_CLIENT) - pid_t srv_id = fork(); + srv_path = "/usr/local/bin/redis-server"; + srv_id = fork(); if (srv_id == 0) { usleep(1); // TODO Any statement is requiered to be run for successful `execl` run... - without_printing([]() { execl("/usr/local/bin/redis-server", "redis-server", (char*)(NULL)); }); + without_printing([]() { execl(srv_path.c_str(), "redis-server", (char*)(NULL)); }); exit(0); } usleep(100000); // 0.1 sec @@ -2381,7 +2382,7 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); int status = RUN_ALL_TESTS(); -#if defined(USTORE_FLIGHT_CLIENT) +#if defined(USTORE_FLIGHT_CLIENT) or defined(USTORE_REDIS_CLIENT) kill(srv_id, SIGKILL); waitpid(srv_id, nullptr, 0); #endif From 25c2937b268d67793166b88b148e4cf44c6fd55c Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Wed, 24 May 2023 16:20:36 +0400 Subject: [PATCH 13/15] Fix: Transaction get uncommited --- src/redis_client.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index b75a230a7..cfe51438f 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -49,6 +49,7 @@ struct redis_txn_t { auto it = uncommited.find({collection, key}); if (it != uncommited.end()) return it->second; + return {}; } void set(ustore_collection_t collection, ustore_key_t key, value_view_t value) { From 104c08e8d59e3e701db3aef1643d7bee9c9f2002 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Thu, 25 May 2023 11:30:27 +0400 Subject: [PATCH 14/15] Refactor: Transaction exec --- src/redis_client.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index cfe51438f..4993a86f4 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -62,9 +62,10 @@ struct redis_txn_t { uncommited.erase({collection, key}); } - void exec() { - native->exec(); + redis::QueuedReplies exec() { + auto reply = native->exec(); uncommited.clear(); + return reply; } }; From 1d1f5090d273b63b00e7498f30b00b2032576f34 Mon Sep 17 00:00:00 2001 From: Davit Vardanyan <78792753+davvard@users.noreply.github.com> Date: Thu, 25 May 2023 12:47:43 +0400 Subject: [PATCH 15/15] Refactor: Redis client --- src/redis_client.cpp | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 4993a86f4..aa3c57d5c 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -42,7 +42,7 @@ inline redis::StringView redis_collection(ustore_collection_t collection) { struct redis_txn_t { std::unique_ptr native; - std::unordered_map uncommited; + std::unordered_map uncommited; redis::OptionalString get_uncommited(ustore_collection_t collection, ustore_key_t key) { native->hget(redis_collection(collection), to_string_view(key)); @@ -52,9 +52,9 @@ struct redis_txn_t { return {}; } - void set(ustore_collection_t collection, ustore_key_t key, value_view_t value) { - native->hset(redis_collection(collection), to_string_view(key), to_string_view(value.data(), value.size())); - uncommited[{collection, key}] = std::string((const char*)value.data(), value.size()); + void set(ustore_collection_t collection, ustore_key_t key, redis::StringView value) { + native->hset(redis_collection(collection), to_string_view(key), value); + uncommited[{collection, key}] = value; } void del(ustore_collection_t collection, ustore_key_t key) { @@ -62,10 +62,9 @@ struct redis_txn_t { uncommited.erase({collection, key}); } - redis::QueuedReplies exec() { - auto reply = native->exec(); + void exec() { + native->exec(); uncommited.clear(); - return reply; } }; @@ -126,15 +125,17 @@ void ustore_read(ustore_read_t* c_ptr) { uninitialized_array_gt contents(arena); for (std::size_t i = 0; i != places.size(); ++i) { place_t place = places[i]; - redis::OptionalString value; + auto key = to_string_view(place.key); + auto collection = redis_collection(place.collection); + redis::OptionalString value; if (c.transaction) { value = txn.get_uncommited(place.collection, place.key); if (!value) - value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); + value = db.native->hget(collection, key); } else - value = db.native->hget(redis_collection(place.collection), to_string_view(place.key)); + value = db.native->hget(collection, key); offs[i] = contents.size(); presences[i] = bool(value); @@ -176,22 +177,25 @@ void ustore_write(ustore_write_t* c_ptr) { for (std::size_t i = 0; i != places.size(); ++i) { auto place = places[i]; auto content = contents[i]; + auto key = to_string_view(place.key); + auto collection = redis_collection(place.collection); + auto value = to_string_view(content.data(), content.size()); + if (content) { if (c.transaction) - txn.set(place.collection, place.key, content); + txn.set(place.collection, place.key, value); else - db.native->hset(redis_collection(place.collection), - to_string_view(place.key), - to_string_view(content.data(), content.size())); + db.native->hset(collection, key, value); } else { if (c.transaction) txn.del(place.collection, place.key); else - db.native->hdel(redis_collection(place.collection), to_string_view(place.key)); + db.native->hdel(collection, key); } } }); + return_if_error_m(c.error); } void ustore_scan(ustore_scan_t* c_ptr) {