Skip to content

Commit

Permalink
Simplifies the connect operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
mzimbres committed Oct 21, 2024
1 parent 302d50e commit b8a52e5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 70 deletions.
75 changes: 12 additions & 63 deletions include/boost/redis/detail/connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
#include <boost/asio/compose.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/cancel_after.hpp>
#include <string>
#include <chrono>

Expand All @@ -30,65 +29,29 @@ struct connect_op {

template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> const& order = {}
, system::error_code const& ec1 = {}
, asio::ip::tcp::endpoint const& ep= {}
, system::error_code const& ec2 = {})
, system::error_code const& ec = {}
, asio::ip::tcp::endpoint const& ep= {})
{
BOOST_ASIO_CORO_REENTER (coro)
{
ctor_->timer_.expires_after(ctor_->timeout_);

BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
[this](auto token)
{
auto f = [](system::error_code const&, auto const&) { return true; };
return asio::async_connect(*stream, *res_, f, token);
},
[this](auto token) { return ctor_->timer_.async_wait(token);}
).async_wait(
asio::experimental::wait_for_one(),
std::move(self));

if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
asio::async_connect(*stream, *res_,
[](system::error_code const&, auto const&) { return true; },
asio::cancel_after(ctor_->timeout_, std::move(self)));

switch (order[0]) {
case 0: {
ctor_->endpoint_ = ep;
self.complete(ec1);
} break;
case 1:
{
if (ec2) {
self.complete(ec2);
} else {
self.complete(error::connect_timeout);
}
} break;
ctor_->endpoint_ = ep;

default: BOOST_ASSERT(false);
if (ec == asio::error::operation_aborted) {
ec == error::connect_timeout;
}

self.complete(ec);
}
}
};

template <class Executor>
class connector {
public:
using timer_type =
asio::basic_waitable_timer<
std::chrono::steady_clock,
asio::wait_traits<std::chrono::steady_clock>,
Executor>;

connector(Executor ex)
: timer_{ex}
{}

void set_config(config const& cfg)
{ timeout_ = cfg.connect_timeout; }

Expand All @@ -102,28 +65,14 @@ class connector {
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(connect_op<connector, Stream>{this, &stream, &res}, token, timer_);
}

std::size_t cancel(operation op)
{
switch (op) {
case operation::connect:
case operation::all:
timer_.cancel();
break;
default: /* ignore */;
}

return 0;
>(connect_op<connector, Stream>{this, &stream, &res}, token);
}

auto const& endpoint() const noexcept { return endpoint_;}

private:
template <class, class> friend struct connect_op;

timer_type timer_;
std::chrono::steady_clock::duration timeout_ = std::chrono::seconds{2};
asio::ip::tcp::endpoint endpoint_;
};
Expand Down
4 changes: 3 additions & 1 deletion include/boost/redis/detail/resolver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ struct resolve_op {

resv_->results_ = res;

// TODO: map operation_canceled into error::resolve_timeout
if (ec == asio::error::operation_aborted) {
ec == error::resolve_timeout;
}
self.complete(ec);
}
}
Expand Down
7 changes: 1 addition & 6 deletions include/boost/redis/detail/runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <boost/redis/detail/resolver.hpp>
#include <boost/redis/detail/handshaker.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
Expand Down Expand Up @@ -165,7 +164,6 @@ class runner {
public:
runner(Executor ex, config cfg)
: resv_{ex}
, ctor_{ex}
, hsher_{ex}
, health_checker_{ex}
, cfg_{cfg}
Expand All @@ -174,7 +172,6 @@ class runner {
std::size_t cancel(operation op)
{
resv_.cancel(op);
ctor_.cancel(op);
hsher_.cancel(op);
health_checker_.cancel(op);
return 0U;
Expand Down Expand Up @@ -202,10 +199,8 @@ class runner {

private:
using resolver_type = resolver<Executor>;
using connector_type = connector<Executor>;
using handshaker_type = detail::handshaker<Executor>;
using health_checker_type = health_checker<Executor>;
using timer_type = typename connector_type::timer_type;

template <class, class, class> friend class runner_op;
template <class, class, class> friend struct hello_op;
Expand Down Expand Up @@ -245,7 +240,7 @@ class runner {
}

resolver_type resv_;
connector_type ctor_;
connector ctor_;
handshaker_type hsher_;
health_checker_type health_checker_;
request hello_req_;
Expand Down

0 comments on commit b8a52e5

Please sign in to comment.