Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance[MQB]: inline Put, Push, Ack, Confirm #177

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 35 additions & 53 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,16 @@ void finalizeClosedHandle(bsl::string description,
// -------------------------

ClientSessionState::ClientSessionState(
bslma::ManagedPtr<mwcst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator)
const bsl::shared_ptr<mwcst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_channelBufferQueue(allocator)
, d_unackedMessageInfos(d_allocator_p)
, d_dispatcherClientData()
, d_statContext_mp(clientStatContext)
, d_statContext_sp(clientStatContext)
, d_bufferFactory_p(bufferFactory)
, d_blobSpPool_p(blobSpPool)
, d_schemaEventBuilder(bufferFactory, allocator, encodingType)
Expand Down Expand Up @@ -564,12 +564,13 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,
flush();
}

mqbstat::QueueStatsClient* queueStats = 0;
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(queueState == 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

// Invalid/unknown queue
queueStats = invalidQueueStats();
invalidQueueStats()->onEvent(
mqbstat::QueueStatsClient::EventType::e_ACK,
1);
}
else {
// Known queue (or subStream of the queue)
Expand All @@ -584,14 +585,16 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,
// Invalid/unknown subStream
// Producer has closed the queue before receiving ACKs

queueStats = invalidQueueStats();
invalidQueueStats()->onEvent(
mqbstat::QueueStatsClient::EventType::e_ACK,
1);
}
else {
queueStats = subQueueCiter->value().d_stats.get();
subQueueCiter->value().onEvent(
mqbstat::QueueStatsClient::EventType::e_ACK,
1);
}
}

queueStats->onEvent(mqbstat::QueueStatsClient::EventType::e_ACK, 1);
}

void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore,
Expand Down Expand Up @@ -1737,23 +1740,10 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
return false; // RETURN
}

StreamsMap::iterator subQueueIt =
queueIt->second.d_subQueueInfosMap.findBySubIdSafe(queueId.subId());
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
subQueueIt == queueIt->second.d_subQueueInfosMap.end())) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (eventType == bmqp::EventType::e_CONFIRM) {
// Update invalid queue stats
invalidQueueStats()->onEvent(
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1);
678098 marked this conversation as resolved.
Show resolved Hide resolved
}

*errorStream << "for an unknown subQueueId";

return false; // RETURN
}
// Do not lookup 'queueId.subId()'.
// 'QueueHandle::confirmMessageDispatched' does the check.
// Note, that it does not update stats (on "bmq://invalid/queue").
// It does log warnings.

*queueHandle = queueIt->second.d_handle_p;
BSLS_ASSERT_SAFE(queueHandle);
Expand Down Expand Up @@ -1799,13 +1789,6 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
return false; // RETURN
}

if (eventType == bmqp::EventType::e_CONFIRM) {
// Update stats for the queue (or subStream of the queue)
subQueueIt->value().d_stats->onEvent(
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
678098 marked this conversation as resolved.
Show resolved Hide resolved
1);
}

return true;
}

Expand Down Expand Up @@ -1939,7 +1922,7 @@ void ClientSession::onPushEvent(const mqbi::DispatcherPushEvent& event)
blob->length());
}
else {
subQueueCiter->value().d_stats->onEvent(
subQueueCiter->value().onEvent(
mqbstat::QueueStatsClient::EventType::e_PUSH,
blob->length());
}
Expand Down Expand Up @@ -2091,9 +2074,8 @@ void ClientSession::onPutEvent(const mqbi::DispatcherPutEvent& event)
BSLS_ASSERT_SAFE(queueStatePtr && subQueueInfoPtr);
BSLS_ASSERT_SAFE(queueStatePtr->d_handle_p);

subQueueInfoPtr->d_stats->onEvent(
mqbstat::QueueStatsClient::EventType::e_PUT,
appDataSp->length());
subQueueInfoPtr->onEvent(mqbstat::QueueStatsClient::EventType::e_PUT,
appDataSp->length());

const bool isAtMostOnce =
queueStatePtr->d_handle_p->queue()->isAtMostOnce();
Expand Down Expand Up @@ -2260,7 +2242,7 @@ mqbstat::QueueStatsClient* ClientSession::invalidQueueStats()
d_state.d_invalidQueueStats.makeValue();
d_state.d_invalidQueueStats.value().initialize(
"bmq://invalid/queue",
d_state.d_statContext_mp.get(),
d_state.d_statContext_sp.get(),
d_state.d_allocator_p);
// TBD: The queue uri should be '** INVALID QUEUE **', but that can
// only be done once the stats UI panel has been updated to
Expand Down Expand Up @@ -2463,17 +2445,17 @@ bool ClientSession::validatePutMessage(QueueState** queueState,

// CREATORS
ClientSession::ClientSession(
const bsl::shared_ptr<mwcio::Channel>& channel,
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
const bsl::string& sessionDescription,
mqbi::Dispatcher* dispatcher,
mqbblp::ClusterCatalog* clusterCatalog,
mqbi::DomainFactory* domainFactory,
bslma::ManagedPtr<mwcst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator)
const bsl::shared_ptr<mwcio::Channel>& channel,
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
const bsl::string& sessionDescription,
mqbi::Dispatcher* dispatcher,
mqbblp::ClusterCatalog* clusterCatalog,
mqbi::DomainFactory* domainFactory,
const bsl::shared_ptr<mwcst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator)
: d_self(this) // use default allocator
, d_operationState(e_RUNNING)
, d_isDisconnecting(false)
Expand All @@ -2490,7 +2472,7 @@ ClientSession::ClientSession(
allocator)
, d_queueSessionManager(this,
*d_clientIdentity_p,
d_state.d_statContext_mp.get(),
d_state.d_statContext_sp,
domainFactory,
allocator)
, d_clusterCatalog_p(clusterCatalog)
Expand Down
24 changes: 11 additions & 13 deletions src/groups/mqb/mqba/mqba_clientsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ struct ClientSessionState {
typedef bsl::pair<UnackedMessageInfoMap::iterator, bool>
UnackedMessageInfoMapInsertRc;

typedef bslma::ManagedPtr<mwcst::StatContext> StatContextMp;

public:
// PUBLIC DATA
bslma::Allocator* d_allocator_p;
Expand Down Expand Up @@ -178,7 +176,7 @@ struct ClientSessionState {
// Dispatcher client data associated to
// this session.

StatContextMp d_statContext_mp;
const bsl::shared_ptr<mwcst::StatContext> d_statContext_sp;
// Stat context dedicated to this
// domain, to use as the parent stat
// context for any queue in this
Expand Down Expand Up @@ -238,11 +236,11 @@ struct ClientSessionState {
/// builder will use. Memory allocations are performed using the
/// specified `allocator`.
ClientSessionState(
bslma::ManagedPtr<mwcst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator);
const bsl::shared_ptr<mwcst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator);
};

// ===================
Expand Down Expand Up @@ -620,11 +618,11 @@ class ClientSession : public mqbnet::Session,
mqbi::Dispatcher* dispatcher,
mqbblp::ClusterCatalog* clusterCatalog,
mqbi::DomainFactory* domainFactory,
bslma::ManagedPtr<mwcst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator);
const bsl::shared_ptr<mwcst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator);

/// Destructor
~ClientSession() BSLS_KEYWORD_OVERRIDE;
Expand Down
41 changes: 23 additions & 18 deletions src/groups/mqb/mqba/mqba_clientsession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ class MyMockDomain : public mqbmock::Domain {
/// calls the specified `callback` with a new queue handle created
/// using the specified `handleParameters`. The specified `uri` and
/// `clientContext` are ignored.
void openQueue(BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri,
void openQueue(const bmqt::Uri& uri,
const bsl::shared_ptr<mqbi::QueueHandleRequesterContext>&
clientContext,
const bmqp_ctrlmsg::QueueHandleParameters& handleParameters,
Expand All @@ -605,9 +605,15 @@ class MyMockDomain : public mqbmock::Domain {
handleParameters,
d_allocator_p);

OpenQueueConfirmationCookie confirmationCookie;
mqbi::OpenQueueConfirmationCookie confirmationCookie;
confirmationCookie.createInplace(d_allocator_p, d_queueHandle.get());

confirmationCookie->d_stats.createInplace(d_allocator_p);
confirmationCookie->d_stats->initialize(
uri,
clientContext->statContext().get(),
d_allocator_p);

bmqp_ctrlmsg::Status status(d_allocator_p);
status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;

Expand Down Expand Up @@ -647,18 +653,18 @@ class TestBench {

public:
// DATA
bdlbb::PooledBlobBufferFactory d_bufferFactory;
BlobSpPool d_blobSpPool;
bsl::shared_ptr<mwcio::TestChannel> d_channel;
mqbmock::Cluster d_cluster;
mqbmock::Dispatcher d_mockDispatcher;
MyMockDomain d_domain;
mqbmock::DomainFactory d_mockDomainFactory;
bslma::ManagedPtr<mwcst::StatContext> d_clientStatContext_mp;
bdlmt::EventScheduler d_scheduler;
TestClock d_testClock;
mqba::ClientSession d_cs;
bslma::Allocator* d_allocator_p;
bdlbb::PooledBlobBufferFactory d_bufferFactory;
BlobSpPool d_blobSpPool;
bsl::shared_ptr<mwcio::TestChannel> d_channel;
mqbmock::Cluster d_cluster;
mqbmock::Dispatcher d_mockDispatcher;
MyMockDomain d_domain;
mqbmock::DomainFactory d_mockDomainFactory;
const bsl::shared_ptr<mwcst::StatContext> d_clientStatContext_sp;
bdlmt::EventScheduler d_scheduler;
TestClock d_testClock;
mqba::ClientSession d_cs;
bslma::Allocator* d_allocator_p;

static const int k_PAYLOAD_LENGTH = 36;

Expand All @@ -681,9 +687,8 @@ class TestBench {
, d_mockDispatcher(allocator)
, d_domain(&d_mockDispatcher, &d_cluster, atMostOnce, allocator)
, d_mockDomainFactory(d_domain, allocator)
, d_clientStatContext_mp(
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator)
.managedPtr())
, d_clientStatContext_sp(
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator))
, d_scheduler(bsls::SystemClockType::e_MONOTONIC, allocator)
, d_testClock(d_scheduler)
, d_cs(d_channel,
Expand All @@ -692,7 +697,7 @@ class TestBench {
setInDispatcherThread(&d_mockDispatcher),
0, // ClusterCatalog
&d_mockDomainFactory,
d_clientStatContext_mp,
d_clientStatContext_sp,
&d_blobSpPool,
&d_bufferFactory,
&d_scheduler,
Expand Down
Loading
Loading