From a6f325bf6b870eb1ec05adca717ca2ddaf14ae5d Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Wed, 22 May 2024 14:32:07 -0500 Subject: [PATCH] Add a "tag" parameter to ConnectionId that can be used to differentiate two different subscribers within a single app, so that they both see all messages on the channel. --- include/iomanager/IOManager.hpp | 7 ++++++ include/iomanager/SchemaUtils.hpp | 12 ++++++---- include/iomanager/detail/IOManager.hxx | 27 ++++++++++++++++++++++ schema/iomanager/connection.jsonnet | 1 + unittest/IOManager_test.cxx | 32 ++++++++++++++++++++++++++ 5 files changed, 75 insertions(+), 4 deletions(-) diff --git a/include/iomanager/IOManager.hpp b/include/iomanager/IOManager.hpp index f0f6d87..32bf5c1 100644 --- a/include/iomanager/IOManager.hpp +++ b/include/iomanager/IOManager.hpp @@ -65,6 +65,8 @@ class IOManager template std::shared_ptr> get_receiver(std::string const& uid); + template + std::shared_ptr> get_receiver(std::string const& uid, std::string const& tag); template void add_callback(ConnectionId const& id, std::function callback); @@ -72,11 +74,16 @@ class IOManager template void add_callback(std::string const& uid, std::function callback); + template + void add_callback(std::string const& uid, std::string const& tag, std::function callback); + template void remove_callback(ConnectionId const& id); template void remove_callback(std::string const& uid); + template + void remove_callback(std::string const& uid, std::string const& tag); std::set get_datatypes(std::string const& uid); diff --git a/include/iomanager/SchemaUtils.hpp b/include/iomanager/SchemaUtils.hpp index bd12ddb..cbb3d6d 100644 --- a/include/iomanager/SchemaUtils.hpp +++ b/include/iomanager/SchemaUtils.hpp @@ -46,6 +46,9 @@ operator<(ConnectionId const& l, ConnectionId const& r) { if (l.session == r.session || l.session == "" || r.session == "") { if (l.data_type == r.data_type) { + if (l.uid == r.uid) { + return l.tag < r.tag; + } return l.uid < r.uid; } return l.data_type < r.data_type; @@ -55,7 +58,7 @@ operator<(ConnectionId const& l, ConnectionId const& r) inline bool operator==(ConnectionId const& l, ConnectionId const& r) { - return (l.session == "" || r.session == "" || l.session == r.session) && l.uid == r.uid && + return (l.session == "" || r.session == "" || l.session == r.session) && l.uid == r.uid && l.tag == r.tag && l.data_type == r.data_type; } @@ -76,9 +79,10 @@ inline std::string to_string(const ConnectionId& conn_id) { if (conn_id.session != "") { - return conn_id.session + "/" + conn_id.uid + "@@" + conn_id.data_type; + return conn_id.session + "/" + conn_id.uid + (conn_id.tag != "" ? "+" + conn_id.tag : "") + "@@" + + conn_id.data_type; } - return conn_id.uid + "@@" + conn_id.data_type; + return conn_id.uid + (conn_id.tag != "" ? "+" + conn_id.tag : "") + "@@" + conn_id.data_type; } } // namespace connection @@ -96,7 +100,7 @@ struct hash { std::size_t operator()(const dunedaq::iomanager::connection::ConnectionId& conn_id) const { - return std::hash()(conn_id.session + conn_id.uid + conn_id.data_type); + return std::hash()(conn_id.session + conn_id.uid + conn_id.tag + conn_id.data_type); } }; diff --git a/include/iomanager/detail/IOManager.hxx b/include/iomanager/detail/IOManager.hxx index 493a06d..05abc10 100644 --- a/include/iomanager/detail/IOManager.hxx +++ b/include/iomanager/detail/IOManager.hxx @@ -40,6 +40,18 @@ IOManager::get_receiver(std::string const& uid) id.session = m_session; return get_receiver(id); } +template +inline std::shared_ptr> +IOManager::get_receiver(std::string const& uid, std::string const& tag) +{ + auto data_type = datatype_to_string(); + ConnectionId id; + id.uid = uid; + id.data_type = data_type; + id.session = m_session; + id.tag = tag; + return get_receiver(id); +} template inline std::shared_ptr> @@ -117,6 +129,14 @@ IOManager::add_callback(std::string const& uid, std::function c receiver->add_callback(callback); } +template +inline void +IOManager::add_callback(std::string const& uid, std::string const& tag, std::function callback) +{ + auto receiver = get_receiver(uid, tag); + receiver->add_callback(callback); +} + template inline void IOManager::remove_callback(ConnectionId const& id) @@ -132,6 +152,13 @@ IOManager::remove_callback(std::string const& uid) auto receiver = get_receiver(uid); receiver->remove_callback(); } +template +inline void +IOManager::remove_callback(std::string const& uid, std::string const& tag) +{ + auto receiver = get_receiver(uid, tag); + receiver->remove_callback(); +} } // namespace iomanager diff --git a/schema/iomanager/connection.jsonnet b/schema/iomanager/connection.jsonnet index 99e5e8d..743d69b 100755 --- a/schema/iomanager/connection.jsonnet +++ b/schema/iomanager/connection.jsonnet @@ -19,6 +19,7 @@ local c = { ConnectionId: s.record("ConnectionId",[ s.field("uid", self.uid, doc="Identifier for the Connection instance"), s.field("data_type", self.datatype, doc="Name of the expected data type"), + s.field("tag", self.uid, default="", doc="Tag to identify this connection endpoint (used for subscribers)"), s.field("session", self.uid, default="", doc="Name of the DAQ session this Connection lives in") ]), diff --git a/unittest/IOManager_test.cxx b/unittest/IOManager_test.cxx index dddbedb..1b2287b 100755 --- a/unittest/IOManager_test.cxx +++ b/unittest/IOManager_test.cxx @@ -181,6 +181,7 @@ struct ConfigurationTestFixture pub2_id = ConnectionId{ "pub2", "data2_t" }; pub3_id = ConnectionId{ "pub3", "data3_t" }; sub1_id = ConnectionId{ "pub.*", "data2_t" }; + sub1b_id = ConnectionId{ "pub.*", "data2_t", "b" }; sub2_id = ConnectionId{ "pub2", "data2_t" }; sub3_id = ConnectionId{ "pub.*", "data3_t" }; @@ -211,6 +212,7 @@ struct ConfigurationTestFixture ConnectionId pub2_id; ConnectionId pub3_id; ConnectionId sub1_id; + ConnectionId sub1b_id; ConnectionId sub2_id; ConnectionId sub3_id; }; @@ -312,6 +314,36 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture) BOOST_CHECK_EQUAL(ret3.d1, 58); } +BOOST_FIXTURE_TEST_CASE(MultipleReceiverPubSub, ConfigurationTestFixture) +{ + auto pub1_sender = IOManager::get()->get_sender(pub1_id); + auto sub1a_receiver = IOManager::get()->get_receiver(sub1_id); + auto sub1b_receiver = IOManager::get()->get_receiver(sub1b_id); + + // Sub1 is subscribed to all data_t publishers, two instances should both get all messages + Data2 sent_t1(56, 26.5); + pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block); + + auto ret1a = sub1a_receiver->receive(std::chrono::milliseconds(10)); + auto ret1b = sub1b_receiver->receive(std::chrono::milliseconds(10)); + + BOOST_CHECK_EQUAL(ret1a.d1, 56); + BOOST_CHECK_EQUAL(ret1a.d2, 26.5); + BOOST_CHECK_EQUAL(ret1b.d1, 56); + BOOST_CHECK_EQUAL(ret1b.d2, 26.5); + + Data2 sent_t2(57, 27.5); + pub1_sender->send(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block); + + ret1a = sub1a_receiver->receive(std::chrono::milliseconds(10)); + ret1b = sub1b_receiver->receive(std::chrono::milliseconds(10)); + + BOOST_CHECK_EQUAL(ret1a.d1, 57); + BOOST_CHECK_EQUAL(ret1a.d2, 27.5); + BOOST_CHECK_EQUAL(ret1b.d1, 57); + BOOST_CHECK_EQUAL(ret1b.d2, 27.5); +} + BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture) { auto pub1_sender = IOManager::get()->get_sender(pub1_id);