Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a "tag" parameter to ConnectionId that can be used to differentiate #79

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/iomanager/IOManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,25 @@ class IOManager

template<typename Datatype>
std::shared_ptr<ReceiverConcept<Datatype>> get_receiver(std::string const& uid);
template<typename Datatype>
std::shared_ptr<ReceiverConcept<Datatype>> get_receiver(std::string const& uid, std::string const& tag);

template<typename Datatype>
void add_callback(ConnectionId const& id, std::function<void(Datatype&)> callback);

template<typename Datatype>
void add_callback(std::string const& uid, std::function<void(Datatype&)> callback);

template<typename Datatype>
void add_callback(std::string const& uid, std::string const& tag, std::function<void(Datatype&)> callback);

template<typename Datatype>
void remove_callback(ConnectionId const& id);

template<typename Datatype>
void remove_callback(std::string const& uid);
template<typename Datatype>
void remove_callback(std::string const& uid, std::string const& tag);

std::set<std::string> get_datatypes(std::string const& uid);

Expand Down
12 changes: 8 additions & 4 deletions include/iomanager/SchemaUtils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ operator<(ConnectionId const& l, ConnectionId const& r)
{
if (l.session == r.session || l.session == "" || r.session == "") {
if (l.data_type == r.data_type) {
if (l.uid == r.uid) {
return l.tag < r.tag;
}
return l.uid < r.uid;
}
return l.data_type < r.data_type;
Expand All @@ -55,7 +58,7 @@ operator<(ConnectionId const& l, ConnectionId const& r)
inline bool
operator==(ConnectionId const& l, ConnectionId const& r)
{
return (l.session == "" || r.session == "" || l.session == r.session) && l.uid == r.uid &&
return (l.session == "" || r.session == "" || l.session == r.session) && l.uid == r.uid && l.tag == r.tag &&
l.data_type == r.data_type;
}

Expand All @@ -76,9 +79,10 @@ inline std::string
to_string(const ConnectionId& conn_id)
{
if (conn_id.session != "") {
return conn_id.session + "/" + conn_id.uid + "@@" + conn_id.data_type;
return conn_id.session + "/" + conn_id.uid + (conn_id.tag != "" ? "+" + conn_id.tag : "") + "@@" +
conn_id.data_type;
}
return conn_id.uid + "@@" + conn_id.data_type;
return conn_id.uid + (conn_id.tag != "" ? "+" + conn_id.tag : "") + "@@" + conn_id.data_type;
}

} // namespace connection
Expand All @@ -96,7 +100,7 @@ struct hash<dunedaq::iomanager::connection::ConnectionId>
{
std::size_t operator()(const dunedaq::iomanager::connection::ConnectionId& conn_id) const
{
return std::hash<std::string>()(conn_id.session + conn_id.uid + conn_id.data_type);
return std::hash<std::string>()(conn_id.session + conn_id.uid + conn_id.tag + conn_id.data_type);
}
};

Expand Down
27 changes: 27 additions & 0 deletions include/iomanager/detail/IOManager.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ IOManager::get_receiver(std::string const& uid)
id.session = m_session;
return get_receiver<Datatype>(id);
}
template<typename Datatype>
inline std::shared_ptr<ReceiverConcept<Datatype>>
IOManager::get_receiver(std::string const& uid, std::string const& tag)
{
auto data_type = datatype_to_string<Datatype>();
ConnectionId id;
id.uid = uid;
id.data_type = data_type;
id.session = m_session;
id.tag = tag;
return get_receiver<Datatype>(id);
}

template<typename Datatype>
inline std::shared_ptr<ReceiverConcept<Datatype>>
Expand Down Expand Up @@ -117,6 +129,14 @@ IOManager::add_callback(std::string const& uid, std::function<void(Datatype&)> c
receiver->add_callback(callback);
}

template<typename Datatype>
inline void
IOManager::add_callback(std::string const& uid, std::string const& tag, std::function<void(Datatype&)> callback)
{
auto receiver = get_receiver<Datatype>(uid, tag);
receiver->add_callback(callback);
}

template<typename Datatype>
inline void
IOManager::remove_callback(ConnectionId const& id)
Expand All @@ -132,6 +152,13 @@ IOManager::remove_callback(std::string const& uid)
auto receiver = get_receiver<Datatype>(uid);
receiver->remove_callback();
}
template<typename Datatype>
inline void
IOManager::remove_callback(std::string const& uid, std::string const& tag)
{
auto receiver = get_receiver<Datatype>(uid, tag);
receiver->remove_callback();
}

} // namespace iomanager

Expand Down
1 change: 1 addition & 0 deletions schema/iomanager/connection.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ local c = {
ConnectionId: s.record("ConnectionId",[
s.field("uid", self.uid, doc="Identifier for the Connection instance"),
s.field("data_type", self.datatype, doc="Name of the expected data type"),
s.field("tag", self.uid, default="", doc="Tag to identify this connection endpoint (used for subscribers)"),
s.field("session", self.uid, default="", doc="Name of the DAQ session this Connection lives in")
]),

Expand Down
32 changes: 32 additions & 0 deletions unittest/IOManager_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ struct ConfigurationTestFixture
pub2_id = ConnectionId{ "pub2", "data2_t" };
pub3_id = ConnectionId{ "pub3", "data3_t" };
sub1_id = ConnectionId{ "pub.*", "data2_t" };
sub1b_id = ConnectionId{ "pub.*", "data2_t", "b" };
sub2_id = ConnectionId{ "pub2", "data2_t" };
sub3_id = ConnectionId{ "pub.*", "data3_t" };

Expand Down Expand Up @@ -211,6 +212,7 @@ struct ConfigurationTestFixture
ConnectionId pub2_id;
ConnectionId pub3_id;
ConnectionId sub1_id;
ConnectionId sub1b_id;
ConnectionId sub2_id;
ConnectionId sub3_id;
};
Expand Down Expand Up @@ -312,6 +314,36 @@ BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture)
BOOST_CHECK_EQUAL(ret3.d1, 58);
}

BOOST_FIXTURE_TEST_CASE(MultipleReceiverPubSub, ConfigurationTestFixture)
{
auto pub1_sender = IOManager::get()->get_sender<Data2>(pub1_id);
auto sub1a_receiver = IOManager::get()->get_receiver<Data2>(sub1_id);
auto sub1b_receiver = IOManager::get()->get_receiver<Data2>(sub1b_id);

// Sub1 is subscribed to all data_t publishers, two instances should both get all messages
Data2 sent_t1(56, 26.5);
pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);

auto ret1a = sub1a_receiver->receive(std::chrono::milliseconds(10));
auto ret1b = sub1b_receiver->receive(std::chrono::milliseconds(10));

BOOST_CHECK_EQUAL(ret1a.d1, 56);
BOOST_CHECK_EQUAL(ret1a.d2, 26.5);
BOOST_CHECK_EQUAL(ret1b.d1, 56);
BOOST_CHECK_EQUAL(ret1b.d2, 26.5);

Data2 sent_t2(57, 27.5);
pub1_sender->send(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block);

ret1a = sub1a_receiver->receive(std::chrono::milliseconds(10));
ret1b = sub1b_receiver->receive(std::chrono::milliseconds(10));

BOOST_CHECK_EQUAL(ret1a.d1, 57);
BOOST_CHECK_EQUAL(ret1a.d2, 27.5);
BOOST_CHECK_EQUAL(ret1b.d1, 57);
BOOST_CHECK_EQUAL(ret1b.d2, 27.5);
}

BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture)
{
auto pub1_sender = IOManager::get()->get_sender<Data2>(pub1_id);
Expand Down
Loading