Skip to content

Commit

Permalink
Feat[MQB]: Add log with session/queue operations time (bloomberg#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-e1off committed Oct 24, 2024
1 parent 63ac23e commit 35369ef
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 7 deletions.
76 changes: 69 additions & 7 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,25 @@ void ClientSession::onHandleConfiguredDispatched(
// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

const unsigned int qId =
streamParamsCtrlMsg.choice().isConfigureQueueStreamValue()
? streamParamsCtrlMsg.choice().configureQueueStream().qId()
: streamParamsCtrlMsg.choice().configureStream().qId();

ClientSessionState::QueueStateMap::iterator queueStateIter =
d_queueSessionManager.queues().find(qId);
if (queueStateIter != d_queueSessionManager.queues().end()) {
d_currentOpDescription
<< "Configure queue '"
<< queueStateIter->second.d_handle_p->queue()->uri() << "'";
}
else {
d_currentOpDescription << "Configure queue [qId=" << qId << "]";
}

if (isDisconnected()) {
// The client is disconnected or the channel is down
logOperationTime(d_currentOpDescription);
return; // RETURN
}

Expand All @@ -768,13 +785,10 @@ void ClientSession::onHandleConfiguredDispatched(
<< "]";
}
else {
unsigned int qId;

if (streamParamsCtrlMsg.choice().isConfigureQueueStreamValue()) {
bmqp_ctrlmsg::ConfigureQueueStream& configureQueueStream =
response.choice().makeConfigureQueueStreamResponse().request();

qId = streamParamsCtrlMsg.choice().configureQueueStream().qId();
configureQueueStream.qId() = qId;
bmqp::ProtocolUtil::convert(
&configureQueueStream.streamParameters(),
Expand All @@ -788,14 +802,10 @@ void ClientSession::onHandleConfiguredDispatched(
bmqp_ctrlmsg::ConfigureStream& configureStream =
response.choice().makeConfigureStreamResponse().request();

qId = streamParamsCtrlMsg.choice().configureStream().qId();

configureStream.qId() = qId;
configureStream.streamParameters() = streamParameters;
}

ClientSessionState::QueueStateMap::iterator queueStateIter =
d_queueSessionManager.queues().find(qId);
if (queueStateIter == d_queueSessionManager.queues().end()) {
// Failure to find queue
BALL_LOG_WARN
Expand All @@ -819,6 +829,8 @@ void ClientSession::onHandleConfiguredDispatched(
<< "ENCODING_FAILED, rc: " << rc
<< ", request: " << streamParamsCtrlMsg
<< "]: " << response;

logOperationTime(d_currentOpDescription);
return; // RETURN
}

Expand All @@ -828,6 +840,8 @@ void ClientSession::onHandleConfiguredDispatched(

// Send the response
sendPacket(d_state.d_schemaEventBuilder.blob(), true);

logOperationTime(d_currentOpDescription);
}

void ClientSession::initiateShutdownDispatched(
Expand Down Expand Up @@ -1103,6 +1117,29 @@ void ClientSession::closeChannel()
channel()->close(status);
}

void ClientSession::logOperationTime(mwcu::MemOutStream& opDescription)
{
if (d_beginTimestamp) {
BALL_LOG_INFO_BLOCK
{
const bsls::Types::Int64 elapsed =
mwcsys::Time::highResolutionTimer() - d_beginTimestamp;
BALL_LOG_OUTPUT_STREAM
<< description() << ": " << opDescription.str()
<< " took: " << mwcu::PrintUtil::prettyTimeInterval(elapsed)
<< " (" << elapsed << " nanoseconds)";
}
d_beginTimestamp = 0;
}
else {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
BALL_LOG_WARN << "d_beginTimestamp was not initialized for operation: "
<< opDescription.str();
}

opDescription.reset();
}

void ClientSession::processDisconnectAllQueues(
const bmqp_ctrlmsg::ControlMessage& controlMessage)
{
Expand Down Expand Up @@ -1318,6 +1355,11 @@ void ClientSession::openQueueCb(

// Send the response
sendPacket(d_state.d_schemaEventBuilder.blob(), true);

const bsl::string& queueUri =
handleParamsCtrlMsg.choice().openQueue().handleParameters().uri();
d_currentOpDescription << "Open queue '" << queueUri << "'";
logOperationTime(d_currentOpDescription);
}

void ClientSession::processCloseQueue(
Expand Down Expand Up @@ -1396,6 +1438,11 @@ void ClientSession::closeQueueCb(
bdlf::BindUtil::bind(&finalizeClosedHandle, description(), handle),
handle->queue(),
mqbi::DispatcherEventType::e_DISPATCHER);

const bsl::string& queueUri =
handleParamsCtrlMsg.choice().closeQueue().handleParameters().uri();
d_currentOpDescription << "Close queue '" << queueUri << "'";
logOperationTime(d_currentOpDescription);
}

void ClientSession::processConfigureStream(
Expand Down Expand Up @@ -2556,6 +2603,8 @@ ClientSession::ClientSession(
, d_periodicUnconfirmedCheckHandler()
, d_shutdownChain(allocator)
, d_shutdownCallback()
, d_beginTimestamp(0)
, d_currentOpDescription(256, allocator)
{
// Register this client to the dispatcher
mqbi::Dispatcher::ProcessorHandle processor = dispatcher->registerClient(
Expand Down Expand Up @@ -2596,6 +2645,9 @@ ClientSession::~ClientSession()

// Unregister from the dispatcher
dispatcher()->unregisterClient(this);

d_currentOpDescription << "Close session";
logOperationTime(d_currentOpDescription);
}

// MANIPULATORS
Expand Down Expand Up @@ -2651,26 +2703,34 @@ void ClientSession::processEvent(
return; // RETURN
}

d_beginTimestamp = mwcsys::Time::highResolutionTimer();

d_isDisconnecting = true;
eventCallback = bdlf::BindUtil::bind(
&ClientSession::processDisconnectAllQueues,
this,
controlMessage);
} break;
case MsgChoice::SELECTION_ID_OPEN_QUEUE: {
d_beginTimestamp = mwcsys::Time::highResolutionTimer();

eventCallback = bdlf::BindUtil::bind(
&ClientSession::processOpenQueue,
this,
controlMessage);
} break;
case MsgChoice::SELECTION_ID_CLOSE_QUEUE: {
d_beginTimestamp = mwcsys::Time::highResolutionTimer();

eventCallback = bdlf::BindUtil::bind(
&ClientSession::processCloseQueue,
this,
controlMessage);
} break;
case MsgChoice::SELECTION_ID_CONFIGURE_QUEUE_STREAM:
case MsgChoice::SELECTION_ID_CONFIGURE_STREAM: {
d_beginTimestamp = mwcsys::Time::highResolutionTimer();

eventCallback = bdlf::BindUtil::bind(
&ClientSession::processConfigureStream,
this,
Expand Down Expand Up @@ -2815,6 +2875,8 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback,
{
// executed by the *ANY* thread

d_beginTimestamp = mwcsys::Time::highResolutionTimer();

BALL_LOG_INFO << description() << ": initiateShutdown";

// The 'd_self.acquire()' return 'shared_ptr<ClientSession>' but that does
Expand Down
11 changes: 11 additions & 0 deletions src/groups/mqb/mqba/mqba_clientsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ class ClientSession : public mqbnet::Session,
// If present, call when 'tearDownAllQueuesDone'.
// This is the callback given in 'initiateShutdown'

bsls::Types::Int64 d_beginTimestamp;
// HiRes timer value of the begin session/queue operation

mwcu::MemOutStream d_currentOpDescription;
// Stream for constructing current session/queue operation description.

private:
// NOT IMPLEMENTED

Expand Down Expand Up @@ -600,6 +606,11 @@ class ClientSession : public mqbnet::Session,

void closeChannel();

/// Log session/queue operation time for the specified `opDescription`
/// using the stored operation begin timestamp. After logging reset
/// `opDescription` and set begin timestamp to 0.
void logOperationTime(mwcu::MemOutStream& opDescription);

// PRIVATE ACCESSORS

/// Return true if the session is `e_DISCONNECTED` or worse (`e_DEAD`).
Expand Down
48 changes: 48 additions & 0 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ void TCPSessionFactory::negotiationComplete(
statusCode,
d_allocator_p);
channel->close(status);

bdlma::LocalSequentialAllocator<64> localAlloc(d_allocator_p);
mwcu::MemOutStream logStream(&localAlloc);
logStream << "[channel: '" << channel.get() << "]";
logOpenSessionTime(logStream.str(), channel);
return; // RETURN
}

Expand Down Expand Up @@ -599,6 +604,8 @@ void TCPSessionFactory::negotiationComplete(
// This will eventually call 'btemt_ChannelPool::shutdown' which will
// schedule channelStateCb/poolSessionStateCb/onClose/tearDown
channel->close();

logOpenSessionTime(session->description(), channel);
return; // RETURN
}

Expand All @@ -610,6 +617,8 @@ void TCPSessionFactory::negotiationComplete(
this,
info.get()));
}

logOpenSessionTime(session->description(), channel);
}

void TCPSessionFactory::onSessionDestroyed(
Expand Down Expand Up @@ -680,6 +689,15 @@ void TCPSessionFactory::channelStateCallback(
channel->close(closeStatus);
}
else {
{ // Save begin session timestamp
// TODO: it's possible to store this timestamp directly in one
// of the mwcio::Channel implementations, so we don't need a
// mutex synchronization for them at all.
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK
d_timestampMap[channel.get()] =
mwcsys::Time::highResolutionTimer();
} // close mutex lock guard // UNLOCK

// Keep track of active channels, for logging purposes
++d_nbActiveChannels;

Expand Down Expand Up @@ -865,6 +883,35 @@ void TCPSessionFactory::disableHeartbeat(
d_heartbeatChannels.erase(channelInfo->d_channel_p);
}

void TCPSessionFactory::logOpenSessionTime(
const bsl::string& sessionDescription,
const bsl::shared_ptr<mwcio::Channel>& channel)
{
bsls::Types::Int64 begin = 0;
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK

TimestampMap::const_iterator it = d_timestampMap.find(channel.get());
if (it != d_timestampMap.end()) {
begin = it->second;
d_timestampMap.erase(it);
}

} // close mutex lock guard // UNLOCK

if (begin) {
BALL_LOG_INFO_BLOCK
{
const bsls::Types::Int64 elapsed =
mwcsys::Time::highResolutionTimer() - begin;
BALL_LOG_OUTPUT_STREAM
<< "Open session '" << sessionDescription
<< "' took: " << mwcu::PrintUtil::prettyTimeInterval(elapsed)
<< " (" << elapsed << " nanoseconds)";
}
}
}

TCPSessionFactory::TCPSessionFactory(
const mqbcfg::TcpInterfaceConfig& config,
bdlmt::EventScheduler* scheduler,
Expand Down Expand Up @@ -895,6 +942,7 @@ TCPSessionFactory::TCPSessionFactory(
, d_heartbeatChannels(allocator)
, d_initialMissedHeartbeatCounter(calculateInitialMissedHbCounter(config))
, d_isListening(false)
, d_timestampMap(allocator)
, d_allocator_p(allocator)
{
// PRECONDITIONS
Expand Down
13 changes: 13 additions & 0 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ class TCPSessionFactory {

typedef bslma::ManagedPtr<mwcio::ChannelFactory::OpHandle> OpHandleMp;

typedef bsl::unordered_map<mwcio::Channel*, bsls::Types::Int64>
TimestampMap;

private:
// DATA
mwcu::SharedResource<TCPSessionFactory> d_self;
Expand Down Expand Up @@ -366,6 +369,9 @@ class TCPSessionFactory {
// while operation (readCallback/
// negotiation) is in progress.

TimestampMap d_timestampMap;
// Map of HiRes timestamp of the session beginning per channel.

bslma::Allocator* d_allocator_p;
// Allocator to use

Expand Down Expand Up @@ -468,6 +474,13 @@ class TCPSessionFactory {
/// event scheduler processes it.
void disableHeartbeat(const bsl::shared_ptr<ChannelInfo>& channelInfo);

/// Log open session time for the specified `sessionDescription` and
/// `channel`, using the stored begin
/// timestamp. After logging, begin timestamp is removed from
/// timestamps map.
void logOpenSessionTime(const bsl::string& sessionDescription,
const bsl::shared_ptr<mwcio::Channel>& channel);

private:
// NOT IMPLEMENTED

Expand Down

0 comments on commit 35369ef

Please sign in to comment.