Skip to content

Commit

Permalink
[pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTrack…
Browse files Browse the repository at this point in the history
…er When Ack Messages . (#6498)

### Motivation
Because of #6391 , acked messages were counted as unacked messages.
Although messages from brokers were acknowledged, the following log was output.

```
2020-03-06 19:44:51.790 INFO  ConsumerImpl:174 | [persistent://public/default/t1, sub1, 0] Created consumer on broker [127.0.0.1:58860 -> 127.0.0.1:6650]
my-message-0: Fri Mar  6 19:45:05 2020
my-message-1: Fri Mar  6 19:45:05 2020
my-message-2: Fri Mar  6 19:45:05 2020
2020-03-06 19:45:15.818 INFO  UnAckedMessageTrackerEnabled:53 | [persistent://public/default/t1, sub1, 0] : 3 Messages were not acked within 10000 time

```

This behavior happened on master branch.
(cherry picked from commit 67f8cf3)
  • Loading branch information
k2la authored and jiazhai committed Mar 13, 2020
1 parent aaa9062 commit 468bb21
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
14 changes: 9 additions & 5 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
if (messageIdPartitionMap.count(m) == 0) {
bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
return insert && timePartitions.back().insert(m).second;
std::set<MessageId>& partition = timePartitions.back();
bool emplace = messageIdPartitionMap.emplace(m, partition).second;
bool insert = partition.insert(m).second;
return emplace && insert;
}
return false;
}
Expand All @@ -104,7 +106,8 @@ bool UnAckedMessageTrackerEnabled::isEmpty() {
bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
bool removed = false;
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m);

std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
if (exist != messageIdPartitionMap.end()) {
removed = exist->second.erase(m);
}
Expand All @@ -121,7 +124,7 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap < msgId) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId);
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(msgId);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgId);
}
Expand All @@ -135,7 +138,8 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
std::map<MessageId, std::set<MessageId>&>::iterator exist =
messageIdPartitionMap.find(msgIdInMap);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgIdInMap);
}
Expand Down
3 changes: 1 addition & 2 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <mutex>

namespace pulsar {

class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
public:
~UnAckedMessageTrackerEnabled();
Expand All @@ -41,7 +40,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
void timeoutHandlerHelper();
bool isEmpty();
long size();
std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
std::map<MessageId, std::set<MessageId>&> messageIdPartitionMap;
std::deque<std::set<MessageId>> timePartitions;
std::mutex lock_;
DeadlineTimerPtr timer_;
Expand Down

0 comments on commit 468bb21

Please sign in to comment.