Skip to content

Commit

Permalink
Less code
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <emalygin@bloomberg.net>
  • Loading branch information
678098 committed Jun 12, 2024
1 parent e88f558 commit 01396d3
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 213 deletions.
135 changes: 22 additions & 113 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,8 @@ void FileBackedStorage::purgeCommon(const mqbu::StorageKey& appKey)

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_PURGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_PURGE);
}

d_virtualStorageCatalog.removeAll(appKey, cb);
Expand Down Expand Up @@ -357,20 +346,9 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
// if we keep `irc` (like we keep 'DataStoreRecordHandle').

BSLS_ASSERT_SAFE(d_queue_p);
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb =
bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE);

d_virtualStorageCatalog.put(msgGUID,
msgSize,
Expand Down Expand Up @@ -404,15 +382,12 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
BSLS_ASSERT(hasMessage(msgGUID));

for (size_t i = 0; i < storageKeys.size(); ++i) {
d_virtualStorageCatalog.put(
msgGUID,
msgSize,
d_defaultRdaInfo,
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
storageKeys[i],
mqbs::VirtualStorageCatalog::
OnStorageUpdateCb()); // empty callback, do not track metrics
// in proxy
// No callback, do not track metrics in proxy
d_virtualStorageCatalog.put(msgGUID,
msgSize,
d_defaultRdaInfo,
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
storageKeys[i]);
}

// Note that unlike 'InMemoryStorage', we don't add the message to the
Expand Down Expand Up @@ -472,19 +447,8 @@ FileBackedStorage::releaseRef(const bmqt::MessageGUID& msgGUID,
if (!appKey.isNull()) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

const mqbi::StorageResult::Enum rc =
Expand Down Expand Up @@ -535,19 +499,8 @@ FileBackedStorage::remove(const bmqt::MessageGUID& msgGUID,
if (clearAll) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

d_virtualStorageCatalog.remove(msgGUID,
Expand Down Expand Up @@ -818,19 +771,8 @@ int FileBackedStorage::gcExpiredMessages(

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

// Remove message from all virtual storages.
Expand Down Expand Up @@ -894,19 +836,8 @@ void FileBackedStorage::processMessageRecord(

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE);
}

// Add 'guid' to all virtual storages, if any.
Expand Down Expand Up @@ -1009,19 +940,8 @@ void FileBackedStorage::processConfirmRecord(
if (!appKey.isNull()) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

const mqbi::StorageResult::Enum rc =
Expand Down Expand Up @@ -1077,19 +997,8 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid)

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

// Delete 'guid' from all virtual storages, if any. Note that 'guid'
Expand Down
105 changes: 18 additions & 87 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,19 +207,8 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes,

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE);
}

d_virtualStorageCatalog.put(msgGUID,
Expand Down Expand Up @@ -252,15 +241,12 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes,
// corresponding virtual storages.

for (size_t i = 0; i < storageKeys.size(); ++i) {
d_virtualStorageCatalog.put(
msgGUID,
msgSize,
d_defaultRdaInfo,
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
storageKeys[i],
mqbs::VirtualStorageCatalog::
OnStorageUpdateCb()); // empty callback, do not track metrics
// in proxy
// No callback, do not track metrics in proxy
d_virtualStorageCatalog.put(msgGUID,
msgSize,
d_defaultRdaInfo,
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
storageKeys[i]);
}

// If the guid also exists in the 'physical' storage, bump up its reference
Expand Down Expand Up @@ -297,19 +283,8 @@ mqbi::StorageResult::Enum InMemoryStorage::releaseRef(
if (!appKey.isNull()) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

const mqbi::StorageResult::Enum rc =
Expand Down Expand Up @@ -342,19 +317,8 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID,
if (clearAll) {
mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

d_virtualStorageCatalog.remove(msgGUID,
Expand Down Expand Up @@ -396,19 +360,8 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_PURGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_PURGE);
}

d_virtualStorageCatalog.removeAll(mqbu::StorageKey::k_NULL_KEY, cb);
Expand Down Expand Up @@ -495,19 +448,8 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_PURGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_PURGE);
}

// Clear out the virtual storage associated with the specified 'appKey'.
Expand Down Expand Up @@ -562,19 +504,8 @@ int InMemoryStorage::gcExpiredMessages(

mqbs::VirtualStorageCatalog::OnStorageUpdateCb cb;
if (d_queue_p) {
// disambiguate mqbstat::QueueStatsDomain::onEvent
bdlf::MemFn<void (mqbstat::QueueStatsDomain::*)(
mqbstat::QueueStatsDomain::EventType::Enum,
bsls::Types::Int64,
const bsl::string&)>
f(&mqbstat::QueueStatsDomain::onEvent);

cb = bdlf::BindUtil::bind(
f,
d_queue_p->stats(),
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
bdlf::PlaceHolders::_1, // value
bdlf::PlaceHolders::_2); // appId
cb = d_queue_p->stats()->buildEventCallback(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE);
}

// Remove message from all virtual storages and the physical (this)
Expand Down
25 changes: 14 additions & 11 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ class VirtualStorageCatalog {
/// `rdaInfo` to the virtual storage associated with the specified
/// `appKey`. Note that if `appKey` is null, the message will be added
/// to all virtual storages maintained by this instance.
mqbi::StorageResult::Enum put(const bmqt::MessageGUID& msgGUID,
int msgSize,
const bmqp::RdaInfo& rdaInfo,
unsigned int subScriptionId,
const mqbu::StorageKey& appKey,
const OnStorageUpdateCb& putCb);
mqbi::StorageResult::Enum
put(const bmqt::MessageGUID& msgGUID,
int msgSize,
const bmqp::RdaInfo& rdaInfo,
unsigned int subScriptionId,
const mqbu::StorageKey& appKey,
const OnStorageUpdateCb& putCb = OnStorageUpdateCb());

/// Get an iterator for items stored in the virtual storage identified
/// by the specified `appKey`. Iterator will point to point to the
Expand Down Expand Up @@ -157,17 +158,19 @@ class VirtualStorageCatalog {
/// null, then remove the message from the storages for all clients.
/// Return 0 on success, or a non-zero return code if the `msgGUID` was
/// not found or the `appKey` is invalid.
mqbi::StorageResult::Enum remove(const bmqt::MessageGUID& msgGUID,
const mqbu::StorageKey& appKey,
const OnStorageUpdateCb& removeCb);
mqbi::StorageResult::Enum
remove(const bmqt::MessageGUID& msgGUID,
const mqbu::StorageKey& appKey,
const OnStorageUpdateCb& removeCb = OnStorageUpdateCb());

/// Remove all messages from the storage for the client identified by
/// the specified `appKey`. If `appKey` is null, then remove messages
/// for all clients. Return one of the return codes from:
/// * **e_SUCCESS** : `msgGUID` was not found
/// * **e_APPKEY_NOT_FOUND** : Invalid `appKey` specified
mqbi::StorageResult::Enum removeAll(const mqbu::StorageKey& appKey,
const OnStorageUpdateCb& purgeCb);
mqbi::StorageResult::Enum
removeAll(const mqbu::StorageKey& appKey,
const OnStorageUpdateCb& purgeCb = OnStorageUpdateCb());

/// Create, if it doesn't exist already, a virtual storage instance with
/// the specified `appId` and `appKey`. Return zero upon success and a
Expand Down
Loading

0 comments on commit 01396d3

Please sign in to comment.