diff --git a/BUILD.bazel b/BUILD.bazel index 54aafe4a..d77fa5bc 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -9,6 +9,7 @@ cc_library( "sources/builders/reply_builder.cpp", "sources/builders/simple_string_builder.cpp", "sources/core/client.cpp", + "sources/core/client_pool.cpp", "sources/core/consumer.cpp", "sources/core/reply.cpp", "sources/core/sentinel.cpp", diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d4ddd507..1cee6c85 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -39,7 +39,7 @@ include_directories(${CPP_REDIS_INCLUDES}) ### link_directories(${DEPS_LIBRARIES}) -set(EXAMPLES cpp_redis_client +set(EXAMPLES cpp_redis_client cpp_redis_consumer cpp_redis_future_client cpp_redis_subscriber @@ -48,6 +48,7 @@ set(EXAMPLES cpp_redis_client cpp_redis_streams_client cpp_redis_high_availability_client cpp_redis_multi_exec + cpp_redis_client_pool ) foreach(EXAMPLE IN ITEMS ${EXAMPLES}) diff --git a/examples/cpp_redis_client_pool.cpp b/examples/cpp_redis_client_pool.cpp new file mode 100644 index 00000000..d3091fe7 --- /dev/null +++ b/examples/cpp_redis_client_pool.cpp @@ -0,0 +1,113 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2017 Simon Ninon +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +#include "cpp_redis/core/client.hpp" +#include "cpp_redis/core/client_pool.hpp" +#include "cpp_redis/core/reply.hpp" +#include "cpp_redis/core/types.hpp" +#include +#include + +#include "winsock_initializer.h" +#include +#include +#include + +static void +sync_work_chunk(cpp_redis::client_pool& pool, std::string const& key, std::size_t offset) { + //! Perform offset number of synchronous increments to the given key. + while (offset--) { + pool.run([&key](cpp_redis::client* c) { + c->incr(key); + c->sync_commit(); + }); + } +} + +static void +async_work_chunk(cpp_redis::client_pool& pool, std::string const& key, std::size_t offset) { + std::list> replies; + + //! Perform offset number of asynchronous increments to the given key. + while (offset--) { + pool.run([&key, &replies](cpp_redis::client* c) { + replies.push_back(c->incr(key)); + c->commit(); + }); + } + + //! Consume the replies in the order of production. + while (!replies.empty()) { + replies.front().wait(); + replies.pop_front(); + } +} + +const std::string key{"number"}; + +int +main() { + winsock_initializer winsock_init; + //! Enable logging + cpp_redis::active_logger = std::unique_ptr(new cpp_redis::logger); + + //! Offset to assign to each work chunk. + std::size_t ofs1{12}, ofs2{14}, ofs3{1}, ofs4{11}, ofs5{9}, ofs6{7}; + + //! Create a pool of four clients to handle requests in a thread safe manner. + cpp_redis::client_pool pool(4, "127.0.0.1", 26379); + + //! Assign work to the pool. + auto f1 = std::async([&pool, ofs1] { + sync_work_chunk(pool, key, ofs1); + }); + auto f2 = std::async([&pool, ofs2] { + sync_work_chunk(pool, key, ofs2); + }); + auto f3 = std::async([&pool, ofs3] { + async_work_chunk(pool, key, ofs3); + }); + auto f4 = std::async([&pool, ofs4] { + async_work_chunk(pool, key, ofs4); + }); + auto f5 = std::async([&pool, ofs5] { + sync_work_chunk(pool, key, ofs5); + }); + async_work_chunk(pool, key, ofs6); + + //! Wait for all threads to finish. + f1.wait(), f2.wait(), f3.wait(), f4.wait(), f5.wait(); + + //! Retrieve the number stored in the server. + std::future result; + pool.run([&result](cpp_redis::client* c) { + result = c->get(key); + }); + + //! Expected outcome of all the transactions. + int expected = ofs1 + ofs2 + ofs3 + ofs4 + ofs5 + ofs6; + std::cout << "Expected result=" << expected << std::endl; + std::cout << "Computed result=" << result.get().as_string() << std::endl; + + return 0; +} + diff --git a/includes/cpp_redis/core/client_pool.hpp b/includes/cpp_redis/core/client_pool.hpp new file mode 100644 index 00000000..a3cc48e8 --- /dev/null +++ b/includes/cpp_redis/core/client_pool.hpp @@ -0,0 +1,129 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2017 Simon Ninon +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +#ifndef CPP_REDIS_CORE_CLIENT_POOL_HPP_ +#define CPP_REDIS_CORE_CLIENT_POOL_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "cpp_redis/core/reply.hpp" +#include + +namespace cpp_redis { + +namespace detail { + +class idle_client { + std::condition_variable& m_bell; + std::vector& m_state; + std::mutex& m_mtx; + +public: + idle_client(std::condition_variable& cv, std::vector& m_state, std::mutex& mtx); + ~idle_client(); + + int m_index; +}; + +} // namespace detail + +class client_pool { +public: + struct con_details { + std::string m_host; + std::size_t m_port; + connect_callback_t m_connect_callback; + std::uint32_t m_timeout_ms; + std::int32_t m_max_reconnects; + std::uint32_t m_reconnect_interval_ms; + }; + + /** + * Constructor + * + * @detail Creates a pool of clients, usable in a thread safe manner. + * + * @param host host to be connected to + * @param port port to be connected to + * @param connect_callback connect handler to be called on connect events (may be null) + * @param timeout_ms maximum time to connect + * @param max_reconnects maximum attempts of reconnection if connection dropped + * @param reconnect_interval_ms time between two attempts of reconnection + * + */ + client_pool( + std::size_t pool_size, + const std::string& host = "127.0.0.1", + std::size_t port = 6379, + const connect_callback_t& connect_callback = nullptr, + std::uint32_t timeout_ms = 0, + std::int32_t max_reconnects = 0, + std::uint32_t reconnect_interval_ms = 0); + + /** + * @brief Use a client from the pool in a thread safe fashion. + * + * @tparam F Callable type equivalent to + * @param fun Operation to perform. + */ + template + cpp_redis::reply_t run(F&& fun); + +private: + detail::idle_client get_idle_client(); + + con_details m_connection_details; + std::vector> m_clients; + std::vector m_occupancy; + mutable std::condition_variable m_task_done; + mutable std::mutex m_mtx; +}; + +namespace detail { + +bool client_is_operable(std::unique_ptr& cl, client_pool::con_details const& details); + +} + +template +cpp_redis::reply_t +client_pool::run(F&& fun) { + detail::idle_client work(m_task_done, m_occupancy, m_mtx); + auto& cl = m_clients[work.m_index]; + + if (detail::client_is_operable(cl, m_connection_details)) { + std::forward(fun)(cl.get()); + return reply_t("OK", reply_t::string_type::simple_string); + } + + return reply_t("disconnected", reply_t::string_type::error); +} + +} // namespace cpp_redis + +#endif + diff --git a/includes/cpp_redis/cpp_redis b/includes/cpp_redis/cpp_redis index 60438ae9..487cd0ec 100644 --- a/includes/cpp_redis/cpp_redis +++ b/includes/cpp_redis/cpp_redis @@ -34,6 +34,7 @@ #include #include #include +#include #endif diff --git a/sources/core/client_pool.cpp b/sources/core/client_pool.cpp new file mode 100644 index 00000000..72281366 --- /dev/null +++ b/sources/core/client_pool.cpp @@ -0,0 +1,126 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2017 Simon Ninon +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +#include "cpp_redis/core/client.hpp" +#include + +namespace { + +std::unique_ptr +NewConnectedClient( + cpp_redis::client_pool::con_details const& details) { + + std::unique_ptr client(new cpp_redis::client); + + try { + client->connect(details.m_host, + details.m_port, + details.m_connect_callback, + details.m_timeout_ms, + details.m_max_reconnects, + details.m_reconnect_interval_ms); + } + catch (...) { + client.reset(); + } + + return client; +} + +} // namespace + +namespace cpp_redis { + +namespace detail { + +idle_client::idle_client(std::condition_variable& cv, std::vector& state, std::mutex& mtx) +: m_bell(cv), m_state(state), m_mtx(mtx), m_index(-1) { + bool IdleSlot = false; + bool ActiveSlot = true; + + std::unique_lock lk(m_mtx); + for (;;) { + for (std::size_t i(0), ie(m_state.size()); i < ie; ++i) { + if (m_state[i].compare_exchange_strong(IdleSlot, ActiveSlot)) { + m_index = i; + break; + } + } + + if (-1 != m_index) break; + + m_bell.wait(lk); + } +} + +idle_client::~idle_client() { + std::unique_lock lk(m_mtx); + m_state[m_index].store(false); + m_bell.notify_one(); +} + +bool +client_is_operable(std::unique_ptr& cl, client_pool::con_details const& details) { + bool haveAllocatedClient = + cl || (cl = NewConnectedClient(details)); + if (!haveAllocatedClient) { + return false; + } + + if (!cl->is_connected()) { + if (!cl->is_reconnecting()) { + // Reset a client that has stopped trying to reconnect. + if (!(cl = NewConnectedClient(details))) { + return false; + } + } + else { + return false; + } + } + + return true; +} + +} // namespace detail + + +client_pool::client_pool(std::size_t pool_size, + const std::string& host, + std::size_t port, + const connect_callback_t& connect_callback, + std::uint32_t timeout_ms, + std::int32_t max_reconnects, + std::uint32_t reconnect_interval_ms) +: m_connection_details{host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms} +, m_clients(pool_size) +, m_occupancy(pool_size) { + + for (auto& m : m_occupancy) m = false; + + for (auto& c : m_clients) { + c = NewConnectedClient(m_connection_details); + } +} + +} // namespace cpp_redis +