diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index 5219817ab22..79f722c650a 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -46,15 +46,15 @@ namespace } DataElementI::DataElementI(TopicI* parent, string name, int64_t id, const DataStorm::Config& config) - : _traceLevels(parent->getInstance()->getTraceLevels()), + : _traceLevels(parent->instance()->getTraceLevels()), _name(std::move(name)), _id(id), _config(make_shared()), - _executor(parent->getInstance()->getCallbackExecutor()), + _executor(parent->instance()->getCallbackExecutor()), _listenerCount(0), // The collocated forwarder is initalized here to avoid using a nullable proxy. The forwarder is only used by // the instance that owns it and is removed in destroy implementation. - _forwarder{parent->getInstance()->getCollocatedForwarder()->add( + _forwarder{parent->instance()->getCollocatedForwarder()->add( [this](Ice::ByteSeq inParams, const Ice::Current& current) { forward(inParams, current); })}, _parent(parent->shared_from_this()), _waiters(0), @@ -90,7 +90,7 @@ DataElementI::destroy() destroyImpl(); // Must be called first. } disconnect(); - _parent->getInstance()->getCollocatedForwarder()->remove(_forwarder->ice_getIdentity()); + _parent->instance()->getCollocatedForwarder()->remove(_forwarder->ice_getIdentity()); } void @@ -536,7 +536,7 @@ DataElementI::waitForListeners(int count) const ++_waiters; while (true) { - _parent->getInstance()->checkShutdown(); + _parent->instance()->checkShutdown(); if (count < 0 && _listenerCount == 0) { --_waiters; @@ -562,7 +562,7 @@ DataElementI::hasListeners() const Ice::CommunicatorPtr DataElementI::getCommunicator() const { - return _parent->getInstance()->getCommunicator(); + return _parent->instance()->getCommunicator(); } bool @@ -699,7 +699,7 @@ DataReaderI::waitForUnread(unsigned int count) const lock, [&]() { - _parent->getInstance()->checkShutdown(); + _parent->instance()->checkShutdown(); return _samples.size() >= count; }); } @@ -719,7 +719,7 @@ DataReaderI::getNextUnread() lock, [&]() { - _parent->getInstance()->checkShutdown(); + _parent->instance()->checkShutdown(); return !_samples.empty(); }); shared_ptr sample = _samples.front(); @@ -767,11 +767,11 @@ DataReaderI::initSamples( { if (sample->event == DataStorm::SampleEvent::PartialUpdate) { - _parent->getUpdater(sample->tag)(previous, sample, _parent->getInstance()->getCommunicator()); + _parent->getUpdater(sample->tag)(previous, sample, _parent->instance()->getCommunicator()); } else { - sample->decode(_parent->getInstance()->getCommunicator()); + sample->decode(_parent->instance()->getCommunicator()); } } previous = sample; @@ -905,11 +905,11 @@ DataReaderI::queue( { if (sample->event == DataStorm::SampleEvent::PartialUpdate) { - _parent->getUpdater(sample->tag)(_last, sample, _parent->getInstance()->getCommunicator()); + _parent->getUpdater(sample->tag)(_last, sample, _parent->instance()->getCommunicator()); } else { - sample->decode(_parent->getInstance()->getCommunicator()); + sample->decode(_parent->instance()->getCommunicator()); } } _lastSendTime = sample->timestamp; @@ -1011,7 +1011,7 @@ DataWriterI::publish(const shared_ptr& key, const shared_ptr& sampl if (sample->event == DataStorm::SampleEvent::PartialUpdate) { assert(!sample->hasValue()); - _parent->getUpdater(sample->tag)(_last, sample, _parent->getInstance()->getCommunicator()); + _parent->getUpdater(sample->tag)(_last, sample, _parent->instance()->getCommunicator()); } sample->id = ++_parent->_nextSampleId; diff --git a/cpp/src/DataStorm/NodeI.cpp b/cpp/src/DataStorm/NodeI.cpp index 0a9c7211c50..09dd2d1a8d8 100644 --- a/cpp/src/DataStorm/NodeI.cpp +++ b/cpp/src/DataStorm/NodeI.cpp @@ -71,7 +71,8 @@ void NodeI::init() { auto self = shared_from_this(); - auto instance = getInstance(); + auto instance = _instance.lock(); + assert(instance); auto adapter = instance->getObjectAdapter(); adapter->add(self, _proxy->ice_getIdentity()); @@ -88,13 +89,10 @@ NodeI::destroy(bool ownsCommunicator) { unique_lock lock(_mutex); - auto instance = getInstance(); + auto instance = _instance.lock(); assert(instance); - if (instance) - { - instance->getCollocatedForwarder()->remove(_subscriberForwarder->ice_getIdentity()); - instance->getCollocatedForwarder()->remove(_publisherForwarder->ice_getIdentity()); - } + instance->getCollocatedForwarder()->remove(_subscriberForwarder->ice_getIdentity()); + instance->getCollocatedForwarder()->remove(_publisherForwarder->ice_getIdentity()); if (!ownsCommunicator) { @@ -154,11 +152,7 @@ NodeI::createSession( Ice::checkNotNull(subscriberSession, __FILE__, __LINE__, current); auto instance = _instance.lock(); - if (!instance) - { - // Ignore the Node is being destroyed. - return; - } + assert(instance); shared_ptr session; try @@ -195,7 +189,7 @@ NodeI::createSession( { if (!connection->getAdapter()) { - connection->setAdapter(self->getInstance()->getObjectAdapter()); + connection->setAdapter(instance->getObjectAdapter()); } subscriberSession = subscriberSession->ice_fixed(connection); } @@ -212,10 +206,7 @@ NodeI::createSession( assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection); // Session::connected informs the subscriber session of all the topic writers in the current node. - session->connected( - *subscriberSession, - connection, - self->getInstance()->getTopicFactory()->getTopicWriters()); + session->connected(*subscriberSession, connection, instance->getTopicFactory()->getTopicWriters()); } catch (const Ice::LocalException&) { @@ -258,8 +249,11 @@ NodeI::confirmCreateSession( publisherSession = publisherSession->ice_fixed(current.con); } + auto instance = _instance.lock(); + assert(instance); + // Session::connected informs the publisher session of all the topic readers in the current node. - session->connected(*publisherSession, current.con, getInstance()->getTopicFactory()->getTopicReaders()); + session->connected(*publisherSession, current.con, instance->getTopicFactory()->getTopicReaders()); } void @@ -269,11 +263,7 @@ NodeI::createSubscriberSession( const shared_ptr& session) { auto instance = _instance.lock(); - if (!instance) - { - // Ignore the Node is being shutdown. - return; - } + assert(instance); try { @@ -284,14 +274,15 @@ NodeI::createSubscriberSession( { if (connection && !connection->getAdapter()) { - connection->setAdapter(self->getInstance()->getObjectAdapter()); + connection->setAdapter(instance->getObjectAdapter()); } subscriber->initiateCreateSessionAsync( self->_proxy, nullptr, [=](auto ex) { self->removePublisherSession(subscriber, session, ex); }); }, - [=, self = shared_from_this()](auto ex) { self->removePublisherSession(subscriber, session, ex); }); + [subscriber, session, self = shared_from_this()](auto ex) + { self->removePublisherSession(subscriber, session, ex); }); } catch (const Ice::LocalException&) { @@ -306,11 +297,7 @@ NodeI::createPublisherSession( shared_ptr session) { auto instance = _instance.lock(); - if (!instance) - { - // Ignore the Node is being shutdown. - return; - } + assert(instance); try { @@ -336,7 +323,7 @@ NodeI::createPublisherSession( if (connection && !connection->getAdapter()) { - connection->setAdapter(self->getInstance()->getObjectAdapter()); + connection->setAdapter(instance->getObjectAdapter()); } try @@ -353,7 +340,7 @@ NodeI::createPublisherSession( self->removeSubscriberSession(publisher, session, current_exception()); } }, - [=, self = shared_from_this()](exception_ptr ex) + [publisher, session, self = shared_from_this()](exception_ptr ex) { self->removeSubscriberSession(publisher, session, ex); }); } catch (const Ice::LocalException&) @@ -443,6 +430,8 @@ shared_ptr NodeI::createSubscriberSessionServant(NodePrx node) { // Called with mutex locked + auto instance = _instance.lock(); + assert(instance); auto p = _subscribers.find(node->ice_getIdentity()); if (p != _subscribers.end()) { @@ -455,9 +444,10 @@ NodeI::createSubscriberSessionServant(NodePrx node) { int64_t id = ++_nextSubscriberSessionId; auto session = make_shared( + instance, shared_from_this(), node, - getInstance()->getObjectAdapter()->createProxy({to_string(id), "s"})->ice_oneway()); + instance->getObjectAdapter()->createProxy({to_string(id), "s"})->ice_oneway()); session->init(); _subscribers.emplace(node->ice_getIdentity(), session); _subscriberSessions.emplace(session->getProxy()->ice_getIdentity(), session); @@ -474,6 +464,8 @@ shared_ptr NodeI::createPublisherSessionServant(NodePrx node) { // Called with mutex locked + auto instance = _instance.lock(); + assert(instance); auto p = _publishers.find(node->ice_getIdentity()); if (p != _publishers.end()) { @@ -486,9 +478,10 @@ NodeI::createPublisherSessionServant(NodePrx node) { int64_t id = ++_nextPublisherSessionId; auto session = make_shared( + instance, shared_from_this(), node, - getInstance()->getObjectAdapter()->createProxy({to_string(id), "p"})->ice_oneway()); + instance->getObjectAdapter()->createProxy({to_string(id), "p"})->ice_oneway()); session->init(); _publishers.emplace(node->ice_getIdentity(), session); _publisherSessions.emplace(session->getProxy()->ice_getIdentity(), session); diff --git a/cpp/src/DataStorm/NodeI.h b/cpp/src/DataStorm/NodeI.h index 2be7af2528c..d1fbed52eeb 100644 --- a/cpp/src/DataStorm/NodeI.h +++ b/cpp/src/DataStorm/NodeI.h @@ -73,13 +73,6 @@ namespace DataStormI DataStormContract::NodePrx getProxy() const { return _proxy; } - std::shared_ptr getInstance() const - { - auto instance = _instance.lock(); - assert(instance); - return instance; - } - DataStormContract::PublisherSessionPrx getPublisherForwarder() const { return _publisherForwarder; } DataStormContract::SubscriberSessionPrx getSubscriberForwarder() const { return _subscriberForwarder; } diff --git a/cpp/src/DataStorm/NodeSessionManager.cpp b/cpp/src/DataStorm/NodeSessionManager.cpp index 880aa66f3e8..42529d1a56c 100644 --- a/cpp/src/DataStorm/NodeSessionManager.cpp +++ b/cpp/src/DataStorm/NodeSessionManager.cpp @@ -68,7 +68,8 @@ NodeSessionManager::NodeSessionManager(const shared_ptr& instance, con void NodeSessionManager::init() { - auto instance = getInstance(); + auto instance = _instance.lock(); + assert(instance); auto sessionForwader = make_shared(shared_from_this()); instance->getObjectAdapter()->addDefaultServant(sessionForwader, "sf"); instance->getObjectAdapter()->addDefaultServant(sessionForwader, "pf"); @@ -100,7 +101,8 @@ NodeSessionManager::createOrGet(NodePrx node, const Ice::ConnectionPtr& connecti } } - auto instance = getInstance(); + auto instance = _instance.lock(); + assert(instance); if (!connection->getAdapter()) { diff --git a/cpp/src/DataStorm/NodeSessionManager.h b/cpp/src/DataStorm/NodeSessionManager.h index 95c41b71666..18a6824496a 100644 --- a/cpp/src/DataStorm/NodeSessionManager.h +++ b/cpp/src/DataStorm/NodeSessionManager.h @@ -51,13 +51,6 @@ namespace DataStormI void destroySession(DataStormContract::NodePrx); - std::shared_ptr getInstance() const - { - auto instance = _instance.lock(); - assert(instance); - return instance; - } - std::weak_ptr _instance; const std::shared_ptr _traceLevels; DataStormContract::NodePrx _nodePrx; diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 9f3378b5904..7adac8ea1a5 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -38,10 +38,10 @@ namespace }; } -SessionI::SessionI(const std::shared_ptr& parent, NodePrx node, SessionPrx proxy) - : _instance(parent->getInstance()), +SessionI::SessionI(shared_ptr instance, shared_ptr parent, NodePrx node, SessionPrx proxy) + : _instance(std::move(instance)), _traceLevels(_instance->getTraceLevels()), - _parent(parent), + _parent(std::move(parent)), _proxy(std::move(proxy)), _node(std::move(node)), _destroyed(false), @@ -1192,8 +1192,12 @@ SessionI::runWithTopic(int64_t id, TopicI* topic, function& parent, NodePrx node, SessionPrx proxy) - : SessionI(parent, std::move(node), std::move(proxy)) +SubscriberSessionI::SubscriberSessionI( + shared_ptr instance, + std::shared_ptr parent, + NodePrx node, + SessionPrx proxy) + : SessionI(std::move(instance), std::move(parent), std::move(node), std::move(proxy)) { } @@ -1319,8 +1323,12 @@ SubscriberSessionI::remove() _parent->removeSubscriberSession(getNode(), dynamic_pointer_cast(shared_from_this()), nullptr); } -PublisherSessionI::PublisherSessionI(const std::shared_ptr& parent, NodePrx node, SessionPrx proxy) - : SessionI(parent, std::move(node), std::move(proxy)) +PublisherSessionI::PublisherSessionI( + shared_ptr instance, + std::shared_ptr parent, + NodePrx node, + SessionPrx proxy) + : SessionI(std::move(instance), std::move(parent), std::move(node), std::move(proxy)) { } diff --git a/cpp/src/DataStorm/SessionI.h b/cpp/src/DataStorm/SessionI.h index 58c30ce2c57..52a86ffbd7d 100644 --- a/cpp/src/DataStorm/SessionI.h +++ b/cpp/src/DataStorm/SessionI.h @@ -250,7 +250,11 @@ namespace DataStormI }; public: - SessionI(const std::shared_ptr&, DataStormContract::NodePrx, DataStormContract::SessionPrx); + SessionI( + std::shared_ptr, + std::shared_ptr, + DataStormContract::NodePrx, + DataStormContract::SessionPrx); void init(); void announceTopics(DataStormContract::TopicInfoSeq, bool, const Ice::Current&) final; @@ -406,7 +410,11 @@ namespace DataStormI class SubscriberSessionI : public SessionI, public DataStormContract::SubscriberSession { public: - SubscriberSessionI(const std::shared_ptr&, DataStormContract::NodePrx, DataStormContract::SessionPrx); + SubscriberSessionI( + std::shared_ptr, + std::shared_ptr, + DataStormContract::NodePrx, + DataStormContract::SessionPrx); void s(std::int64_t, std::int64_t, DataStormContract::DataSample, const Ice::Current&) final; @@ -419,7 +427,11 @@ namespace DataStormI class PublisherSessionI : public SessionI, public DataStormContract::PublisherSession { public: - PublisherSessionI(const std::shared_ptr&, DataStormContract::NodePrx, DataStormContract::SessionPrx); + PublisherSessionI( + std::shared_ptr, + std::shared_ptr, + DataStormContract::NodePrx, + DataStormContract::SessionPrx); private: std::vector> getTopics(const std::string&) const final; diff --git a/cpp/src/DataStorm/TopicFactoryI.cpp b/cpp/src/DataStorm/TopicFactoryI.cpp index 84622c0eb0f..c10d79a67b3 100644 --- a/cpp/src/DataStorm/TopicFactoryI.cpp +++ b/cpp/src/DataStorm/TopicFactoryI.cpp @@ -29,11 +29,13 @@ TopicFactoryI::createTopicReader( shared_ptr sampleFilterFactories) { shared_ptr reader; - auto instance = getInstance(); + auto instance = _instance.lock(); + assert(instance); bool hasWriters; { lock_guard lock(_mutex); reader = make_shared( + instance, shared_from_this(), std::move(keyFactory), std::move(tagFactory), @@ -82,11 +84,13 @@ TopicFactoryI::createTopicWriter( shared_ptr sampleFilterFactories) { shared_ptr writer; - auto instance = getInstance(); + auto instance = _instance.lock(); + assert(instance); bool hasReaders; { lock_guard lock(_mutex); writer = make_shared( + instance, shared_from_this(), std::move(keyFactory), std::move(tagFactory), @@ -131,7 +135,8 @@ void TopicFactoryI::removeTopicReader(const string& name, const shared_ptr& reader) { lock_guard lock(_mutex); - auto instance = getInstance(); + auto instance = _instance.lock(); + assert(instance); if (instance->getTraceLevels()->topic > 0) { Trace out(instance->getTraceLevels(), instance->getTraceLevels()->topicCat); @@ -149,7 +154,8 @@ void TopicFactoryI::removeTopicWriter(const string& name, const shared_ptr& writer) { lock_guard lock(_mutex); - auto instance = getInstance(); + auto instance = _instance.lock(); + assert(instance); if (instance->getTraceLevels()->topic > 0) { Trace out(instance->getTraceLevels(), instance->getTraceLevels()->topicCat); @@ -196,7 +202,9 @@ TopicFactoryI::createPublisherSession( auto readers = getTopicReaders(topic); if (!readers.empty()) { - getInstance()->getNode()->createPublisherSession(publisher, connection, nullptr); + auto instance = _instance.lock(); + assert(instance); + instance->getNode()->createPublisherSession(publisher, connection, nullptr); } } @@ -209,7 +217,9 @@ TopicFactoryI::createSubscriberSession( auto writers = getTopicWriters(topic); if (!writers.empty()) { - getInstance()->getNode()->createSubscriberSession(subscriber, connection, nullptr); + auto instance = _instance.lock(); + assert(instance); + instance->getNode()->createSubscriberSession(subscriber, connection, nullptr); } } @@ -303,5 +313,7 @@ TopicFactoryI::shutdown() const Ice::CommunicatorPtr TopicFactoryI::getCommunicator() const { - return getInstance()->getCommunicator(); + auto instance = _instance.lock(); + assert(instance); + return instance->getCommunicator(); } diff --git a/cpp/src/DataStorm/TopicFactoryI.h b/cpp/src/DataStorm/TopicFactoryI.h index 6eb23e3071e..9552cafcb7c 100644 --- a/cpp/src/DataStorm/TopicFactoryI.h +++ b/cpp/src/DataStorm/TopicFactoryI.h @@ -47,13 +47,6 @@ namespace DataStormI void createSubscriberSession(const std::string&, DataStormContract::NodePrx, const Ice::ConnectionPtr&); void createPublisherSession(const std::string&, DataStormContract::NodePrx, const Ice::ConnectionPtr&); - std::shared_ptr getInstance() const - { - auto instance = _instance.lock(); - assert(instance); - return instance; - } - DataStormContract::TopicInfoSeq getTopicReaders() const; DataStormContract::TopicInfoSeq getTopicWriters() const; diff --git a/cpp/src/DataStorm/TopicI.cpp b/cpp/src/DataStorm/TopicI.cpp index eec2ac91795..3584eb3e213 100644 --- a/cpp/src/DataStorm/TopicI.cpp +++ b/cpp/src/DataStorm/TopicI.cpp @@ -90,7 +90,8 @@ namespace } TopicI::TopicI( - weak_ptr factory, + shared_ptr instance, + shared_ptr factory, shared_ptr keyFactory, shared_ptr tagFactory, shared_ptr sampleFactory, @@ -105,7 +106,7 @@ TopicI::TopicI( _keyFilterFactories(std::move(keyFilterFactories)), _sampleFilterFactories(std::move(sampleFilterFactories)), _name(std::move(name)), - _instance(_factory.lock()->getInstance()), + _instance(std::move(instance)), _traceLevels(_instance->getTraceLevels()), _id(id), // The collocated forwarder is initalized here to avoid using a nullable proxy. The forwarder is only used by @@ -891,6 +892,7 @@ TopicI::addFiltered(const shared_ptr& element, const shared_ptr instance, shared_ptr factory, shared_ptr keyFactory, shared_ptr tagFactory, @@ -900,6 +902,7 @@ TopicReaderI::TopicReaderI( string name, int64_t id) : TopicI( + std::move(instance), std::move(factory), std::move(keyFactory), std::move(tagFactory), @@ -1027,6 +1030,7 @@ TopicReaderI::mergeConfigs(DataStorm::ReaderConfig config) const } TopicWriterI::TopicWriterI( + shared_ptr instance, shared_ptr factory, shared_ptr keyFactory, shared_ptr tagFactory, @@ -1036,6 +1040,7 @@ TopicWriterI::TopicWriterI( string name, int64_t id) : TopicI( + std::move(instance), std::move(factory), std::move(keyFactory), std::move(tagFactory), diff --git a/cpp/src/DataStorm/TopicI.h b/cpp/src/DataStorm/TopicI.h index 5921daa23a3..ea15e1d2991 100644 --- a/cpp/src/DataStorm/TopicI.h +++ b/cpp/src/DataStorm/TopicI.h @@ -27,7 +27,8 @@ namespace DataStormI public: TopicI( - std::weak_ptr, + std::shared_ptr, + std::shared_ptr, std::shared_ptr, std::shared_ptr, std::shared_ptr, @@ -43,7 +44,8 @@ namespace DataStormI void shutdown(); - const std::shared_ptr& getInstance() const { return _instance; } + // const getter for _instance + const std::shared_ptr& instance() const noexcept { return _instance; } DataStormContract::TopicSpec getTopicSpec() const; DataStormContract::ElementInfoSeq getTags() const; @@ -178,6 +180,7 @@ namespace DataStormI { public: TopicReaderI( + std::shared_ptr, std::shared_ptr, std::shared_ptr, std::shared_ptr, @@ -214,6 +217,7 @@ namespace DataStormI { public: TopicWriterI( + std::shared_ptr, std::shared_ptr, std::shared_ptr, std::shared_ptr, diff --git a/cpp/src/slice2cs/CsUtil.cpp b/cpp/src/slice2cs/CsUtil.cpp index c7d4e517089..1c3019b1f2d 100644 --- a/cpp/src/slice2cs/CsUtil.cpp +++ b/cpp/src/slice2cs/CsUtil.cpp @@ -846,24 +846,14 @@ Slice::CsGenerator::writeOptionalMarshalUnmarshalCode( { if (marshal) { - out << nl << "if (" << param << " is not null && " << stream << ".writeOptional(" << tag - << ", Ice.OptionalFormat.FSize))"; - out << sb; - out << nl << "int pos = " << stream << ".startSize();"; - writeMarshalUnmarshalCode(out, type, scope, param, marshal, customStream); - out << nl << stream << ".endSize(pos);"; - out << eb; + out << nl << stream << ".writeProxy(" << tag << ", " << param << ");"; } else { out << nl << "if (" << stream << ".readOptional(" << tag << ", Ice.OptionalFormat.FSize))"; out << sb; out << nl << stream << ".skip(4);"; - string tmp = "tmpVal"; - string typeS = typeToString(type, scope); - out << nl << typeS << ' ' << tmp << ';'; - writeMarshalUnmarshalCode(out, type, scope, tmp, marshal, customStream); - out << nl << param << " = " << tmp << ";"; + writeMarshalUnmarshalCode(out, type, scope, param, marshal, customStream); out << eb; out << nl << "else"; out << sb;