Skip to content

Commit

Permalink
Feat[BMQ, MQB]: shutdown v2, optimizing shutdown logic (bloomberg#399)
Browse files Browse the repository at this point in the history
* Shutdown V2

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>

* cleaning

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>

* Addressing review

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>

---------

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
  • Loading branch information
dorjesinpo authored and alexander-e1off committed Oct 24, 2024
1 parent 59bbdf5 commit 68124e0
Show file tree
Hide file tree
Showing 55 changed files with 1,304 additions and 402 deletions.
1 change: 1 addition & 0 deletions src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,7 @@
</annotation>
<sequence>
<element name='clusterName' type='string'/>
<element name='version' type='int' default='1'/>
</sequence>
</complexType>

Expand Down
23 changes: 20 additions & 3 deletions src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4017,19 +4017,26 @@ const char* StatusCategory::toString(StatusCategory::Value value)

const char StopRequest::CLASS_NAME[] = "StopRequest";

const int StopRequest::DEFAULT_INITIALIZER_VERSION = 1;

const bdlat_AttributeInfo StopRequest::ATTRIBUTE_INFO_ARRAY[] = {
{ATTRIBUTE_ID_CLUSTER_NAME,
"clusterName",
sizeof("clusterName") - 1,
"",
bdlat_FormattingMode::e_TEXT}};
bdlat_FormattingMode::e_TEXT},
{ATTRIBUTE_ID_VERSION,
"version",
sizeof("version") - 1,
"",
bdlat_FormattingMode::e_DEC}};

// CLASS METHODS

const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(const char* name,
int nameLength)
{
for (int i = 0; i < 1; ++i) {
for (int i = 0; i < 2; ++i) {
const bdlat_AttributeInfo& attributeInfo =
StopRequest::ATTRIBUTE_INFO_ARRAY[i];

Expand All @@ -4047,6 +4054,8 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id)
switch (id) {
case ATTRIBUTE_ID_CLUSTER_NAME:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME];
case ATTRIBUTE_ID_VERSION:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION];
default: return 0;
}
}
Expand All @@ -4055,25 +4064,29 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id)

StopRequest::StopRequest(bslma::Allocator* basicAllocator)
: d_clusterName(basicAllocator)
, d_version(DEFAULT_INITIALIZER_VERSION)
{
}

StopRequest::StopRequest(const StopRequest& original,
bslma::Allocator* basicAllocator)
: d_clusterName(original.d_clusterName, basicAllocator)
, d_version(original.d_version)
{
}

#if defined(BSLS_COMPILERFEATURES_SUPPORT_RVALUE_REFERENCES) && \
defined(BSLS_COMPILERFEATURES_SUPPORT_NOEXCEPT)
StopRequest::StopRequest(StopRequest&& original) noexcept
: d_clusterName(bsl::move(original.d_clusterName))
: d_clusterName(bsl::move(original.d_clusterName)),
d_version(bsl::move(original.d_version))
{
}

StopRequest::StopRequest(StopRequest&& original,
bslma::Allocator* basicAllocator)
: d_clusterName(bsl::move(original.d_clusterName), basicAllocator)
, d_version(bsl::move(original.d_version))
{
}
#endif
Expand All @@ -4088,6 +4101,7 @@ StopRequest& StopRequest::operator=(const StopRequest& rhs)
{
if (this != &rhs) {
d_clusterName = rhs.d_clusterName;
d_version = rhs.d_version;
}

return *this;
Expand All @@ -4099,6 +4113,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs)
{
if (this != &rhs) {
d_clusterName = bsl::move(rhs.d_clusterName);
d_version = bsl::move(rhs.d_version);
}

return *this;
Expand All @@ -4108,6 +4123,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs)
void StopRequest::reset()
{
bdlat_ValueTypeFunctions::reset(&d_clusterName);
d_version = DEFAULT_INITIALIZER_VERSION;
}

// ACCESSORS
Expand All @@ -4118,6 +4134,7 @@ StopRequest::print(bsl::ostream& stream, int level, int spacesPerLevel) const
bslim::Printer printer(&stream, level, spacesPerLevel);
printer.start();
printer.printAttribute("clusterName", this->clusterName());
printer.printAttribute("version", this->version());
printer.end();
return stream;
}
Expand Down
45 changes: 42 additions & 3 deletions src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -7429,18 +7429,21 @@ namespace bmqp_ctrlmsg {
class StopRequest {
// INSTANCE DATA
bsl::string d_clusterName;
int d_version;

public:
// TYPES
enum { ATTRIBUTE_ID_CLUSTER_NAME = 0 };
enum { ATTRIBUTE_ID_CLUSTER_NAME = 0, ATTRIBUTE_ID_VERSION = 1 };

enum { NUM_ATTRIBUTES = 1 };
enum { NUM_ATTRIBUTES = 2 };

enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0 };
enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0, ATTRIBUTE_INDEX_VERSION = 1 };

// CONSTANTS
static const char CLASS_NAME[];

static const int DEFAULT_INITIALIZER_VERSION;

static const bdlat_AttributeInfo ATTRIBUTE_INFO_ARRAY[];

public:
Expand Down Expand Up @@ -7540,6 +7543,10 @@ class StopRequest {
/// object.
bsl::string& clusterName();

/// Return a reference to the non-modifiable "Version" attribute of this
/// object.
int& version();

// ACCESSORS

/// Format this object to the specified output `stream` at the
Expand Down Expand Up @@ -7587,6 +7594,10 @@ class StopRequest {
/// Return a reference to the non-modifiable "ClusterName" attribute of
/// this object.
const bsl::string& clusterName() const;

/// Return a reference to the non-modifiable "Version" attribute of this
/// object.
int version() const;
};

// FREE OPERATORS
Expand Down Expand Up @@ -27150,6 +27161,12 @@ int StopRequest::manipulateAttributes(MANIPULATOR& manipulator)
return ret;
}

ret = manipulator(&d_version,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
if (ret) {
return ret;
}

return ret;
}

Expand All @@ -27163,6 +27180,10 @@ int StopRequest::manipulateAttribute(MANIPULATOR& manipulator, int id)
return manipulator(&d_clusterName,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]);
}
case ATTRIBUTE_ID_VERSION: {
return manipulator(&d_version,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
}
default: return NOT_FOUND;
}
}
Expand All @@ -27188,6 +27209,11 @@ inline bsl::string& StopRequest::clusterName()
return d_clusterName;
}

inline int& StopRequest::version()
{
return d_version;
}

// ACCESSORS
template <class ACCESSOR>
int StopRequest::accessAttributes(ACCESSOR& accessor) const
Expand All @@ -27200,6 +27226,10 @@ int StopRequest::accessAttributes(ACCESSOR& accessor) const
return ret;
}

ret = accessor(d_version, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
if (ret) {
return ret;
}
return ret;
}

Expand All @@ -27213,6 +27243,10 @@ int StopRequest::accessAttribute(ACCESSOR& accessor, int id) const
return accessor(d_clusterName,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]);
}
case ATTRIBUTE_ID_VERSION: {
return accessor(d_version,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
}
default: return NOT_FOUND;
}
}
Expand All @@ -27238,6 +27272,11 @@ inline const bsl::string& StopRequest::clusterName() const
return d_clusterName;
}

inline int StopRequest::version() const
{
return d_version;
}

template <typename HASH_ALGORITHM>
void hashAppend(HASH_ALGORITHM& hashAlg,
const bmqp_ctrlmsg::StopRequest& object)
Expand Down
2 changes: 2 additions & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ const char HighAvailabilityFeatures::k_BROADCAST_TO_PROXIES[] =
"BROADCAST_TO_PROXIES";
const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN[] =
"GRACEFUL_SHUTDOWN";
const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2[] =
"GRACEFUL_SHUTDOWN_V2";

// --------------------------------
// struct MessagePropertiesFeatures
Expand Down
2 changes: 2 additions & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,8 @@ struct HighAvailabilityFeatures {
static const char k_BROADCAST_TO_PROXIES[];

static const char k_GRACEFUL_SHUTDOWN[];

static const char k_GRACEFUL_SHUTDOWN_V2[];
};

/// This struct defines feature names related to MessageProperties
Expand Down
4 changes: 4 additions & 0 deletions src/groups/bmq/bmqp/bmqp_requestmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,10 @@ void RequestManager<REQUEST, RESPONSE>::onRequestTimeout(int requestId)

// Explicitly invalidate the timeout since we processed it
request->d_timeoutSchedulerHandle.release();

if (!d_lateResponseMode) {
d_requests.erase(it);
}
} // close guard scope

BALL_LOG_ERROR << "Request with '" << request->nodeDescription()
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ EncodingType::Enum SchemaEventBuilderUtil::bestEncodingSupported(
return EncodingType::e_BER; // RETURN
}

// If remote suppports BER, return BER
// If remote supports BER, return BER
if (bsl::find(encodingsSupported.cbegin(),
encodingsSupported.cend(),
bsl::string(EncodingFeature::k_ENCODING_BER)) !=
Expand Down
4 changes: 3 additions & 1 deletion src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,12 @@ void AdminSession::tearDown(const bsl::shared_ptr<void>& session,
}

void AdminSession::initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout)
const bsls::TimeInterval& timeout,
bool supportShutdownV2)
{
// executed by the *ANY* thread
(void)timeout;
(void)supportShutdownV2;

dispatcher()->execute(
bdlf::BindUtil::bind(&AdminSession::initiateShutdownDispatched,
Expand Down
6 changes: 5 additions & 1 deletion src/groups/mqb/mqba/mqba_adminsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,13 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient {
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
/// if the specified `timeout` is expired.
/// The optional (temporary) specified 'supportShutdownV2' indicates
/// shutdown V2 logic which is not applicable to `AdminSession`
/// implementation.
void
initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;
const bsls::TimeInterval& timeout,
bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;

/// Make the session abandon any work it has.
void invalidate() BSLS_KEYWORD_OVERRIDE;
Expand Down
Loading

0 comments on commit 68124e0

Please sign in to comment.