From 6cd63923500f37fc012dede318f2520352686530 Mon Sep 17 00:00:00 2001 From: Robert Schumacher Date: Tue, 2 Aug 2016 00:46:54 -0700 Subject: [PATCH] Revert global connection pool, but leave improved individual pools. Pools no longer need to self-reference and are destroyed with the owning http_client. --- Release/src/http/client/http_client.cpp | 6 +- Release/src/http/client/http_client_asio.cpp | 172 +++--------------- Release/src/http/client/http_client_impl.h | 6 +- .../src/http/client/http_client_winhttp.cpp | 4 +- Release/src/http/client/http_client_winrt.cpp | 6 +- 5 files changed, 39 insertions(+), 155 deletions(-) diff --git a/Release/src/http/client/http_client.cpp b/Release/src/http/client/http_client.cpp index 9f71603d85..8dbfb9c1f4 100644 --- a/Release/src/http/client/http_client.cpp +++ b/Release/src/http/client/http_client.cpp @@ -191,7 +191,7 @@ const uri & _http_client_communicator::base_uri() const return m_uri; } -_http_client_communicator::_http_client_communicator(http::uri address, http_client_config client_config) +_http_client_communicator::_http_client_communicator(http::uri&& address, http_client_config&& client_config) : m_uri(std::move(address)), m_client_config(std::move(client_config)), m_opened(false), m_scheduled(0) { } @@ -370,12 +370,12 @@ http_client::http_client(const uri &base_uri, const http_client_config &client_c uribuilder.set_scheme(_XPLATSTR("http")); uri uriWithScheme = uribuilder.to_uri(); verify_uri(uriWithScheme); - final_pipeline_stage = details::create_platform_final_pipeline_stage(uriWithScheme, client_config); + final_pipeline_stage = details::create_platform_final_pipeline_stage(std::move(uriWithScheme), http_client_config(client_config)); } else { verify_uri(base_uri); - final_pipeline_stage = details::create_platform_final_pipeline_stage(base_uri, client_config); + final_pipeline_stage = details::create_platform_final_pipeline_stage(uri(base_uri), http_client_config(client_config)); } m_pipeline = std::make_shared(std::move(final_pipeline_stage)); diff --git a/Release/src/http/client/http_client_asio.cpp b/Release/src/http/client/http_client_asio.cpp index 9bd085ab69..f928e18e46 100644 --- a/Release/src/http/client/http_client_asio.cpp +++ b/Release/src/http/client/http_client_asio.cpp @@ -235,9 +235,10 @@ class asio_connection /// /// During the cleanup phase, connections are removed starting with the oldest. This /// ensures that if a high intensity workload is followed by a low intensity workload, -/// the connection pool will correctly adapt to the current workload. Specifically, -/// the following code will eventually result in a maximum of one pooled connection -/// regardless of the initial number of pooled connections: +/// the connection pool will correctly adapt to the low intensity workload. +/// +/// Specifically, the following code will eventually result in a maximum of one pooled +/// connection regardless of the initial number of pooled connections: /// /// while(1) /// { @@ -246,18 +247,11 @@ class asio_connection /// pool.release(conn); /// } /// -/// -/// Additionally, when two cleanup phases have occurred with no calls to `release()` -/// between them, the internal self-reference is cleared. If there are no active -/// `http_client`s keeping the pool alive, this will cause the pool to expire upon -/// cleanup handler termination. Whenever a new call to `release()` arrives, the self -/// reference is re-applied to keep the pool alive. /// class asio_connection_pool : public std::enable_shared_from_this { public: - asio_connection_pool() - : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service()) + asio_connection_pool() : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service()) {} std::shared_ptr acquire() @@ -281,29 +275,29 @@ class asio_connection_pool : public std::enable_shared_from_this lock(m_lock); - if (m_self_reference == nullptr) + if (!is_timer_running) { - auto sptr = this->shared_from_this(); - m_self_reference = sptr; - start_epoch_interval(sptr); + start_epoch_interval(shared_from_this()); + is_timer_running = true; } m_epoch++; - m_connections.emplace_back(m_epoch, connection); + m_connections.emplace_back(m_epoch, std::move(connection)); } private: // Note: must be called under m_lock - static void start_epoch_interval(const std::shared_ptr& pool) { + static void start_epoch_interval(const std::shared_ptr& pool) + { _ASSERTE(pool.get() != nullptr); - _ASSERTE(pool->m_self_reference != nullptr); auto& self = *pool; std::weak_ptr weak_pool = pool; self.m_prev_epoch = self.m_epoch; pool->m_pool_epoch_timer.expires_from_now(boost::posix_time::seconds(30)); - pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec) { + pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec) + { if (ec) return; @@ -313,11 +307,11 @@ class asio_connection_pool : public std::enable_shared_from_this lock(self.m_lock); - _ASSERTE(self.m_self_reference != nullptr); if (self.m_prev_epoch == self.m_epoch) { self.m_connections.clear(); - self.m_self_reference = nullptr; + self.is_timer_running = false; + return; } else { @@ -335,109 +329,23 @@ class asio_connection_pool : public std::enable_shared_from_this>> m_connections; + uint64_t m_epoch = 0; uint64_t m_prev_epoch = 0; - - std::shared_ptr m_self_reference; -}; - -class asio_shared_connection_pool : public std::enable_shared_from_this -{ -public: - std::shared_ptr obtain(const std::string &pool_key) - { - std::shared_ptr ret; - - std::lock_guard lock(m_lock); - auto it = m_pools.find(pool_key); - if (it != m_pools.end()) - { - ret = it->second.lock(); - if (ret == nullptr) - { - // Previous pool expired - ret = std::make_shared(); - it->second = ret; - } - } - else - { - if (m_pools.empty()) - { - // If transitioning from empty to having a single element, restart the timer. - start_timer(shared_from_this()); - } - ret = std::make_shared(); - m_pools.emplace(pool_key, ret); - } - - assert(ret != nullptr); - return ret; - } - - static std::shared_ptr& shared_instance() - { - static std::shared_ptr s_instance = std::make_shared(); - - return s_instance; - } - - asio_shared_connection_pool() : m_timer(crossplat::threadpool::shared_instance().service()) {} - -private: - static void start_timer(const std::shared_ptr& self) - { - self->m_timer.expires_from_now(boost::posix_time::seconds(60)); - std::weak_ptr weak_this = self; - self->m_timer.async_wait([weak_this](const boost::system::error_code& ec) - { - if (ec) - return; - auto strong_this = weak_this.lock(); - if (!strong_this) - return; - - std::lock_guard lock(strong_this->m_lock); - auto b = strong_this->m_pools.begin(); - auto e = strong_this->m_pools.end(); - for (; b != e;) - { - if (b->second.expired()) - b = strong_this->m_pools.erase(b); - else - ++b; - } - if (!strong_this->m_pools.empty()) - start_timer(strong_this); - }); - } - - boost::asio::deadline_timer m_timer; - std::mutex m_lock; - std::unordered_map> m_pools; + bool is_timer_running = false; + boost::asio::deadline_timer m_pool_epoch_timer; }; class asio_client final : public _http_client_communicator { public: - asio_client(http::uri address, http_client_config client_config) - : _http_client_communicator(std::move(address), std::move(client_config)) - , m_resolver(crossplat::threadpool::shared_instance().service()) - { - m_start_with_ssl = base_uri().scheme() == "https" && !this->client_config().proxy().is_specified(); - - if (this->client_config().get_ssl_context_callback()) - { - // We will use a private connection pool because there is no better approaches to compare callback functors. - m_pool = std::make_shared(); - } - else - { - m_pool = asio_shared_connection_pool::shared_instance()->obtain(get_pool_key()); - } - } + asio_client(http::uri&& address, http_client_config&& client_config) + : _http_client_communicator(std::move(address), std::move(client_config)) + , m_resolver(crossplat::threadpool::shared_instance().service()) + , m_pool(std::make_shared()) + , m_start_with_ssl(base_uri().scheme() == "https" && !this->client_config().proxy().is_specified()) + {} void send_request(const std::shared_ptr &request_ctx) override; @@ -464,35 +372,11 @@ class asio_client final : public _http_client_communicator virtual pplx::task propagate(http_request request) override; -private: - std::string get_pool_key() const - { - auto pool_key = base_uri().to_string(); - - auto &credentials = _http_client_communicator::client_config().credentials(); - if (credentials.is_set()) - { - pool_key.append(credentials.username()); - } - - auto &proxy = _http_client_communicator::client_config().proxy(); - if (proxy.is_specified()) - { - pool_key.append(proxy.address().to_string()); - if (proxy.credentials().is_set()) - { - pool_key.append(proxy.credentials().username()); - } - } - - return pool_key; - } - - std::shared_ptr m_pool; public: tcp::resolver m_resolver; private: - bool m_start_with_ssl; + const std::shared_ptr m_pool; + const bool m_start_with_ssl; }; class asio_context : public request_context, public std::enable_shared_from_this @@ -1612,9 +1496,9 @@ class asio_context : public request_context, public std::enable_shared_from_this }; -std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config) +std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config) { - return std::make_shared(base_uri, client_config); + return std::make_shared(std::move(base_uri), std::move(client_config)); } void asio_client::send_request(const std::shared_ptr &request_ctx) diff --git a/Release/src/http/client/http_client_impl.h b/Release/src/http/client/http_client_impl.h index ac0ae81ce1..eacf64c83b 100644 --- a/Release/src/http/client/http_client_impl.h +++ b/Release/src/http/client/http_client_impl.h @@ -100,7 +100,7 @@ class _http_client_communicator : public http_pipeline_stage { public: - virtual ~_http_client_communicator() {} + virtual ~_http_client_communicator() override = default; // Asynchronously send a HTTP request and process the response. void async_send_request(const std::shared_ptr &request); @@ -112,7 +112,7 @@ class _http_client_communicator : public http_pipeline_stage const uri & base_uri() const; protected: - _http_client_communicator(http::uri address, http_client_config client_config); + _http_client_communicator(http::uri&& address, http_client_config&& client_config); // Method to open client. virtual unsigned long open() = 0; @@ -146,6 +146,6 @@ class _http_client_communicator : public http_pipeline_stage /// /// Factory function implemented by the separate platforms to construct their subclasses of _http_client_communicator /// -std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config); +std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config); }}}} \ No newline at end of file diff --git a/Release/src/http/client/http_client_winhttp.cpp b/Release/src/http/client/http_client_winhttp.cpp index e4d19042b2..db0bb9e340 100644 --- a/Release/src/http/client/http_client_winhttp.cpp +++ b/Release/src/http/client/http_client_winhttp.cpp @@ -1294,9 +1294,9 @@ class winhttp_client : public _http_client_communicator bool m_secure; }; -std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config) +std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config) { - return std::make_shared(std::move(base_uri), client_config); + return std::make_shared(std::move(base_uri), std::move(client_config)); } }}}} diff --git a/Release/src/http/client/http_client_winrt.cpp b/Release/src/http/client/http_client_winrt.cpp index 5ae4a8ec52..639bbc3da8 100644 --- a/Release/src/http/client/http_client_winrt.cpp +++ b/Release/src/http/client/http_client_winrt.cpp @@ -356,7 +356,7 @@ class IResponseStream class winrt_client : public _http_client_communicator { public: - winrt_client(http::uri address, http_client_config client_config) + winrt_client(http::uri&& address, http_client_config&& client_config) : _http_client_communicator(std::move(address), std::move(client_config)) { } winrt_client(const winrt_client&) = delete; @@ -560,9 +560,9 @@ class winrt_client : public _http_client_communicator } }; -std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config) +std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config) { - return std::make_shared(std::move(base_uri), client_config); + return std::make_shared(std::move(base_uri), std::move(client_config)); } }}}}