Skip to content

Commit 30973e5

Browse files
committed
Changed m_is_subscribed state behavior
- Do not update m_is_subscribed variable from assign / unassign method - Removed IsSubscribed call in Consumer::Consume & ConsumerConsumeLoop's execute method
1 parent 4faba5e commit 30973e5

File tree

2 files changed

+3
-7
lines changed

2 files changed

+3
-7
lines changed

src/consumer.cc

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,6 @@ Baton Consumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
153153
m_partition_cnt = partitions.size();
154154
m_partitions.swap(partitions);
155155

156-
m_is_subscribed = true;
157-
158156
return Baton(RdKafka::ERR_NO_ERROR);
159157
}
160158

@@ -172,8 +170,6 @@ Baton Consumer::Unassign() {
172170
return Baton(errcode);
173171
}
174172

175-
m_is_subscribed = false;
176-
177173
m_partitions.empty();
178174
m_partition_cnt = 0;
179175

@@ -250,9 +246,9 @@ Baton Consumer::Subscribe(std::vector<std::string> topics) {
250246
NodeKafka::Message* Consumer::Consume() {
251247
NodeKafka::Message* m;
252248

253-
if (IsConnected() && IsSubscribed()) {
249+
if (IsConnected()) {
254250
scoped_mutex_lock lock(m_connection_lock);
255-
if (!IsConnected() && IsSubscribed()) {
251+
if (!IsConnected()) {
256252
m = new NodeKafka::Message(RdKafka::ERR__STATE);
257253
} else {
258254
RdKafka::KafkaConsumer* consumer =

src/workers.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ ConsumerConsumeLoop::~ConsumerConsumeLoop() {}
505505

506506
void ConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
507507
// Do one check here before we move forward
508-
while (consumer->IsConnected() && consumer->IsSubscribed()) {
508+
while (consumer->IsConnected()) {
509509
NodeKafka::Message* message = consumer->Consume();
510510
if (message->errcode() == RdKafka::ERR__PARTITION_EOF) {
511511
delete message;

0 commit comments

Comments
 (0)