Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ras0219-msft committed Jun 29, 2016
1 parent e9667db commit 89c20dd
Showing 1 changed file with 98 additions and 53 deletions.
151 changes: 98 additions & 53 deletions Release/src/http/client/http_client_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,25 @@ class asio_connection
friend class asio_connection_pool;
friend class asio_client;
public:
asio_connection(boost::asio::io_service& io_service, bool start_with_ssl, const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback) :
m_socket(io_service),
m_ssl_context_callback(ssl_context_callback),
m_pool_timer(io_service),
m_is_reused(false),
m_keep_alive(true)
asio_connection(
boost::asio::io_service& io_service,
const std::string &pool_key,
bool start_with_ssl,
const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback)
: m_socket(io_service),
m_ssl_context_callback(ssl_context_callback),
m_pool_timer(io_service),
m_is_reused(false),
m_keep_alive(true),
m_pool_key(pool_key),
m_epoch(0)
{
if (start_with_ssl)
{
upgrade_to_ssl();
}
}

asio_connection(boost::asio::io_service& io_service, const std::string &pool_key, bool start_with_ssl, const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback) :
asio_connection(io_service, start_with_ssl, ssl_context_callback)
{
m_pool_key = pool_key;
}

~asio_connection()
{
close();
Expand Down Expand Up @@ -146,8 +146,7 @@ class asio_connection
bool keep_alive() const { return m_keep_alive; }
bool is_ssl() const { return m_ssl_stream ? true : false; }
const std::string &pool_key() const { return m_pool_key; }
const std::string &nonce() const { return m_nonce; }
void generate_nonce() { m_nonce = m_nonce_generator.generate(); }
uint32_t epoch() const { return m_epoch; }

template <typename Iterator, typename Handler>
void async_connect(const Iterator &begin, const Handler &handler)
Expand Down Expand Up @@ -240,7 +239,7 @@ class asio_connection
{
cancel_pool_timer();
m_is_reused = true;
generate_nonce();
m_epoch++;
}

// Guards concurrent access to socket/ssl::stream. This is necessary
Expand All @@ -256,18 +255,71 @@ class asio_connection
bool m_is_reused;
bool m_keep_alive;
std::string m_pool_key;
std::string m_nonce;
utility::nonce_generator m_nonce_generator;
uint32_t m_epoch;
};

class asio_shared_connection_pool
{
public:
asio_shared_connection_pool(boost::asio::io_service& io_service) :
m_io_service(io_service)
{}

~asio_shared_connection_pool()
{
std::lock_guard<std::mutex> lock(m_connections_mutex);
// Cancel the pool timer for all connections.
for (auto& connection : m_connections)
{
connection.second->cancel_pool_timer();
}
}

void release(const std::shared_ptr<asio_connection>& connection)
{
if (connection->keep_alive() && (m_timeout_secs > 0))
{
connection->cancel();

std::lock_guard<std::mutex> lock(m_connections_mutex);
auto it = m_connections.insert(std::make_pair(connection->pool_key(), connection));

// This will destroy and remove the connection from pool after the set timeout.
// We use 'this' because async calls to timer handler only occur while the pool exists.
auto connection_weak = std::weak_ptr<asio_connection>(connection);
auto epoch = connection->epoch();
connection->start_pool_timer(m_timeout_secs, [this, connection_weak, epoch](const boost::error_code& ec) {
this->free_shared_connection(ec, connection_weak, epoch);
});
}
}

private:
void free_connection(const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection, unsigned int epoch)
{
auto connection_shared = connection.lock();
if (!connection_shared)
return;

std::lock_guard<std::mutex> lock(m_connections_mutex);
auto it = m_connections.find(connection_shared);
if (it == m_connections.end())
// The connection was acquired while this callback was firing
return;

// The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing.
// Every acquisition increments the epoch.
if (epoch != (*it)->epoch())
m_connections.erase(it);
}
};

class asio_connection_pool
{
public:

asio_connection_pool(boost::asio::io_service& io_service, const std::chrono::seconds &idle_timeout, bool is_shared) :
m_io_service(io_service),
m_timeout_secs(static_cast<int>(idle_timeout.count())),
m_is_shared(is_shared)
asio_connection_pool(boost::asio::io_service& io_service) :
m_io_service(io_service)
{}

~asio_connection_pool()
Expand All @@ -282,29 +334,15 @@ class asio_connection_pool

void release(const std::shared_ptr<asio_connection> &connection)
{
if (connection->keep_alive() && (m_timeout_secs > 0))
{
connection->cancel();
connection->cancel();

if (m_is_shared)
{
std::lock_guard<std::mutex> lock(m_connections_mutex);
auto it = m_shared_connections.insert(std::make_pair(connection->pool_key(), connection));
// This will destroy and remove the connection from pool after the set timeout.
// We use 'this' because async calls to timer handler only occur while the pool exists.
connection->start_pool_timer(m_timeout_secs, boost::bind(&asio_connection_pool::free_shared_connection, this, boost::asio::placeholders::error, it, std::weak_ptr<asio_connection>(connection), connection->nonce()));
}
else
{
std::lock_guard<std::mutex> lock(m_connections_mutex);
auto pair = m_connections.insert(connection);
if (pair.second)
{
// This will destroy and remove the connection from pool after the set timeout.
// We use 'this' because async calls to timer handler only occur while the pool exists.
connection->start_pool_timer(m_timeout_secs, boost::bind(&asio_connection_pool::free_connection, this, boost::asio::placeholders::error, pair.first, std::weak_ptr<asio_connection>(connection), connection->nonce()));
}
}
if (connection->keep_alive())
{
std::lock_guard<std::mutex> lock(m_connections_mutex);
// This will destroy and remove the connection from pool after the set timeout.
// We use 'this' because async calls to timer handler only occur while the pool exists.
connection->start_pool_timer(s_timeout_secs.count(), boost::bind(&asio_connection_pool::free_connection, this, boost::asio::placeholders::error, pair.first, std::weak_ptr<asio_connection>(connection), connection->nonce()));
m_connections.push_back(connection);
}
// Otherwise connection is not put to the pool and it will go out of scope.
}
Expand Down Expand Up @@ -377,26 +415,33 @@ class asio_connection_pool
}

// Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope.
void free_connection(const boost::system::error_code& ec, std::set<std::shared_ptr<asio_connection>>::iterator it, const std::weak_ptr<asio_connection> &connection, const std::string &nonce)
void free_connection(const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection, uint32_t epoch)
{
if (!ec)
{
auto connection_shared = connection.lock();
// Compare nonce here to ensure the iterator is valid, the connection not been reused.
if (connection_shared && (connection_shared->nonce() == nonce))
{
std::lock_guard<std::mutex> lock(m_connections_mutex);
if (!connection_shared)
return;

std::lock_guard<std::mutex> lock(m_connections_mutex);
auto it = m_connections.find(connection_shared);
if (it == m_connections.end())
// The connection was acquired while this callback was firing
return;

// The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing.
// Every acquisition increments the epoch.
if (epoch != (*it)->epoch())
m_connections.erase(it);
}
}
}

boost::asio::io_service& m_io_service;
const int m_timeout_secs;
bool m_is_shared;
std::multimap<std::string, std::shared_ptr<asio_connection>> m_shared_connections;
std::set<std::shared_ptr<asio_connection>> m_connections;

std::mutex m_connections_mutex;
std::set<std::shared_ptr<asio_connection>> m_connections;

static const std::chrono::seconds s_timeout_secs = 30;
};

std::shared_ptr<asio_connection_pool> asio_connection_pool::shared_instance()
Expand Down

0 comments on commit 89c20dd

Please sign in to comment.