Skip to content

Commit

Permalink
Remove QueueStatsDomain callbacks
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <emalygin@bloomberg.net>
  • Loading branch information
678098 committed Aug 2, 2024
1 parent a9e5a13 commit 8ae6b9c
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 168 deletions.
77 changes: 15 additions & 62 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,7 @@ void FileBackedStorage::purgeCommon(const mqbu::StorageKey& appKey)
// to be purged, otherwise only the virtual storage associated with the
// specified 'appKey'.

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_PURGE);
}

d_virtualStorageCatalog.removeAll(appKey, cb);
d_virtualStorageCatalog.removeAll(appKey);

if (appKey.isNull()) {
// Remove all records from the physical storage as well.
Expand Down Expand Up @@ -267,6 +261,8 @@ void FileBackedStorage::setQueue(mqbi::Queue* queue)
// Update queue stats if a queue has been associated with the storage.

if (d_queue_p) {
d_virtualStorageCatalog.setQueueStats(queue->stats());

const bsls::Types::Int64 numMessage = numMessages(
mqbu::StorageKey::k_NULL_KEY);
const bsls::Types::Int64 numByte = numBytes(
Expand All @@ -282,6 +278,9 @@ void FileBackedStorage::setQueue(mqbi::Queue* queue)
<< mwcu::PrintUtil::prettyNumber(numByte)
<< " bytes of outstanding data.";
}
else {
d_virtualStorageCatalog.setQueueStats(NULL);
}
}

void FileBackedStorage::close()
Expand Down Expand Up @@ -345,17 +344,11 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
// VirtualStorageIterator::loadMessageAndAttributes() can be avoided
// if we keep `irc` (like we keep 'DataStoreRecordHandle').

BSLS_ASSERT_SAFE(d_queue_p);
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb =
d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE);

d_virtualStorageCatalog.put(msgGUID,
msgSize,
d_defaultRdaInfo,
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
mqbu::StorageKey::k_NULL_KEY,
cb);
mqbu::StorageKey::k_NULL_KEY);

// Move auto confirms to the data record
for (unsigned int i = 0; i < d_ephemeralConfirms.size(); ++i) {
Expand All @@ -364,6 +357,7 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
d_ephemeralConfirms.clear();
d_currentlyAutoConfirming = bmqt::MessageGUID();

BSLS_ASSERT_SAFE(d_queue_p);
d_queue_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE,
msgSize);
Expand All @@ -382,7 +376,6 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
BSLS_ASSERT(hasMessage(msgGUID));

for (size_t i = 0; i < storageKeys.size(); ++i) {
// No callback, do not track metrics in proxy
d_virtualStorageCatalog.put(msgGUID,
msgSize,
d_defaultRdaInfo,
Expand Down Expand Up @@ -445,14 +438,8 @@ FileBackedStorage::releaseRef(const bmqt::MessageGUID& msgGUID,
}

if (!appKey.isNull()) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.remove(msgGUID, appKey, cb);
d_virtualStorageCatalog.remove(msgGUID, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
return rc; // RETURN
}
Expand Down Expand Up @@ -497,15 +484,7 @@ FileBackedStorage::remove(const bmqt::MessageGUID& msgGUID,
}

if (clearAll) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

d_virtualStorageCatalog.remove(msgGUID,
mqbu::StorageKey::k_NULL_KEY,
cb);
d_virtualStorageCatalog.remove(msgGUID, mqbu::StorageKey::k_NULL_KEY);
}

BSLS_ASSERT_SAFE(!d_virtualStorageCatalog.hasMessage(msgGUID));
Expand Down Expand Up @@ -769,16 +748,9 @@ int FileBackedStorage::gcExpiredMessages(
msgLen);
}

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

// Remove message from all virtual storages.
d_virtualStorageCatalog.remove(cit->first,
mqbu::StorageKey::k_NULL_KEY,
cb);
mqbu::StorageKey::k_NULL_KEY);

// Delete all items pointed by all handles for this GUID (i.e., delete
// message from the underlying storage).
Expand Down Expand Up @@ -834,19 +806,12 @@ void FileBackedStorage::processMessageRecord(
irc.first->second.d_array.push_back(handle);
irc.first->second.d_refCount = refCount;

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE);
}

// Add 'guid' to all virtual storages, if any.
d_virtualStorageCatalog.put(guid,
msgLen,
d_defaultRdaInfo,
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
mqbu::StorageKey::k_NULL_KEY,
cb);
mqbu::StorageKey::k_NULL_KEY);

if (!d_currentlyAutoConfirming.isUnset()) {
if (d_currentlyAutoConfirming == guid) {
Expand All @@ -863,7 +828,7 @@ void FileBackedStorage::processMessageRecord(
}
}

// Update the messages & bytes monitors, and the entire queue stats.
// Update the messages & bytes monitors, and the stats.
d_capacityMeter.forceCommit(1, msgLen); // Return value ignored.

if (d_queue_p) {
Expand Down Expand Up @@ -938,14 +903,8 @@ void FileBackedStorage::processConfirmRecord(
--it->second.d_refCount; // Update outstanding refCount

if (!appKey.isNull()) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.remove(guid, appKey, cb);
d_virtualStorageCatalog.remove(guid, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
BALL_LOG_ERROR << "#STORAGE_INVALID_CONFIRM "
<< "PartitionId [" << partitionId() << "]"
Expand Down Expand Up @@ -995,18 +954,12 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid)
msgLen);
}

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

// Delete 'guid' from all virtual storages, if any. Note that 'guid'
// should have already been removed from each virtual storage when confirm
// records were received earlier for each appKey, but we remove the guid
// again, just in case. When the code is mature enough, we could remove
// this.
d_virtualStorageCatalog.remove(guid, mqbu::StorageKey::k_NULL_KEY, cb);
d_virtualStorageCatalog.remove(guid, mqbu::StorageKey::k_NULL_KEY);

d_capacityMeter.remove(1, msgLen, true /* silent mode; don't log */);

Expand Down
58 changes: 11 additions & 47 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ void InMemoryStorage::setQueue(mqbi::Queue* queue)
// Update queue stats if a queue has been associated with the storage.

if (d_queue_p) {
d_virtualStorageCatalog.setQueueStats(queue->stats());

const bsls::Types::Int64 numMessage = numMessages(
mqbu::StorageKey::k_NULL_KEY);
const bsls::Types::Int64 numByte = numBytes(
Expand All @@ -129,6 +131,9 @@ void InMemoryStorage::setQueue(mqbi::Queue* queue)
<< mwcu::PrintUtil::prettyNumber(numByte)
<< " bytes of outstanding.";
}
else {
d_virtualStorageCatalog.setQueueStats(NULL);
}
}

void InMemoryStorage::close()
Expand Down Expand Up @@ -205,18 +210,11 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes,
Item(appData, options, *attributes)),
attributes->arrivalTimepoint());

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE);
}

d_virtualStorageCatalog.put(msgGUID,
msgSize,
d_defaultRdaInfo,
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
mqbu::StorageKey::k_NULL_KEY,
cb);
mqbu::StorageKey::k_NULL_KEY);

d_currentlyAutoConfirming = bmqt::MessageGUID();
d_numAutoConfirms = 0;
Expand All @@ -241,7 +239,6 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes,
// corresponding virtual storages.

for (size_t i = 0; i < storageKeys.size(); ++i) {
// No callback, do not track metrics in proxy
d_virtualStorageCatalog.put(msgGUID,
msgSize,
d_defaultRdaInfo,
Expand Down Expand Up @@ -281,14 +278,8 @@ mqbi::StorageResult::Enum InMemoryStorage::releaseRef(
}

if (!appKey.isNull()) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.remove(msgGUID, appKey, cb);
d_virtualStorageCatalog.remove(msgGUID, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
return rc; // RETURN
}
Expand All @@ -315,15 +306,7 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID,
}

if (clearAll) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

d_virtualStorageCatalog.remove(msgGUID,
mqbu::StorageKey::k_NULL_KEY,
cb);
d_virtualStorageCatalog.remove(msgGUID, mqbu::StorageKey::k_NULL_KEY);
}

BSLS_ASSERT_SAFE(!d_virtualStorageCatalog.hasMessage(msgGUID));
Expand Down Expand Up @@ -358,13 +341,7 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
if (appKey.isNull()) {
// Clear the 'physical' queue, as well as all virtual storages.

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_PURGE);
}

d_virtualStorageCatalog.removeAll(mqbu::StorageKey::k_NULL_KEY, cb);
d_virtualStorageCatalog.removeAll(mqbu::StorageKey::k_NULL_KEY);
d_items.clear();
d_capacityMeter.clear();

Expand Down Expand Up @@ -446,16 +423,10 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
iter->advance();
}

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_PURGE);
}

// Clear out the virtual storage associated with the specified 'appKey'.
// Note that this cannot be done while iterating over the it in the above
// 'while' loop for obvious reasons.
d_virtualStorageCatalog.removeAll(appKey, cb);
d_virtualStorageCatalog.removeAll(appKey);

if (d_items.empty()) {
d_isEmpty.storeRelaxed(1);
Expand Down Expand Up @@ -502,17 +473,10 @@ int InMemoryStorage::gcExpiredMessages(
msgLen);
}

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

// Remove message from all virtual storages and the physical (this)
// storage.
d_virtualStorageCatalog.remove(cit->first,
mqbu::StorageKey::k_NULL_KEY,
cb);
mqbu::StorageKey::k_NULL_KEY);
d_items.erase(cit, now);
++numMsgsDeleted;
}
Expand Down
Loading

0 comments on commit 8ae6b9c

Please sign in to comment.