Skip to content

Commit

Permalink
Wait ACKs for all messages posted by bmqtool (#231)
Browse files Browse the repository at this point in the history
Instead of waiting for Ctrl+C in auto-posting mode, bmqtool will be
counting ACKs and terminate after all posted messages are acknowledged.

Signed-off-by: Stanislav Yuzvinsky <syuzvinsky@bloomberg.net>
  • Loading branch information
syuzvinsky authored Apr 8, 2024
1 parent c8edbcf commit 18b9adc
Show file tree
Hide file tree
Showing 6 changed files with 4,284 additions and 4,985 deletions.
2 changes: 1 addition & 1 deletion src/applications/bmqtool/bmqtoolcmd.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
<element name='latencyReport' type='string' default=""/>
<element name='dumpMsg' type='boolean' default="false"/>
<element name='confirmMsg' type='boolean' default="false"/>
<element name='eventSize' type='int' default="1"/>
<element name='eventSize' type='long' default="1"/>
<element name='msgSize' type='int' default="1024"/>
<element name='postRate' type='int' default="1"/>
<element name='eventsCount' type='string' default="0"/>
Expand Down
23 changes: 15 additions & 8 deletions src/applications/bmqtool/m_bmqtool_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,12 @@ void Application::onMessageEvent(const bmqa::MessageEvent& event)

// Write to log file
d_fileLogger.writeAckMessage(message);

if (d_numExpectedAcks != 0 &&
d_numExpectedAcks == ++d_numAcknowledged) {
BALL_LOG_INFO << "All posted messages have been acknowledged";
d_shutdownSemaphore_p->post();
}
}
else {
// Message is a push message
Expand Down Expand Up @@ -931,7 +937,7 @@ void Application::producerThread()
}

eventBuilder.reset();
for (int msgId = 0; msgId < d_parameters_p->eventSize();
for (bsl::uint64_t msgId = 0; msgId < d_parameters_p->eventSize();
++msgId, ++msgSeqId) {
bmqa::Message& msg = eventBuilder.startMessage();
int length = 0;
Expand Down Expand Up @@ -1046,12 +1052,7 @@ void Application::producerThread()
}
}

// Finished posting messages in auto mode?
// If shutDownGrace is set, signal to the main thread to exit.
if (d_parameters_p->mode() == ParametersMode::e_AUTO &&
d_parameters_p->shutdownGrace() != 0) {
// We do not need to sleep the grace period, since it is done
// by the main thread, in the stop() function.
if (!bmqt::QueueFlagsUtil::isAck(d_parameters_p->queueFlags())) {
d_shutdownSemaphore_p->post();
}
}
Expand Down Expand Up @@ -1131,6 +1132,8 @@ Application::Application(Parameters* parameters,
, d_latencies(allocator)
, d_autoReadInProgress(false)
, d_autoReadActivity(false)
, d_numExpectedAcks(0)
, d_numAcknowledged(0)
{
// NOTHING
}
Expand Down Expand Up @@ -1194,8 +1197,12 @@ int Application::run()
d_shutdownSemaphore_p->post();
}
else {
// Start the thread
if (bmqt::QueueFlagsUtil::isWriter(d_parameters_p->queueFlags())) {
d_numExpectedAcks = d_parameters_p->eventsCount() *
d_parameters_p->eventSize();
d_numAcknowledged = 0;

// Start the thread
rc = bslmt::ThreadUtil::create(
&d_runningThread,
bdlf::MemFnUtil::memFn(&Application::producerThread, this));
Expand Down
13 changes: 13 additions & 0 deletions src/applications/bmqtool/m_bmqtool_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,19 @@ class Application : public bmqa::SessionEventHandler {
// message was seen during the current
// grace period.

bsl::uint64_t d_numExpectedAcks;
// Auto-produce mode only. The total number of messages
// the tool will send. After posting is finished
// the tool will be waiting for this number of ACK
// messages, after which the shutdown semaphore will
// be posted.

bsl::uint64_t d_numAcknowledged;
// Auto-produce mode only. The number of acknowledged
// messages. When the value of this field becomes equal
// to d_numExpectedAcks, the shutdown semaphore will be
// posted.

// PRIVATE MANIPULATORS
// (virtual: bmqa::SessionEventHandler)

Expand Down
Loading

0 comments on commit 18b9adc

Please sign in to comment.