Skip to content

Commit d89d5e6

Browse files
committed
Concludes the work started by Nikolai Vladimirov on the generic_flat_response
1 parent 97705bf commit d89d5e6

29 files changed

+539
-382
lines changed

README.md

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,24 +106,23 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
106106
while (conn->will_reconnect()) {
107107

108108
// Reconnect to channels.
109-
co_await conn->async_exec(req, ignore);
109+
co_await conn->async_exec(req);
110110

111111
// Loop reading Redis pushes.
112-
for (;;) {
113-
error_code ec;
114-
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
112+
for (error_code ec;;) {
113+
co_await conn->async_receive2(resp, redirect_error(ec));
115114
if (ec)
116115
break; // Connection lost, break so we can reconnect to channels.
117116

118117
// Use the response resp in some way and then clear it.
119118
...
120119

121-
consume_one(resp);
120+
resp.value().clear();
122121
}
123122
}
124123
}
125124
```
126125
127126
## Further reading
128127
129-
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
128+
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).

doc/modules/ROOT/pages/index.adoc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,19 +117,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
117117
while (conn->will_reconnect()) {
118118
119119
// Reconnect to channels.
120-
co_await conn->async_exec(req, ignore);
120+
co_await conn->async_exec(req);
121121
122122
// Loop reading Redis pushes.
123-
for (;;) {
124-
error_code ec;
125-
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
123+
for (error_code ec;;) {
124+
co_await conn->async_receive2(resp, redirect_error(ec));
126125
if (ec)
127126
break; // Connection lost, break so we can reconnect to channels.
128127
129-
// Use the response resp in some way and then clear it.
128+
// Use the response here and then clear it.
130129
...
131130
132-
consume_one(resp);
131+
resp.value().clear();
133132
}
134133
}
135134
}

example/cpp20_chat_room.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@ using boost::asio::consign;
3030
using boost::asio::detached;
3131
using boost::asio::dynamic_buffer;
3232
using boost::asio::redirect_error;
33-
using boost::asio::use_awaitable;
3433
using boost::redis::config;
3534
using boost::redis::connection;
36-
using boost::redis::generic_flat_response;
37-
using boost::redis::ignore;
35+
using boost::redis::generic_response;
3836
using boost::redis::request;
3937
using boost::system::error_code;
4038
using namespace std::chrono_literals;
@@ -47,16 +45,16 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
4745
request req;
4846
req.push("SUBSCRIBE", "channel");
4947

50-
generic_flat_response resp;
48+
generic_response resp;
5149
conn->set_receive_response(resp);
5250

5351
while (conn->will_reconnect()) {
5452
// Subscribe to channels.
55-
co_await conn->async_exec(req, ignore);
53+
co_await conn->async_exec(req);
5654

5755
// Loop reading Redis push messages.
5856
for (error_code ec;;) {
59-
co_await conn->async_receive(redirect_error(use_awaitable, ec));
57+
co_await conn->async_receive2(redirect_error(ec));
6058
if (ec)
6159
break; // Connection lost, break so we can reconnect to channels.
6260
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
@@ -74,7 +72,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
7472
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
7573
request req;
7674
req.push("PUBLISH", "channel", msg);
77-
co_await conn->async_exec(req, ignore);
75+
co_await conn->async_exec(req);
7876
msg.erase(0, n);
7977
}
8078
}

example/cpp20_streams.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
namespace net = boost::asio;
2525
using boost::redis::config;
26-
using boost::redis::generic_flat_response;
26+
using boost::redis::generic_response;
2727
using boost::redis::operation;
2828
using boost::redis::request;
2929
using boost::redis::connection;
@@ -33,7 +33,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
3333
{
3434
std::string redisStreamKey_;
3535
request req;
36-
generic_flat_response resp;
36+
generic_response resp;
3737

3838
std::string stream_id{"$"};
3939
std::string const field = "myfield";
@@ -51,7 +51,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
5151
// The following approach was taken in order to be able to
5252
// deal with the responses, as generated by redis in the case
5353
// that there are multiple stream 'records' within a single
54-
// generic_flat_response. The nesting and number of values in
54+
// generic_response. The nesting and number of values in
5555
// resp.value() are different, depending on the contents
5656
// of the stream in redis. Uncomment the above commented-out
5757
// code for examples while running the XADD command.

example/cpp20_subscriber.cpp

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
22
*
33
* Distributed under the Boost Software License, Version 1.0. (See
44
* accompanying file LICENSE.txt)
55
*/
66

77
#include <boost/redis/connection.hpp>
8-
#include <boost/redis/logger.hpp>
98

109
#include <boost/asio/awaitable.hpp>
1110
#include <boost/asio/co_spawn.hpp>
@@ -23,11 +22,7 @@ namespace asio = boost::asio;
2322
using namespace std::chrono_literals;
2423
using boost::redis::request;
2524
using boost::redis::generic_flat_response;
26-
using boost::redis::consume_one;
27-
using boost::redis::logger;
2825
using boost::redis::config;
29-
using boost::redis::ignore;
30-
using boost::redis::error;
3126
using boost::system::error_code;
3227
using boost::redis::connection;
3328
using asio::signal_set;
@@ -60,24 +55,23 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
6055
// Loop while reconnection is enabled
6156
while (conn->will_reconnect()) {
6257
// Reconnect to the channels.
63-
co_await conn->async_exec(req, ignore);
58+
co_await conn->async_exec(req);
6459

65-
// Loop reading Redis pushs messages.
60+
// Loop to read Redis push messages.
6661
for (error_code ec;;) {
67-
// First tries to read any buffered pushes.
68-
conn->receive(ec);
69-
if (ec == error::sync_receive_push_failed) {
70-
ec = {};
71-
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
72-
}
73-
62+
// Wait for pushes
63+
co_await conn->async_receive2(asio::redirect_error(ec));
7464
if (ec)
7565
break; // Connection lost, break so we can reconnect to channels.
7666

77-
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
78-
<< resp.value().at(3).value << std::endl;
67+
// The response must be consumed without suspending the
68+
// coroutine i.e. without the use of async operations.
69+
for (auto const& elem: resp.value().get_view())
70+
std::cout << elem.value.data << "\n";
71+
72+
std::cout << std::endl;
7973

80-
consume_one(resp);
74+
resp.value().clear();
8175
}
8276
}
8377
}

include/boost/redis/adapter/detail/adapters.hpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@
1212
#include <boost/redis/resp3/node.hpp>
1313
#include <boost/redis/resp3/serialization.hpp>
1414
#include <boost/redis/resp3/type.hpp>
15-
#include <boost/redis/response.hpp>
15+
#include <boost/redis/generic_flat_response_value.hpp>
1616

1717
#include <boost/assert.hpp>
1818

1919
#include <array>
2020
#include <charconv>
2121
#include <deque>
2222
#include <forward_list>
23-
#include <iostream>
2423
#include <list>
2524
#include <map>
2625
#include <optional>
@@ -138,6 +137,8 @@ void boost_redis_from_bulk(T& t, resp3::basic_node<String> const& node, system::
138137
from_bulk_impl<T>::apply(t, node, ec);
139138
}
140139

140+
//================================================
141+
141142
template <class Result>
142143
class general_aggregate {
143144
private:
@@ -177,25 +178,25 @@ class general_aggregate {
177178
};
178179

179180
template <>
180-
class general_aggregate<result<flat_response_value>> {
181+
class general_aggregate<result<generic_flat_response_value>> {
181182
private:
182-
result<flat_response_value>* result_;
183+
result<generic_flat_response_value>* result_ = nullptr;
183184

184185
public:
185-
explicit general_aggregate(result<flat_response_value>* c = nullptr)
186+
explicit general_aggregate(result<generic_flat_response_value>* c = nullptr)
186187
: result_(c)
187188
{ }
188189

189190
void on_init() { }
190191
void on_done()
191192
{
192193
if (result_->has_value()) {
193-
result_->value().set_view();
194+
result_->value().notify_done();
194195
}
195196
}
196197

197198
template <class String>
198-
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
199+
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
199200
{
200201
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
201202
switch (nd.data_type) {
@@ -206,7 +207,7 @@ class general_aggregate<result<flat_response_value>> {
206207
std::string{std::cbegin(nd.value), std::cend(nd.value)}
207208
};
208209
break;
209-
default: result_->value().add_node(nd);
210+
default: result_->value().push(nd);
210211
}
211212
}
212213
};

include/boost/redis/adapter/detail/response_traits.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@ struct response_traits<response<Ts...>> {
110110
template <>
111111
struct response_traits<generic_flat_response> {
112112
using response_type = generic_flat_response;
113-
using adapter_type = vector_adapter<response_type>;
113+
using adapter_type = general_aggregate<response_type>;
114114

115-
static auto adapt(response_type& v) noexcept { return adapter_type{v}; }
115+
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
116116
};
117117
} // namespace boost::redis::adapter::detail
118118

include/boost/redis/adapter/detail/result_traits.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ struct result_traits<result<std::vector<resp3::basic_node<String>, Allocator>>>
6666
template <>
6767
struct result_traits<generic_flat_response> {
6868
using response_type = generic_flat_response;
69-
using adapter_type = adapter::detail::general_aggregate<response_type>;
69+
using adapter_type = general_aggregate<response_type>;
7070
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
7171
};
7272

include/boost/redis/connection.hpp

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ class basic_connection {
593593
return async_run(config{}, std::forward<CompletionToken>(token));
594594
}
595595

596-
/** @brief Receives server side pushes asynchronously.
596+
/** @brief (Deprecated) Receives server side pushes asynchronously.
597597
*
598598
* When pushes arrive and there is no `async_receive` operation in
599599
* progress, pushed data, requests, and responses will be paused
@@ -623,12 +623,76 @@ class basic_connection {
623623
* @param token Completion token.
624624
*/
625625
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
626+
BOOST_DEPRECATED("Please use async_receive2 instead.")
626627
auto async_receive(CompletionToken&& token = {})
627628
{
628629
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
629630
}
630631

631-
/** @brief Receives server pushes synchronously without blocking.
632+
/** @brief Wait for server pushes asynchronously
633+
*
634+
* This function suspends until a server push is received by the
635+
* connection. On completion an unspecified number of pushes will
636+
* have been added to the response object set with @ref
637+
* boost::redis::connection::set_receive_response.
638+
*
639+
* To prevent receiving an unbound number of pushes the connection
640+
* blocks further read operations on the socket when 256 pushes
641+
* accumulate internally (we don't make any commitment to this
642+
* exact number). When that happens ongoing `async_exec`s and
643+
* health-checks won't make any progress and the connection will
644+
* eventually timeout. To avoid that Apps should call
645+
* `async_receive2` continuously in a loop.
646+
*
647+
* @Note To avoid deadlocks the task (e.g. coroutine) calling
648+
* `async_receive2` should not call `async_exec` in a way where
649+
* they could block each other.
650+
*
651+
* For an example see cpp20_subscriber.cpp. The completion token
652+
* must have the following signature
653+
*
654+
* @code
655+
* void f(system::error_code);
656+
* @endcode
657+
*
658+
* @par Per-operation cancellation
659+
* This operation supports the following cancellation types:
660+
*
661+
* @li `asio::cancellation_type_t::terminal`.
662+
* @li `asio::cancellation_type_t::partial`.
663+
* @li `asio::cancellation_type_t::total`.
664+
*
665+
* Calling `basic_connection::cancel(operation::receive)` will
666+
* also cancel any ongoing receive operations.
667+
*
668+
* @param token Completion token.
669+
*/
670+
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
671+
auto async_receive2(CompletionToken&& token = {})
672+
{
673+
return
674+
impl_->receive_channel_.async_receive(
675+
asio::deferred(
676+
[&conn = *this](system::error_code ec, std::size_t)
677+
{
678+
if (!ec) {
679+
auto f = [](system::error_code, std::size_t) {
680+
// There is no point in checking for errors
681+
// here since async_receive just completed
682+
// without errors.
683+
};
684+
685+
// We just want to drain the channel.
686+
while (conn.impl_->receive_channel_.try_receive(f));
687+
}
688+
689+
return asio::deferred.values(ec);
690+
}
691+
)
692+
)(std::forward<CompletionToken>(token));
693+
}
694+
695+
/** @brief (Deprecated) Receives server pushes synchronously without blocking.
632696
*
633697
* Receives a server push synchronously by calling `try_receive` on
634698
* the underlying channel. If the operation fails because
@@ -638,6 +702,7 @@ class basic_connection {
638702
* @param ec Contains the error if any occurred.
639703
* @returns The number of bytes read from the socket.
640704
*/
705+
BOOST_DEPRECATED("Please, use async_receive2 instead.")
641706
std::size_t receive(system::error_code& ec)
642707
{
643708
std::size_t size = 0;
@@ -837,7 +902,7 @@ class basic_connection {
837902
"the other member functions to interact with the connection.")
838903
auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
839904

840-
/// Sets the response object of @ref async_receive operations.
905+
/// Sets the response object of @ref async_receive2 operations.
841906
template <class Response>
842907
void set_receive_response(Response& resp)
843908
{
@@ -1028,12 +1093,21 @@ class connection {
10281093

10291094
/// @copydoc basic_connection::async_receive
10301095
template <class CompletionToken = asio::deferred_t>
1096+
BOOST_DEPRECATED("Please use async_receive2 instead.")
10311097
auto async_receive(CompletionToken&& token = {})
10321098
{
10331099
return impl_.async_receive(std::forward<CompletionToken>(token));
10341100
}
10351101

1102+
/// @copydoc basic_connection::async_receive2
1103+
template <class CompletionToken = asio::deferred_t>
1104+
auto async_receive2(CompletionToken&& token = {})
1105+
{
1106+
return impl_.async_receive2(std::forward<CompletionToken>(token));
1107+
}
1108+
10361109
/// @copydoc basic_connection::receive
1110+
BOOST_DEPRECATED("Please use async_receive2 instead.")
10371111
std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
10381112

10391113
/**

0 commit comments

Comments
 (0)