Skip to content

Commit c89b5c8

Browse files
committed
ADD: Add C++ resubscribe
1 parent a772525 commit c89b5c8

9 files changed

+128
-19
lines changed

CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
## 0.30.0 - TBD
4+
5+
### Enhancements
6+
- Added `Resubscribe()` methods to `LiveBlocking` and `LiveThreaded` to make it easier
7+
to resume a live session after losing the connection to the live gateway
8+
- Added `Subscriptions()` getter methods to `LiveBlocking` and `LiveThreaded` for
9+
getting all active subscriptions
10+
311
## 0.29.0 - 2025-02-04
412

513
### Enhancements

cmake/SourcesAndHeaders.cmake

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(headers
2323
include/databento/ireadable.hpp
2424
include/databento/live.hpp
2525
include/databento/live_blocking.hpp
26+
include/databento/live_subscription.hpp
2627
include/databento/live_threaded.hpp
2728
include/databento/log.hpp
2829
include/databento/metadata.hpp

include/databento/live_blocking.hpp

+10-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
#include "databento/datetime.hpp" // UnixNanos
1111
#include "databento/dbn.hpp" // Metadata
1212
#include "databento/detail/tcp_client.hpp" // TcpClient
13-
#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy
13+
#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy
14+
#include "databento/live_subscription.hpp"
1415
#include "databento/record.hpp" // Record, RecordHeader
1516

1617
namespace databento {
@@ -44,6 +45,10 @@ class LiveBlocking {
4445
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const {
4546
return {heartbeat_interval_.count() > 0, heartbeat_interval_};
4647
}
48+
const std::vector<LiveSubscription>& Subscriptions() const {
49+
return subscriptions_;
50+
}
51+
std::vector<LiveSubscription>& Subscriptions() { return subscriptions_; }
4752

4853
/*
4954
* Methods
@@ -80,6 +85,9 @@ class LiveBlocking {
8085
void Stop();
8186
// Closes the current connection and attempts to reconnect to the gateway.
8287
void Reconnect();
88+
// Resubscribes to all subscriptions, removing the original `start` time, if
89+
// any. Usually performed after a `Reconnect()`.
90+
void Resubscribe();
8391

8492
private:
8593
std::string DetermineGateway() const;
@@ -105,6 +113,7 @@ class LiveBlocking {
105113
VersionUpgradePolicy upgrade_policy_;
106114
std::chrono::seconds heartbeat_interval_;
107115
detail::TcpClient client_;
116+
std::vector<LiveSubscription> subscriptions_;
108117
// Must be 8-byte aligned for records
109118
alignas(RecordHeader) std::array<char, kMaxStrLen> read_buffer_{};
110119
std::size_t buffer_size_{};
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <variant>
5+
#include <vector>
6+
7+
#include "databento/datetime.hpp" // UnixNanos
8+
#include "databento/enums.hpp" // Schema, SType
9+
10+
namespace databento {
11+
struct LiveSubscription {
12+
struct Snapshot {};
13+
struct NoStart {};
14+
using Start = std::variant<Snapshot, UnixNanos, std::string, NoStart>;
15+
16+
std::vector<std::string> symbols;
17+
Schema schema;
18+
SType stype_in;
19+
Start start;
20+
};
21+
} // namespace databento

include/databento/live_threaded.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "databento/datetime.hpp" // UnixNanos
1111
#include "databento/detail/scoped_thread.hpp" // ScopedThread
1212
#include "databento/enums.hpp" // Schema, SType
13+
#include "databento/live_subscription.hpp"
1314
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
1415

1516
namespace databento {
@@ -57,6 +58,8 @@ class LiveThreaded {
5758
// The the first member of the pair will be true, when the heartbeat interval
5859
// was overridden.
5960
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const;
61+
const std::vector<LiveSubscription>& Subscriptions() const;
62+
std::vector<LiveSubscription>& Subscriptions();
6063

6164
/*
6265
* Methods
@@ -86,6 +89,7 @@ class LiveThreaded {
8689
ExceptionCallback exception_callback);
8790
// Closes the current connection, and attempts to reconnect to the gateway.
8891
void Reconnect();
92+
void Resubscribe();
8993
// Blocking wait with an optional timeout for the session to close when the
9094
// record_callback or the exception_callback return Stop.
9195
void BlockForStop();

src/live_blocking.cpp

+29-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
#include <chrono>
88
#include <cstddef> // ptrdiff_t
99
#include <cstdlib>
10-
#include <ios> //hex, setfill, setw
10+
#include <ios> // hex, setfill, setw
1111
#include <sstream>
12+
#include <variant>
1213

1314
#include "databento/constants.hpp" // kApiKeyLength
1415
#include "databento/dbn_decoder.hpp"
@@ -69,6 +70,8 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
6970
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
7071
<< "|start=" << start.time_since_epoch().count();
7172
Subscribe(sub_msg.str(), symbols, false);
73+
subscriptions_.emplace_back(
74+
LiveSubscription{symbols, schema, stype_in, start});
7275
}
7376

7477
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
@@ -81,6 +84,13 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
8184
sub_msg << "|start=" << start;
8285
}
8386
Subscribe(sub_msg.str(), symbols, false);
87+
if (start.empty()) {
88+
subscriptions_.emplace_back(LiveSubscription{symbols, schema, stype_in,
89+
LiveSubscription::NoStart{}});
90+
} else {
91+
subscriptions_.emplace_back(
92+
LiveSubscription{symbols, schema, stype_in, start});
93+
}
8494
}
8595

8696
void LiveBlocking::SubscribeWithSnapshot(
@@ -90,6 +100,8 @@ void LiveBlocking::SubscribeWithSnapshot(
90100
<< "|stype_in=" << ToString(stype_in);
91101

92102
Subscribe(sub_msg.str(), symbols, true);
103+
subscriptions_.emplace_back(LiveSubscription{symbols, schema, stype_in,
104+
LiveSubscription::Snapshot{}});
93105
}
94106

95107
void LiveBlocking::Subscribe(const std::string& sub_msg,
@@ -168,10 +180,26 @@ const databento::Record* LiveBlocking::NextRecord(
168180
void LiveBlocking::Stop() { client_.Close(); }
169181

170182
void LiveBlocking::Reconnect() {
183+
log_receiver_->Receive(LogLevel::Info, "Reconnecting");
171184
client_ = detail::TcpClient{gateway_, port_};
172185
session_id_ = this->Authenticate();
173186
}
174187

188+
void LiveBlocking::Resubscribe() {
189+
for (auto& subscription : subscriptions_) {
190+
if (std::holds_alternative<UnixNanos>(subscription.start) ||
191+
std::holds_alternative<std::string>(subscription.start)) {
192+
subscription.start = LiveSubscription::NoStart{};
193+
}
194+
std::ostringstream sub_msg;
195+
sub_msg << "schema=" << ToString(subscription.schema)
196+
<< "|stype_in=" << ToString(subscription.stype_in);
197+
Subscribe(
198+
sub_msg.str(), subscription.symbols,
199+
std::holds_alternative<LiveSubscription::Snapshot>(subscription.start));
200+
}
201+
}
202+
175203
std::string LiveBlocking::DecodeChallenge() {
176204
buffer_size_ =
177205
client_.ReadSome(read_buffer_.data(), read_buffer_.size()).read_size;

src/live_threaded.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ std::pair<bool, std::chrono::seconds> LiveThreaded::HeartbeatInterval() const {
9494
return impl_->blocking.HeartbeatInterval();
9595
}
9696

97+
const std::vector<databento::LiveSubscription>& LiveThreaded::Subscriptions()
98+
const {
99+
return impl_->blocking.Subscriptions();
100+
}
101+
102+
std::vector<databento::LiveSubscription>& LiveThreaded::Subscriptions() {
103+
return impl_->blocking.Subscriptions();
104+
}
105+
97106
void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
98107
Schema schema, SType stype_in) {
99108
impl_->blocking.Subscribe(symbols, schema, stype_in);
@@ -144,6 +153,8 @@ void LiveThreaded::Start(MetadataCallback metadata_callback,
144153

145154
void LiveThreaded::Reconnect() { impl_->blocking.Reconnect(); }
146155

156+
void LiveThreaded::Resubscribe() { impl_->blocking.Resubscribe(); }
157+
147158
void LiveThreaded::BlockForStop() {
148159
std::unique_lock<std::mutex> lock{impl_->last_cb_ret_mutex};
149160
auto* impl = impl_.get();

tests/src/live_blocking_tests.cpp

+21-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <mutex> // lock_guard, mutex, unique_lock
99
#include <thread> // this_thread
1010
#include <utility>
11+
#include <variant>
1112
#include <vector>
1213

1314
#include "databento/constants.hpp" // dataset
@@ -16,6 +17,7 @@
1617
#include "databento/exceptions.hpp"
1718
#include "databento/live.hpp"
1819
#include "databento/live_blocking.hpp"
20+
#include "databento/live_subscription.hpp"
1921
#include "databento/log.hpp"
2022
#include "databento/record.hpp"
2123
#include "databento/symbology.hpp"
@@ -473,7 +475,7 @@ TEST_F(LiveBlockingTests, TestConnectWhenGatewayNotUp) {
473475
ASSERT_THROW(builder_.BuildBlocking(), databento::TcpError);
474476
}
475477

476-
TEST_F(LiveBlockingTests, TestReconnect) {
478+
TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) {
477479
constexpr auto kTsOut = false;
478480
constexpr TradeMsg kRec{DummyHeader<TradeMsg>(RType::Mbp0),
479481
1,
@@ -498,6 +500,9 @@ TEST_F(LiveBlockingTests, TestReconnect) {
498500
&should_close_cv, &should_close_mutex](mock::MockLsgServer& self) {
499501
self.Accept();
500502
self.Authenticate();
503+
self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, "0");
504+
self.Start();
505+
self.SendRecord(kRec);
501506
{
502507
std::unique_lock<std::mutex> lock{should_close_mutex};
503508
should_close_cv.wait(lock, [&should_close] { return should_close; });
@@ -520,6 +525,14 @@ TEST_F(LiveBlockingTests, TestReconnect) {
520525
.SetSendTsOut(kTsOut)
521526
.SetAddress(kLocalhost, mock_server->Port())
522527
.BuildBlocking();
528+
ASSERT_TRUE(target.Subscriptions().empty());
529+
target.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, "0");
530+
ASSERT_EQ(target.Subscriptions().size(), 1);
531+
target.Start();
532+
const auto rec1 = target.NextRecord();
533+
ASSERT_TRUE(rec1.Holds<TradeMsg>());
534+
ASSERT_EQ(rec1.Get<TradeMsg>(), kRec);
535+
ASSERT_EQ(target.Subscriptions().size(), 1);
523536

524537
// Tell server to close connection
525538
{
@@ -534,11 +547,14 @@ TEST_F(LiveBlockingTests, TestReconnect) {
534547
}
535548
ASSERT_THROW(target.NextRecord(), databento::DbnResponseError);
536549
target.Reconnect();
537-
target.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol);
550+
target.Resubscribe();
551+
ASSERT_EQ(target.Subscriptions().size(), 1);
552+
ASSERT_TRUE(std::holds_alternative<LiveSubscription::NoStart>(
553+
target.Subscriptions()[0].start));
538554
const auto metadata = target.Start();
539555
EXPECT_TRUE(metadata.has_mixed_schema);
540-
const auto rec = target.NextRecord();
541-
ASSERT_TRUE(rec.Holds<TradeMsg>());
542-
ASSERT_EQ(rec.Get<TradeMsg>(), kRec);
556+
const auto rec2 = target.NextRecord();
557+
ASSERT_TRUE(rec2.Holds<TradeMsg>());
558+
ASSERT_EQ(rec2.Get<TradeMsg>(), kRec);
543559
}
544560
} // namespace databento::tests

tests/src/live_threaded_tests.cpp

+23-12
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
#include <iostream>
88
#include <memory>
99
#include <thread> // this_thread
10+
#include <variant>
1011

1112
#include "databento/constants.hpp"
1213
#include "databento/datetime.hpp"
1314
#include "databento/dbn.hpp"
1415
#include "databento/enums.hpp"
1516
#include "databento/exceptions.hpp"
1617
#include "databento/live.hpp"
18+
#include "databento/live_subscription.hpp"
1719
#include "databento/live_threaded.hpp"
1820
#include "databento/log.hpp"
1921
#include "databento/record.hpp"
@@ -173,7 +175,7 @@ TEST_F(LiveThreadedTests, TestStop) {
173175
mock_server.reset();
174176
}
175177

176-
TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
178+
TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) {
177179
constexpr auto kSchema = Schema::Trades;
178180
constexpr auto kSType = SType::RawSymbol;
179181
constexpr TradeMsg kRec{DummyHeader<TradeMsg>(RType::Mbp0),
@@ -197,8 +199,9 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
197199
kSType, kUseSnapshot](mock::MockLsgServer& self) {
198200
self.Accept();
199201
self.Authenticate();
200-
self.SubscribeWithSnapshot(kAllSymbols, kSchema, kSType);
202+
self.Subscribe(kAllSymbols, kSchema, kSType, "0");
201203
self.Start();
204+
self.SendRecord(kRec);
202205
{
203206
std::unique_lock<std::mutex> shutdown_lock{should_close_mutex};
204207
should_close_cv.wait(shutdown_lock,
@@ -216,19 +219,22 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
216219
.SetAddress(kLocalhost, mock_server.Port())
217220
.BuildThreaded();
218221
std::atomic<std::int32_t> metadata_calls{};
219-
const auto metadata_cb = [&metadata_calls, &should_close, &should_close_cv,
220-
&should_close_mutex](Metadata&& metadata) {
222+
const auto metadata_cb = [&metadata_calls](Metadata&& metadata) {
221223
++metadata_calls;
222224
EXPECT_TRUE(metadata.has_mixed_schema);
223-
// close server
224-
const std::lock_guard<std::mutex> _lock{should_close_mutex};
225-
should_close = true;
226-
should_close_cv.notify_one();
227225
};
228226
std::atomic<std::int32_t> record_calls{};
229-
const auto record_cb = [&record_calls, kRec](const Record& record) {
227+
const auto record_cb = [&record_calls, kRec, &should_close_mutex,
228+
&should_close,
229+
&should_close_cv](const Record& record) {
230230
++record_calls;
231231
EXPECT_EQ(record.Get<TradeMsg>(), kRec);
232+
if (record_calls == 1) { // close server
233+
const std::lock_guard<std::mutex> _lock{should_close_mutex};
234+
should_close = true;
235+
should_close_cv.notify_one();
236+
return KeepGoing::Continue;
237+
}
232238
return KeepGoing::Stop;
233239
};
234240
std::atomic<std::int32_t> exception_calls{};
@@ -239,20 +245,25 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
239245
EXPECT_NE(dynamic_cast<const databento::DbnResponseError*>(&exc), nullptr)
240246
<< "Unexpected exception type";
241247
target.Reconnect();
242-
target.Subscribe(kAllSymbols, kSchema, kSType);
248+
target.Resubscribe();
249+
EXPECT_EQ(target.Subscriptions().size(), 1);
250+
EXPECT_TRUE(std::holds_alternative<LiveSubscription::NoStart>(
251+
target.Subscriptions()[0].start));
243252
return LiveThreaded::ExceptionAction::Restart;
244253
} else {
245254
GTEST_NONFATAL_FAILURE_("Exception callback called more than expected");
246255
return LiveThreaded::ExceptionAction::Stop;
247256
}
248257
};
249258

250-
target.SubscribeWithSnapshot(kAllSymbols, kSchema, kSType);
259+
ASSERT_TRUE(target.Subscriptions().empty());
260+
target.Subscribe(kAllSymbols, kSchema, kSType, "0");
261+
ASSERT_EQ(target.Subscriptions().size(), 1);
251262
target.Start(metadata_cb, record_cb, exception_cb);
252263
target.BlockForStop();
253264
EXPECT_EQ(metadata_calls, 2);
254265
EXPECT_EQ(exception_calls, 1);
255-
EXPECT_EQ(record_calls, 1);
266+
EXPECT_EQ(record_calls, 2);
256267
}
257268

258269
TEST_F(LiveThreadedTests, TestDeadlockPrevention) {

0 commit comments

Comments
 (0)