Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Why consumers do not report errors if the topic is deleted #4836

Open
3 of 7 tasks
sollhui opened this issue Sep 4, 2024 · 0 comments
Open
3 of 7 tasks

Why consumers do not report errors if the topic is deleted #4836

sollhui opened this issue Sep 4, 2024 · 0 comments

Comments

@sollhui
Copy link

sollhui commented Sep 4, 2024

Read the FAQ first: https://github.com/confluentinc/librdkafka/wiki/FAQ

Do NOT create issues for questions, use the discussion forum: https://github.com/confluentinc/librdkafka/discussions

Description

When my program was consuming Kafka data through consumer, I deleted the topic of Kafka. I expected the consumer to report an error, but in reality, there was no error and no consumption data.

This is part of my program

RdKafka::KafkaConsumer* _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
while(true) {
std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
                _consuming_partition_ids.insert(msg->partition());
            }
            if (msg->len() == 0) {
                // ignore msg with length 0.
                // put empty msg into queue will cause the load process shutting down.
                break;
            } else if (!queue->blocking_put(msg.get())) {
                // queue is shutdown
                done = true;
            } else {
                ++put_rows;
                msg.release(); // release the ownership, msg will be deleted after being processed
            }
            ++received_rows;
            break;
        case RdKafka::ERR__TIMED_OUT:
            // leave the status as OK, because this may happened
            // if there is no data in kafka.
            LOG(INFO) << "kafka consume timeout: " << _id;
            break;
        case RdKafka::ERR__TRANSPORT:
            LOG(INFO) << "kafka consume Disconnected: " << _id
                      << ", retry times: " << retry_times++;
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                break;
            }
            [[fallthrough]];
        case RdKafka::ERR__PARTITION_EOF: {
            LOG(INFO) << "consumer meet partition eof: " << _id
                      << " partition offset: " << msg->offset();
            _consuming_partition_ids.erase(msg->partition());
            if (!queue->blocking_put(msg.get())) {
                done = true;
            } else if (_consuming_partition_ids.size() <= 0) {
                msg.release();
                done = true;
            } else {
                msg.release();
            }
            break;
        }
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
            done = true;
            std::stringstream ss;
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        }
        default:
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
            done = true;
            st = Status::InternalError<false>(msg->errstr());
            break;
        }
}
 

If a new consumer is created, there will be an error. I mainly want to know why consumers which have already established a connection with Kafka will not report an error?

How to reproduce

Delete topic of Kafka when consuming Kafka data.

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/confluentinc/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): <1.9.2>
  • Apache Kafka version: <2.3.0>
  • librdkafka client configuration: <enable.partition.eof=true, enable.auto.offset.store=true>
  • Operating system: <linux>
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant