diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index 5455411af..743ba7dc7 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -747,8 +747,25 @@ void ClientSession::onHandleConfiguredDispatched( // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + const unsigned int qId = + streamParamsCtrlMsg.choice().isConfigureQueueStreamValue() + ? streamParamsCtrlMsg.choice().configureQueueStream().qId() + : streamParamsCtrlMsg.choice().configureStream().qId(); + + ClientSessionState::QueueStateMap::iterator queueStateIter = + d_queueSessionManager.queues().find(qId); + if (queueStateIter != d_queueSessionManager.queues().end()) { + d_currentOpDescription + << "Configure queue '" + << queueStateIter->second.d_handle_p->queue()->uri() << "'"; + } + else { + d_currentOpDescription << "Configure queue [qId=" << qId << "]"; + } + if (isDisconnected()) { // The client is disconnected or the channel is down + logOperationTime(d_currentOpDescription); return; // RETURN } @@ -768,13 +785,10 @@ void ClientSession::onHandleConfiguredDispatched( << "]"; } else { - unsigned int qId; - if (streamParamsCtrlMsg.choice().isConfigureQueueStreamValue()) { bmqp_ctrlmsg::ConfigureQueueStream& configureQueueStream = response.choice().makeConfigureQueueStreamResponse().request(); - qId = streamParamsCtrlMsg.choice().configureQueueStream().qId(); configureQueueStream.qId() = qId; bmqp::ProtocolUtil::convert( &configureQueueStream.streamParameters(), @@ -788,14 +802,10 @@ void ClientSession::onHandleConfiguredDispatched( bmqp_ctrlmsg::ConfigureStream& configureStream = response.choice().makeConfigureStreamResponse().request(); - qId = streamParamsCtrlMsg.choice().configureStream().qId(); - configureStream.qId() = qId; configureStream.streamParameters() = streamParameters; } - ClientSessionState::QueueStateMap::iterator queueStateIter = - d_queueSessionManager.queues().find(qId); if (queueStateIter == d_queueSessionManager.queues().end()) { // Failure to find queue BALL_LOG_WARN @@ -819,6 +829,8 @@ void ClientSession::onHandleConfiguredDispatched( << "ENCODING_FAILED, rc: " << rc << ", request: " << streamParamsCtrlMsg << "]: " << response; + + logOperationTime(d_currentOpDescription); return; // RETURN } @@ -828,6 +840,8 @@ void ClientSession::onHandleConfiguredDispatched( // Send the response sendPacket(d_state.d_schemaEventBuilder.blob(), true); + + logOperationTime(d_currentOpDescription); } void ClientSession::initiateShutdownDispatched( @@ -1103,6 +1117,29 @@ void ClientSession::closeChannel() channel()->close(status); } +void ClientSession::logOperationTime(mwcu::MemOutStream& opDescription) +{ + if (d_beginTimestamp) { + BALL_LOG_INFO_BLOCK + { + const bsls::Types::Int64 elapsed = + mwcsys::Time::highResolutionTimer() - d_beginTimestamp; + BALL_LOG_OUTPUT_STREAM + << description() << ": " << opDescription.str() + << " took: " << mwcu::PrintUtil::prettyTimeInterval(elapsed) + << " (" << elapsed << " nanoseconds)"; + } + d_beginTimestamp = 0; + } + else { + BSLS_PERFORMANCEHINT_UNLIKELY_HINT; + BALL_LOG_WARN << "d_beginTimestamp was not initialized for operation: " + << opDescription.str(); + } + + opDescription.reset(); +} + void ClientSession::processDisconnectAllQueues( const bmqp_ctrlmsg::ControlMessage& controlMessage) { @@ -1318,6 +1355,11 @@ void ClientSession::openQueueCb( // Send the response sendPacket(d_state.d_schemaEventBuilder.blob(), true); + + const bsl::string& queueUri = + handleParamsCtrlMsg.choice().openQueue().handleParameters().uri(); + d_currentOpDescription << "Open queue '" << queueUri << "'"; + logOperationTime(d_currentOpDescription); } void ClientSession::processCloseQueue( @@ -1396,6 +1438,11 @@ void ClientSession::closeQueueCb( bdlf::BindUtil::bind(&finalizeClosedHandle, description(), handle), handle->queue(), mqbi::DispatcherEventType::e_DISPATCHER); + + const bsl::string& queueUri = + handleParamsCtrlMsg.choice().closeQueue().handleParameters().uri(); + d_currentOpDescription << "Close queue '" << queueUri << "'"; + logOperationTime(d_currentOpDescription); } void ClientSession::processConfigureStream( @@ -2556,6 +2603,8 @@ ClientSession::ClientSession( , d_periodicUnconfirmedCheckHandler() , d_shutdownChain(allocator) , d_shutdownCallback() +, d_beginTimestamp(0) +, d_currentOpDescription(256, allocator) { // Register this client to the dispatcher mqbi::Dispatcher::ProcessorHandle processor = dispatcher->registerClient( @@ -2596,6 +2645,9 @@ ClientSession::~ClientSession() // Unregister from the dispatcher dispatcher()->unregisterClient(this); + + d_currentOpDescription << "Close session"; + logOperationTime(d_currentOpDescription); } // MANIPULATORS @@ -2651,6 +2703,8 @@ void ClientSession::processEvent( return; // RETURN } + d_beginTimestamp = mwcsys::Time::highResolutionTimer(); + d_isDisconnecting = true; eventCallback = bdlf::BindUtil::bind( &ClientSession::processDisconnectAllQueues, @@ -2658,12 +2712,16 @@ void ClientSession::processEvent( controlMessage); } break; case MsgChoice::SELECTION_ID_OPEN_QUEUE: { + d_beginTimestamp = mwcsys::Time::highResolutionTimer(); + eventCallback = bdlf::BindUtil::bind( &ClientSession::processOpenQueue, this, controlMessage); } break; case MsgChoice::SELECTION_ID_CLOSE_QUEUE: { + d_beginTimestamp = mwcsys::Time::highResolutionTimer(); + eventCallback = bdlf::BindUtil::bind( &ClientSession::processCloseQueue, this, @@ -2671,6 +2729,8 @@ void ClientSession::processEvent( } break; case MsgChoice::SELECTION_ID_CONFIGURE_QUEUE_STREAM: case MsgChoice::SELECTION_ID_CONFIGURE_STREAM: { + d_beginTimestamp = mwcsys::Time::highResolutionTimer(); + eventCallback = bdlf::BindUtil::bind( &ClientSession::processConfigureStream, this, @@ -2815,6 +2875,8 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback, { // executed by the *ANY* thread + d_beginTimestamp = mwcsys::Time::highResolutionTimer(); + BALL_LOG_INFO << description() << ": initiateShutdown"; // The 'd_self.acquire()' return 'shared_ptr' but that does diff --git a/src/groups/mqb/mqba/mqba_clientsession.h b/src/groups/mqb/mqba/mqba_clientsession.h index bf61b839c..53f41d5d3 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.h +++ b/src/groups/mqb/mqba/mqba_clientsession.h @@ -391,6 +391,12 @@ class ClientSession : public mqbnet::Session, // If present, call when 'tearDownAllQueuesDone'. // This is the callback given in 'initiateShutdown' + bsls::Types::Int64 d_beginTimestamp; + // HiRes timer value of the begin session/queue operation + + mwcu::MemOutStream d_currentOpDescription; + // Stream for constructing current session/queue operation description. + private: // NOT IMPLEMENTED @@ -600,6 +606,11 @@ class ClientSession : public mqbnet::Session, void closeChannel(); + /// Log session/queue operation time for the specified `opDescription` + /// using the stored operation begin timestamp. After logging reset + /// `opDescription` and set begin timestamp to 0. + void logOperationTime(mwcu::MemOutStream& opDescription); + // PRIVATE ACCESSORS /// Return true if the session is `e_DISCONNECTED` or worse (`e_DEAD`). diff --git a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp index 3a981b436..48c415a97 100644 --- a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp @@ -502,6 +502,11 @@ void TCPSessionFactory::negotiationComplete( statusCode, d_allocator_p); channel->close(status); + + bdlma::LocalSequentialAllocator<64> localAlloc(d_allocator_p); + mwcu::MemOutStream logStream(&localAlloc); + logStream << "[channel: '" << channel.get() << "]"; + logOpenSessionTime(logStream.str(), channel); return; // RETURN } @@ -599,6 +604,8 @@ void TCPSessionFactory::negotiationComplete( // This will eventually call 'btemt_ChannelPool::shutdown' which will // schedule channelStateCb/poolSessionStateCb/onClose/tearDown channel->close(); + + logOpenSessionTime(session->description(), channel); return; // RETURN } @@ -610,6 +617,8 @@ void TCPSessionFactory::negotiationComplete( this, info.get())); } + + logOpenSessionTime(session->description(), channel); } void TCPSessionFactory::onSessionDestroyed( @@ -680,6 +689,15 @@ void TCPSessionFactory::channelStateCallback( channel->close(closeStatus); } else { + { // Save begin session timestamp + // TODO: it's possible to store this timestamp directly in one + // of the mwcio::Channel implementations, so we don't need a + // mutex synchronization for them at all. + bslmt::LockGuard guard(&d_mutex); // LOCK + d_timestampMap[channel.get()] = + mwcsys::Time::highResolutionTimer(); + } // close mutex lock guard // UNLOCK + // Keep track of active channels, for logging purposes ++d_nbActiveChannels; @@ -865,6 +883,35 @@ void TCPSessionFactory::disableHeartbeat( d_heartbeatChannels.erase(channelInfo->d_channel_p); } +void TCPSessionFactory::logOpenSessionTime( + const bsl::string& sessionDescription, + const bsl::shared_ptr& channel) +{ + bsls::Types::Int64 begin = 0; + { + bslmt::LockGuard guard(&d_mutex); // LOCK + + TimestampMap::const_iterator it = d_timestampMap.find(channel.get()); + if (it != d_timestampMap.end()) { + begin = it->second; + d_timestampMap.erase(it); + } + + } // close mutex lock guard // UNLOCK + + if (begin) { + BALL_LOG_INFO_BLOCK + { + const bsls::Types::Int64 elapsed = + mwcsys::Time::highResolutionTimer() - begin; + BALL_LOG_OUTPUT_STREAM + << "Open session '" << sessionDescription + << "' took: " << mwcu::PrintUtil::prettyTimeInterval(elapsed) + << " (" << elapsed << " nanoseconds)"; + } + } +} + TCPSessionFactory::TCPSessionFactory( const mqbcfg::TcpInterfaceConfig& config, bdlmt::EventScheduler* scheduler, @@ -895,6 +942,7 @@ TCPSessionFactory::TCPSessionFactory( , d_heartbeatChannels(allocator) , d_initialMissedHeartbeatCounter(calculateInitialMissedHbCounter(config)) , d_isListening(false) +, d_timestampMap(allocator) , d_allocator_p(allocator) { // PRECONDITIONS diff --git a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h index c5955aa68..08bafecad 100644 --- a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h +++ b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h @@ -235,6 +235,9 @@ class TCPSessionFactory { typedef bslma::ManagedPtr OpHandleMp; + typedef bsl::unordered_map + TimestampMap; + private: // DATA mwcu::SharedResource d_self; @@ -366,6 +369,9 @@ class TCPSessionFactory { // while operation (readCallback/ // negotiation) is in progress. + TimestampMap d_timestampMap; + // Map of HiRes timestamp of the session beginning per channel. + bslma::Allocator* d_allocator_p; // Allocator to use @@ -468,6 +474,13 @@ class TCPSessionFactory { /// event scheduler processes it. void disableHeartbeat(const bsl::shared_ptr& channelInfo); + /// Log open session time for the specified `sessionDescription` and + /// `channel`, using the stored begin + /// timestamp. After logging, begin timestamp is removed from + /// timestamps map. + void logOpenSessionTime(const bsl::string& sessionDescription, + const bsl::shared_ptr& channel); + private: // NOT IMPLEMENTED