Skip to content

Commit 2c1f1c4

Browse files
committed
Concludes the work started by Nikolai Vladimirov on the generic_flat_response
1 parent 6ff4740 commit 2c1f1c4

31 files changed

+1135
-406
lines changed

README.md

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ them are:
8787
* [Client-side caching](https://redis.io/docs/manual/client-side-caching/).
8888

8989
The connection class supports server pushes by means of the
90-
`connection::async_receive` function, which can be
90+
`connection::async_receive2` function, which can be
9191
called in the same connection that is being used to execute commands.
92-
The coroutine below shows how to use it:
92+
The coroutine below shows how to use it
9393

9494

9595
```cpp
@@ -99,31 +99,30 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
9999
request req;
100100
req.push("SUBSCRIBE", "channel");
101101

102-
generic_response resp;
102+
flat_tree resp;
103103
conn->set_receive_response(resp);
104104

105105
// Loop while reconnection is enabled
106106
while (conn->will_reconnect()) {
107107

108108
// Reconnect to channels.
109-
co_await conn->async_exec(req, ignore);
109+
co_await conn->async_exec(req);
110110

111111
// Loop reading Redis pushes.
112-
for (;;) {
113-
error_code ec;
114-
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
112+
for (error_code ec;;) {
113+
co_await conn->async_receive2(resp, redirect_error(ec));
115114
if (ec)
116115
break; // Connection lost, break so we can reconnect to channels.
117116

118117
// Use the response resp in some way and then clear it.
119118
...
120119

121-
consume_one(resp);
120+
resp.clear();
122121
}
123122
}
124123
}
125124
```
126125
127126
## Further reading
128127
129-
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
128+
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).

doc/modules/ROOT/pages/index.adoc

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ them are:
9797
* https://redis.io/docs/manual/client-side-caching/[Client-side caching].
9898

9999
The connection class supports server pushes by means of the
100-
xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive`] function, which can be
100+
xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive2`] function, which can be
101101
called in the same connection that is being used to execute commands.
102-
The coroutine below shows how to use it:
102+
The coroutine below shows how to use it
103103

104104

105105
[source,cpp]
@@ -110,26 +110,25 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
110110
request req;
111111
req.push("SUBSCRIBE", "channel");
112112
113-
generic_response resp;
113+
flat_tree resp;
114114
conn->set_receive_response(resp);
115115
116116
// Loop while reconnection is enabled
117117
while (conn->will_reconnect()) {
118118
119119
// Reconnect to channels.
120-
co_await conn->async_exec(req, ignore);
120+
co_await conn->async_exec(req);
121121
122122
// Loop reading Redis pushes.
123-
for (;;) {
124-
error_code ec;
125-
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
123+
for (error_code ec;;) {
124+
co_await conn->async_receive2(resp, redirect_error(ec));
126125
if (ec)
127126
break; // Connection lost, break so we can reconnect to channels.
128127
129-
// Use the response resp in some way and then clear it.
128+
// Use the response here and then clear it.
130129
...
131130
132-
consume_one(resp);
131+
resp.clear();
133132
}
134133
}
135134
}

example/cpp20_chat_room.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@ using boost::asio::consign;
3030
using boost::asio::detached;
3131
using boost::asio::dynamic_buffer;
3232
using boost::asio::redirect_error;
33-
using boost::asio::use_awaitable;
3433
using boost::redis::config;
3534
using boost::redis::connection;
36-
using boost::redis::generic_flat_response;
37-
using boost::redis::ignore;
35+
using boost::redis::resp3::flat_tree;
3836
using boost::redis::request;
3937
using boost::system::error_code;
4038
using namespace std::chrono_literals;
@@ -47,21 +45,25 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
4745
request req;
4846
req.push("SUBSCRIBE", "channel");
4947

50-
generic_flat_response resp;
48+
flat_tree resp;
5149
conn->set_receive_response(resp);
5250

5351
while (conn->will_reconnect()) {
5452
// Subscribe to channels.
55-
co_await conn->async_exec(req, ignore);
53+
co_await conn->async_exec(req);
5654

5755
// Loop reading Redis push messages.
5856
for (error_code ec;;) {
59-
co_await conn->async_receive(redirect_error(use_awaitable, ec));
57+
co_await conn->async_receive2(redirect_error(ec));
6058
if (ec)
6159
break; // Connection lost, break so we can reconnect to channels.
62-
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
63-
<< resp.value().at(3).value << std::endl;
64-
resp.value().clear();
60+
61+
for (auto const& elem: resp.get_view())
62+
std::cout << elem.value << "\n";
63+
64+
std::cout << std::endl;
65+
66+
resp.clear();
6567
}
6668
}
6769
}
@@ -74,7 +76,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
7476
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
7577
request req;
7678
req.push("PUBLISH", "channel", msg);
77-
co_await conn->async_exec(req, ignore);
79+
co_await conn->async_exec(req);
7880
msg.erase(0, n);
7981
}
8082
}

example/cpp20_streams.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
namespace net = boost::asio;
2525
using boost::redis::config;
26-
using boost::redis::generic_flat_response;
26+
using boost::redis::generic_response;
2727
using boost::redis::operation;
2828
using boost::redis::request;
2929
using boost::redis::connection;
@@ -33,7 +33,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
3333
{
3434
std::string redisStreamKey_;
3535
request req;
36-
generic_flat_response resp;
36+
generic_response resp;
3737

3838
std::string stream_id{"$"};
3939
std::string const field = "myfield";
@@ -51,7 +51,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
5151
// The following approach was taken in order to be able to
5252
// deal with the responses, as generated by redis in the case
5353
// that there are multiple stream 'records' within a single
54-
// generic_flat_response. The nesting and number of values in
54+
// generic_response. The nesting and number of values in
5555
// resp.value() are different, depending on the contents
5656
// of the stream in redis. Uncomment the above commented-out
5757
// code for examples while running the XADD command.

example/cpp20_subscriber.cpp

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
22
*
33
* Distributed under the Boost Software License, Version 1.0. (See
44
* accompanying file LICENSE.txt)
55
*/
66

77
#include <boost/redis/connection.hpp>
8-
#include <boost/redis/logger.hpp>
98

109
#include <boost/asio/awaitable.hpp>
1110
#include <boost/asio/co_spawn.hpp>
@@ -22,12 +21,8 @@
2221
namespace asio = boost::asio;
2322
using namespace std::chrono_literals;
2423
using boost::redis::request;
25-
using boost::redis::generic_flat_response;
26-
using boost::redis::consume_one;
27-
using boost::redis::logger;
24+
using boost::redis::resp3::flat_tree;
2825
using boost::redis::config;
29-
using boost::redis::ignore;
30-
using boost::redis::error;
3126
using boost::system::error_code;
3227
using boost::redis::connection;
3328
using asio::signal_set;
@@ -54,30 +49,29 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
5449
request req;
5550
req.push("SUBSCRIBE", "channel");
5651

57-
generic_flat_response resp;
52+
flat_tree resp;
5853
conn->set_receive_response(resp);
5954

6055
// Loop while reconnection is enabled
6156
while (conn->will_reconnect()) {
6257
// Reconnect to the channels.
63-
co_await conn->async_exec(req, ignore);
58+
co_await conn->async_exec(req);
6459

65-
// Loop reading Redis pushs messages.
60+
// Loop to read Redis push messages.
6661
for (error_code ec;;) {
67-
// First tries to read any buffered pushes.
68-
conn->receive(ec);
69-
if (ec == error::sync_receive_push_failed) {
70-
ec = {};
71-
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
72-
}
73-
62+
// Wait for pushes
63+
co_await conn->async_receive2(asio::redirect_error(ec));
7464
if (ec)
7565
break; // Connection lost, break so we can reconnect to channels.
7666

77-
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
78-
<< resp.value().at(3).value << std::endl;
67+
// The response must be consumed without suspending the
68+
// coroutine i.e. without the use of async operations.
69+
for (auto const& elem: resp.get_view())
70+
std::cout << elem.value << "\n";
71+
72+
std::cout << std::endl;
7973

80-
consume_one(resp);
74+
resp.clear();
8175
}
8276
}
8377
}

include/boost/redis/adapter/any_adapter.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class any_adapter {
5353
static auto create_impl(T& resp) -> impl_t
5454
{
5555
using namespace boost::redis::adapter;
56-
5756
return [adapter2 = boost_redis_adapt(resp)](
5857
any_adapter::parse_event ev,
5958
resp3::node_view const& nd,

include/boost/redis/adapter/detail/adapters.hpp

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
22
*
33
* Distributed under the Boost Software License, Version 1.0. (See
44
* accompanying file LICENSE.txt)
@@ -12,15 +12,14 @@
1212
#include <boost/redis/resp3/node.hpp>
1313
#include <boost/redis/resp3/serialization.hpp>
1414
#include <boost/redis/resp3/type.hpp>
15-
#include <boost/redis/response.hpp>
15+
#include <boost/redis/resp3/flat_tree.hpp>
1616

1717
#include <boost/assert.hpp>
1818

1919
#include <array>
2020
#include <charconv>
2121
#include <deque>
2222
#include <forward_list>
23-
#include <iostream>
2423
#include <list>
2524
#include <map>
2625
#include <optional>
@@ -138,6 +137,8 @@ void boost_redis_from_bulk(T& t, resp3::basic_node<String> const& node, system::
138137
from_bulk_impl<T>::apply(t, node, ec);
139138
}
140139

140+
//================================================
141+
141142
template <class Result>
142143
class general_aggregate {
143144
private:
@@ -177,37 +178,54 @@ class general_aggregate {
177178
};
178179

179180
template <>
180-
class general_aggregate<result<flat_response_value>> {
181+
class general_aggregate<resp3::tree> {
181182
private:
182-
result<flat_response_value>* result_;
183+
resp3::tree* tree_ = nullptr;
183184

184185
public:
185-
explicit general_aggregate(result<flat_response_value>* c = nullptr)
186-
: result_(c)
186+
explicit general_aggregate(resp3::tree* c = nullptr)
187+
: tree_(c)
188+
{ }
189+
190+
void on_init() { }
191+
void on_done() { }
192+
193+
template <class String>
194+
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
195+
{
196+
BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer");
197+
198+
resp3::node tmp;
199+
tmp.data_type = nd.data_type;
200+
tmp.aggregate_size = nd.aggregate_size;
201+
tmp.depth = nd.depth;
202+
tmp.value = std::string{std::cbegin(nd.value), std::cend(nd.value)};
203+
204+
tree_->push_back(std::move(tmp));
205+
}
206+
};
207+
208+
template <>
209+
class general_aggregate<resp3::flat_tree> {
210+
private:
211+
resp3::flat_tree* tree_ = nullptr;
212+
213+
public:
214+
explicit general_aggregate(resp3::flat_tree* c = nullptr)
215+
: tree_(c)
187216
{ }
188217

189218
void on_init() { }
190219
void on_done()
191220
{
192-
if (result_->has_value()) {
193-
result_->value().set_view();
194-
}
221+
tree_->notify_done();
195222
}
196223

197224
template <class String>
198-
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
225+
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
199226
{
200-
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
201-
switch (nd.data_type) {
202-
case resp3::type::blob_error:
203-
case resp3::type::simple_error:
204-
*result_ = error{
205-
nd.data_type,
206-
std::string{std::cbegin(nd.value), std::cend(nd.value)}
207-
};
208-
break;
209-
default: result_->value().add_node(nd);
210-
}
227+
BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer");
228+
tree_->push(nd);
211229
}
212230
};
213231

0 commit comments

Comments
 (0)