Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat[BMQ, MQB]: shutdown v2, optimizing shutdown logic #399

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,7 @@
</annotation>
<sequence>
<element name='clusterName' type='string'/>
<element name='version' type='int' default='1'/>
</sequence>
</complexType>

Expand Down
23 changes: 20 additions & 3 deletions src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4017,19 +4017,26 @@ const char* StatusCategory::toString(StatusCategory::Value value)

const char StopRequest::CLASS_NAME[] = "StopRequest";

const int StopRequest::DEFAULT_INITIALIZER_VERSION = 1;

const bdlat_AttributeInfo StopRequest::ATTRIBUTE_INFO_ARRAY[] = {
{ATTRIBUTE_ID_CLUSTER_NAME,
"clusterName",
sizeof("clusterName") - 1,
"",
bdlat_FormattingMode::e_TEXT}};
bdlat_FormattingMode::e_TEXT},
{ATTRIBUTE_ID_VERSION,
"version",
sizeof("version") - 1,
"",
bdlat_FormattingMode::e_DEC}};

// CLASS METHODS

const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(const char* name,
int nameLength)
{
for (int i = 0; i < 1; ++i) {
for (int i = 0; i < 2; ++i) {
const bdlat_AttributeInfo& attributeInfo =
StopRequest::ATTRIBUTE_INFO_ARRAY[i];

Expand All @@ -4047,6 +4054,8 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id)
switch (id) {
case ATTRIBUTE_ID_CLUSTER_NAME:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME];
case ATTRIBUTE_ID_VERSION:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION];
default: return 0;
}
}
Expand All @@ -4055,25 +4064,29 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id)

StopRequest::StopRequest(bslma::Allocator* basicAllocator)
: d_clusterName(basicAllocator)
, d_version(DEFAULT_INITIALIZER_VERSION)
{
}

StopRequest::StopRequest(const StopRequest& original,
bslma::Allocator* basicAllocator)
: d_clusterName(original.d_clusterName, basicAllocator)
, d_version(original.d_version)
{
}

#if defined(BSLS_COMPILERFEATURES_SUPPORT_RVALUE_REFERENCES) && \
defined(BSLS_COMPILERFEATURES_SUPPORT_NOEXCEPT)
StopRequest::StopRequest(StopRequest&& original) noexcept
: d_clusterName(bsl::move(original.d_clusterName))
: d_clusterName(bsl::move(original.d_clusterName)),
d_version(bsl::move(original.d_version))
{
}

StopRequest::StopRequest(StopRequest&& original,
bslma::Allocator* basicAllocator)
: d_clusterName(bsl::move(original.d_clusterName), basicAllocator)
, d_version(bsl::move(original.d_version))
{
}
#endif
Expand All @@ -4088,6 +4101,7 @@ StopRequest& StopRequest::operator=(const StopRequest& rhs)
{
if (this != &rhs) {
d_clusterName = rhs.d_clusterName;
d_version = rhs.d_version;
}

return *this;
Expand All @@ -4099,6 +4113,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs)
{
if (this != &rhs) {
d_clusterName = bsl::move(rhs.d_clusterName);
d_version = bsl::move(rhs.d_version);
}

return *this;
Expand All @@ -4108,6 +4123,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs)
void StopRequest::reset()
{
bdlat_ValueTypeFunctions::reset(&d_clusterName);
d_version = DEFAULT_INITIALIZER_VERSION;
}

// ACCESSORS
Expand All @@ -4118,6 +4134,7 @@ StopRequest::print(bsl::ostream& stream, int level, int spacesPerLevel) const
bslim::Printer printer(&stream, level, spacesPerLevel);
printer.start();
printer.printAttribute("clusterName", this->clusterName());
printer.printAttribute("version", this->version());
printer.end();
return stream;
}
Expand Down
45 changes: 42 additions & 3 deletions src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -7429,18 +7429,21 @@ namespace bmqp_ctrlmsg {
class StopRequest {
// INSTANCE DATA
bsl::string d_clusterName;
int d_version;

public:
// TYPES
enum { ATTRIBUTE_ID_CLUSTER_NAME = 0 };
enum { ATTRIBUTE_ID_CLUSTER_NAME = 0, ATTRIBUTE_ID_VERSION = 1 };

enum { NUM_ATTRIBUTES = 1 };
enum { NUM_ATTRIBUTES = 2 };

enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0 };
enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0, ATTRIBUTE_INDEX_VERSION = 1 };

// CONSTANTS
static const char CLASS_NAME[];

static const int DEFAULT_INITIALIZER_VERSION;

static const bdlat_AttributeInfo ATTRIBUTE_INFO_ARRAY[];

public:
Expand Down Expand Up @@ -7540,6 +7543,10 @@ class StopRequest {
/// object.
bsl::string& clusterName();

/// Return a reference to the non-modifiable "Version" attribute of this
/// object.
int& version();

// ACCESSORS

/// Format this object to the specified output `stream` at the
Expand Down Expand Up @@ -7587,6 +7594,10 @@ class StopRequest {
/// Return a reference to the non-modifiable "ClusterName" attribute of
/// this object.
const bsl::string& clusterName() const;

/// Return a reference to the non-modifiable "Version" attribute of this
/// object.
int version() const;
};

// FREE OPERATORS
Expand Down Expand Up @@ -27150,6 +27161,12 @@ int StopRequest::manipulateAttributes(MANIPULATOR& manipulator)
return ret;
}

ret = manipulator(&d_version,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
if (ret) {
return ret;
}

return ret;
}

Expand All @@ -27163,6 +27180,10 @@ int StopRequest::manipulateAttribute(MANIPULATOR& manipulator, int id)
return manipulator(&d_clusterName,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]);
}
case ATTRIBUTE_ID_VERSION: {
return manipulator(&d_version,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
}
default: return NOT_FOUND;
}
}
Expand All @@ -27188,6 +27209,11 @@ inline bsl::string& StopRequest::clusterName()
return d_clusterName;
}

inline int& StopRequest::version()
{
return d_version;
}

// ACCESSORS
template <class ACCESSOR>
int StopRequest::accessAttributes(ACCESSOR& accessor) const
Expand All @@ -27200,6 +27226,10 @@ int StopRequest::accessAttributes(ACCESSOR& accessor) const
return ret;
}

ret = accessor(d_version, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
if (ret) {
return ret;
}
return ret;
}

Expand All @@ -27213,6 +27243,10 @@ int StopRequest::accessAttribute(ACCESSOR& accessor, int id) const
return accessor(d_clusterName,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]);
}
case ATTRIBUTE_ID_VERSION: {
return accessor(d_version,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
}
default: return NOT_FOUND;
}
}
Expand All @@ -27238,6 +27272,11 @@ inline const bsl::string& StopRequest::clusterName() const
return d_clusterName;
}

inline int StopRequest::version() const
{
return d_version;
}

template <typename HASH_ALGORITHM>
void hashAppend(HASH_ALGORITHM& hashAlg,
const bmqp_ctrlmsg::StopRequest& object)
Expand Down
2 changes: 2 additions & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ const char HighAvailabilityFeatures::k_BROADCAST_TO_PROXIES[] =
"BROADCAST_TO_PROXIES";
const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN[] =
"GRACEFUL_SHUTDOWN";
const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2[] =
"GRACEFUL_SHUTDOWN_V2";

// --------------------------------
// struct MessagePropertiesFeatures
Expand Down
2 changes: 2 additions & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,8 @@ struct HighAvailabilityFeatures {
static const char k_BROADCAST_TO_PROXIES[];

static const char k_GRACEFUL_SHUTDOWN[];

static const char k_GRACEFUL_SHUTDOWN_V2[];
};

/// This struct defines feature names related to MessageProperties
Expand Down
4 changes: 4 additions & 0 deletions src/groups/bmq/bmqp/bmqp_requestmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,10 @@ void RequestManager<REQUEST, RESPONSE>::onRequestTimeout(int requestId)

// Explicitly invalidate the timeout since we processed it
request->d_timeoutSchedulerHandle.release();

if (!d_lateResponseMode) {
pniedzielski marked this conversation as resolved.
Show resolved Hide resolved
d_requests.erase(it);
}
} // close guard scope

BALL_LOG_ERROR << "Request with '" << request->nodeDescription()
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ EncodingType::Enum SchemaEventBuilderUtil::bestEncodingSupported(
return EncodingType::e_BER; // RETURN
}

// If remote suppports BER, return BER
// If remote supports BER, return BER
if (bsl::find(encodingsSupported.cbegin(),
encodingsSupported.cend(),
bsl::string(EncodingFeature::k_ENCODING_BER)) !=
Expand Down
4 changes: 3 additions & 1 deletion src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,12 @@ void AdminSession::tearDown(const bsl::shared_ptr<void>& session,
}

void AdminSession::initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout)
const bsls::TimeInterval& timeout,
bool supportShutdownV2)
{
// executed by the *ANY* thread
(void)timeout;
(void)supportShutdownV2;

dispatcher()->execute(
bdlf::BindUtil::bind(&AdminSession::initiateShutdownDispatched,
Expand Down
6 changes: 5 additions & 1 deletion src/groups/mqb/mqba/mqba_adminsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,13 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient {
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
/// if the specified `timeout` is expired.
/// The optional (temporary) specified 'supportShutdownV2' indicates
/// shutdown V2 logic which is not applicable to `AdminSession`
/// implementation.
void
initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;
const bsls::TimeInterval& timeout,
bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;

/// Make the session abandon any work it has.
void invalidate() BSLS_KEYWORD_OVERRIDE;
Expand Down
Loading
Loading