Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into coverage-test
Browse files Browse the repository at this point in the history
  • Loading branch information
externl committed Dec 5, 2024
2 parents d2c2a00 + bd405a3 commit 677dc89
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 103 deletions.
26 changes: 13 additions & 13 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ namespace
}

DataElementI::DataElementI(TopicI* parent, string name, int64_t id, const DataStorm::Config& config)
: _traceLevels(parent->getInstance()->getTraceLevels()),
: _traceLevels(parent->instance()->getTraceLevels()),
_name(std::move(name)),
_id(id),
_config(make_shared<ElementConfig>()),
_executor(parent->getInstance()->getCallbackExecutor()),
_executor(parent->instance()->getCallbackExecutor()),
_listenerCount(0),
// The collocated forwarder is initalized here to avoid using a nullable proxy. The forwarder is only used by
// the instance that owns it and is removed in destroy implementation.
_forwarder{parent->getInstance()->getCollocatedForwarder()->add<SessionPrx>(
_forwarder{parent->instance()->getCollocatedForwarder()->add<SessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forward(inParams, current); })},
_parent(parent->shared_from_this()),
_waiters(0),
Expand Down Expand Up @@ -90,7 +90,7 @@ DataElementI::destroy()
destroyImpl(); // Must be called first.
}
disconnect();
_parent->getInstance()->getCollocatedForwarder()->remove(_forwarder->ice_getIdentity());
_parent->instance()->getCollocatedForwarder()->remove(_forwarder->ice_getIdentity());
}

void
Expand Down Expand Up @@ -536,7 +536,7 @@ DataElementI::waitForListeners(int count) const
++_waiters;
while (true)
{
_parent->getInstance()->checkShutdown();
_parent->instance()->checkShutdown();
if (count < 0 && _listenerCount == 0)
{
--_waiters;
Expand All @@ -562,7 +562,7 @@ DataElementI::hasListeners() const
Ice::CommunicatorPtr
DataElementI::getCommunicator() const
{
return _parent->getInstance()->getCommunicator();
return _parent->instance()->getCommunicator();
}

bool
Expand Down Expand Up @@ -699,7 +699,7 @@ DataReaderI::waitForUnread(unsigned int count) const
lock,
[&]()
{
_parent->getInstance()->checkShutdown();
_parent->instance()->checkShutdown();
return _samples.size() >= count;
});
}
Expand All @@ -719,7 +719,7 @@ DataReaderI::getNextUnread()
lock,
[&]()
{
_parent->getInstance()->checkShutdown();
_parent->instance()->checkShutdown();
return !_samples.empty();
});
shared_ptr<Sample> sample = _samples.front();
Expand Down Expand Up @@ -767,11 +767,11 @@ DataReaderI::initSamples(
{
if (sample->event == DataStorm::SampleEvent::PartialUpdate)
{
_parent->getUpdater(sample->tag)(previous, sample, _parent->getInstance()->getCommunicator());
_parent->getUpdater(sample->tag)(previous, sample, _parent->instance()->getCommunicator());
}
else
{
sample->decode(_parent->getInstance()->getCommunicator());
sample->decode(_parent->instance()->getCommunicator());
}
}
previous = sample;
Expand Down Expand Up @@ -905,11 +905,11 @@ DataReaderI::queue(
{
if (sample->event == DataStorm::SampleEvent::PartialUpdate)
{
_parent->getUpdater(sample->tag)(_last, sample, _parent->getInstance()->getCommunicator());
_parent->getUpdater(sample->tag)(_last, sample, _parent->instance()->getCommunicator());
}
else
{
sample->decode(_parent->getInstance()->getCommunicator());
sample->decode(_parent->instance()->getCommunicator());
}
}
_lastSendTime = sample->timestamp;
Expand Down Expand Up @@ -1011,7 +1011,7 @@ DataWriterI::publish(const shared_ptr<Key>& key, const shared_ptr<Sample>& sampl
if (sample->event == DataStorm::SampleEvent::PartialUpdate)
{
assert(!sample->hasValue());
_parent->getUpdater(sample->tag)(_last, sample, _parent->getInstance()->getCommunicator());
_parent->getUpdater(sample->tag)(_last, sample, _parent->instance()->getCommunicator());
}

sample->id = ++_parent->_nextSampleId;
Expand Down
61 changes: 27 additions & 34 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ void
NodeI::init()
{
auto self = shared_from_this();
auto instance = getInstance();
auto instance = _instance.lock();
assert(instance);

auto adapter = instance->getObjectAdapter();
adapter->add<NodePrx>(self, _proxy->ice_getIdentity());
Expand All @@ -88,13 +89,10 @@ NodeI::destroy(bool ownsCommunicator)
{
unique_lock<mutex> lock(_mutex);

auto instance = getInstance();
auto instance = _instance.lock();
assert(instance);
if (instance)
{
instance->getCollocatedForwarder()->remove(_subscriberForwarder->ice_getIdentity());
instance->getCollocatedForwarder()->remove(_publisherForwarder->ice_getIdentity());
}
instance->getCollocatedForwarder()->remove(_subscriberForwarder->ice_getIdentity());
instance->getCollocatedForwarder()->remove(_publisherForwarder->ice_getIdentity());

if (!ownsCommunicator)
{
Expand Down Expand Up @@ -154,11 +152,7 @@ NodeI::createSession(
Ice::checkNotNull(subscriberSession, __FILE__, __LINE__, current);

auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
return;
}
assert(instance);

shared_ptr<PublisherSessionI> session;
try
Expand Down Expand Up @@ -195,7 +189,7 @@ NodeI::createSession(
{
if (!connection->getAdapter())
{
connection->setAdapter(self->getInstance()->getObjectAdapter());
connection->setAdapter(instance->getObjectAdapter());
}
subscriberSession = subscriberSession->ice_fixed(connection);
}
Expand All @@ -212,10 +206,7 @@ NodeI::createSession(
assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection);

// Session::connected informs the subscriber session of all the topic writers in the current node.
session->connected(
*subscriberSession,
connection,
self->getInstance()->getTopicFactory()->getTopicWriters());
session->connected(*subscriberSession, connection, instance->getTopicFactory()->getTopicWriters());
}
catch (const Ice::LocalException&)
{
Expand Down Expand Up @@ -258,8 +249,11 @@ NodeI::confirmCreateSession(
publisherSession = publisherSession->ice_fixed(current.con);
}

auto instance = _instance.lock();
assert(instance);

// Session::connected informs the publisher session of all the topic readers in the current node.
session->connected(*publisherSession, current.con, getInstance()->getTopicFactory()->getTopicReaders());
session->connected(*publisherSession, current.con, instance->getTopicFactory()->getTopicReaders());
}

void
Expand All @@ -269,11 +263,7 @@ NodeI::createSubscriberSession(
const shared_ptr<PublisherSessionI>& session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being shutdown.
return;
}
assert(instance);

try
{
Expand All @@ -284,14 +274,15 @@ NodeI::createSubscriberSession(
{
if (connection && !connection->getAdapter())
{
connection->setAdapter(self->getInstance()->getObjectAdapter());
connection->setAdapter(instance->getObjectAdapter());
}
subscriber->initiateCreateSessionAsync(
self->_proxy,
nullptr,
[=](auto ex) { self->removePublisherSession(subscriber, session, ex); });
},
[=, self = shared_from_this()](auto ex) { self->removePublisherSession(subscriber, session, ex); });
[subscriber, session, self = shared_from_this()](auto ex)
{ self->removePublisherSession(subscriber, session, ex); });
}
catch (const Ice::LocalException&)
{
Expand All @@ -306,11 +297,7 @@ NodeI::createPublisherSession(
shared_ptr<SubscriberSessionI> session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being shutdown.
return;
}
assert(instance);

try
{
Expand All @@ -336,7 +323,7 @@ NodeI::createPublisherSession(

if (connection && !connection->getAdapter())
{
connection->setAdapter(self->getInstance()->getObjectAdapter());
connection->setAdapter(instance->getObjectAdapter());
}

try
Expand All @@ -353,7 +340,7 @@ NodeI::createPublisherSession(
self->removeSubscriberSession(publisher, session, current_exception());
}
},
[=, self = shared_from_this()](exception_ptr ex)
[publisher, session, self = shared_from_this()](exception_ptr ex)
{ self->removeSubscriberSession(publisher, session, ex); });
}
catch (const Ice::LocalException&)
Expand Down Expand Up @@ -443,6 +430,8 @@ shared_ptr<SubscriberSessionI>
NodeI::createSubscriberSessionServant(NodePrx node)
{
// Called with mutex locked
auto instance = _instance.lock();
assert(instance);
auto p = _subscribers.find(node->ice_getIdentity());
if (p != _subscribers.end())
{
Expand All @@ -455,9 +444,10 @@ NodeI::createSubscriberSessionServant(NodePrx node)
{
int64_t id = ++_nextSubscriberSessionId;
auto session = make_shared<SubscriberSessionI>(
instance,
shared_from_this(),
node,
getInstance()->getObjectAdapter()->createProxy<SessionPrx>({to_string(id), "s"})->ice_oneway());
instance->getObjectAdapter()->createProxy<SessionPrx>({to_string(id), "s"})->ice_oneway());
session->init();
_subscribers.emplace(node->ice_getIdentity(), session);
_subscriberSessions.emplace(session->getProxy()->ice_getIdentity(), session);
Expand All @@ -474,6 +464,8 @@ shared_ptr<PublisherSessionI>
NodeI::createPublisherSessionServant(NodePrx node)
{
// Called with mutex locked
auto instance = _instance.lock();
assert(instance);
auto p = _publishers.find(node->ice_getIdentity());
if (p != _publishers.end())
{
Expand All @@ -486,9 +478,10 @@ NodeI::createPublisherSessionServant(NodePrx node)
{
int64_t id = ++_nextPublisherSessionId;
auto session = make_shared<PublisherSessionI>(
instance,
shared_from_this(),
node,
getInstance()->getObjectAdapter()->createProxy<SessionPrx>({to_string(id), "p"})->ice_oneway());
instance->getObjectAdapter()->createProxy<SessionPrx>({to_string(id), "p"})->ice_oneway());
session->init();
_publishers.emplace(node->ice_getIdentity(), session);
_publisherSessions.emplace(session->getProxy()->ice_getIdentity(), session);
Expand Down
7 changes: 0 additions & 7 deletions cpp/src/DataStorm/NodeI.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ namespace DataStormI

DataStormContract::NodePrx getProxy() const { return _proxy; }

std::shared_ptr<Instance> getInstance() const
{
auto instance = _instance.lock();
assert(instance);
return instance;
}

DataStormContract::PublisherSessionPrx getPublisherForwarder() const { return _publisherForwarder; }

DataStormContract::SubscriberSessionPrx getSubscriberForwarder() const { return _subscriberForwarder; }
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ NodeSessionManager::NodeSessionManager(const shared_ptr<Instance>& instance, con
void
NodeSessionManager::init()
{
auto instance = getInstance();
auto instance = _instance.lock();
assert(instance);
auto sessionForwader = make_shared<SessionForwarder>(shared_from_this());
instance->getObjectAdapter()->addDefaultServant(sessionForwader, "sf");
instance->getObjectAdapter()->addDefaultServant(sessionForwader, "pf");
Expand Down Expand Up @@ -100,7 +101,8 @@ NodeSessionManager::createOrGet(NodePrx node, const Ice::ConnectionPtr& connecti
}
}

auto instance = getInstance();
auto instance = _instance.lock();
assert(instance);

if (!connection->getAdapter())
{
Expand Down
7 changes: 0 additions & 7 deletions cpp/src/DataStorm/NodeSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ namespace DataStormI

void destroySession(DataStormContract::NodePrx);

std::shared_ptr<Instance> getInstance() const
{
auto instance = _instance.lock();
assert(instance);
return instance;
}

std::weak_ptr<Instance> _instance;
const std::shared_ptr<TraceLevels> _traceLevels;
DataStormContract::NodePrx _nodePrx;
Expand Down
22 changes: 15 additions & 7 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ namespace
};
}

SessionI::SessionI(const std::shared_ptr<NodeI>& parent, NodePrx node, SessionPrx proxy)
: _instance(parent->getInstance()),
SessionI::SessionI(shared_ptr<Instance> instance, shared_ptr<NodeI> parent, NodePrx node, SessionPrx proxy)
: _instance(std::move(instance)),
_traceLevels(_instance->getTraceLevels()),
_parent(parent),
_parent(std::move(parent)),
_proxy(std::move(proxy)),
_node(std::move(node)),
_destroyed(false),
Expand Down Expand Up @@ -1192,8 +1192,12 @@ SessionI::runWithTopic(int64_t id, TopicI* topic, function<void(TopicSubscriber&
}
}

SubscriberSessionI::SubscriberSessionI(const std::shared_ptr<NodeI>& parent, NodePrx node, SessionPrx proxy)
: SessionI(parent, std::move(node), std::move(proxy))
SubscriberSessionI::SubscriberSessionI(
shared_ptr<Instance> instance,
std::shared_ptr<NodeI> parent,
NodePrx node,
SessionPrx proxy)
: SessionI(std::move(instance), std::move(parent), std::move(node), std::move(proxy))
{
}

Expand Down Expand Up @@ -1319,8 +1323,12 @@ SubscriberSessionI::remove()
_parent->removeSubscriberSession(getNode(), dynamic_pointer_cast<SubscriberSessionI>(shared_from_this()), nullptr);
}

PublisherSessionI::PublisherSessionI(const std::shared_ptr<NodeI>& parent, NodePrx node, SessionPrx proxy)
: SessionI(parent, std::move(node), std::move(proxy))
PublisherSessionI::PublisherSessionI(
shared_ptr<Instance> instance,
std::shared_ptr<NodeI> parent,
NodePrx node,
SessionPrx proxy)
: SessionI(std::move(instance), std::move(parent), std::move(node), std::move(proxy))
{
}

Expand Down
Loading

0 comments on commit 677dc89

Please sign in to comment.