Proposal: Consumer topic error handling
The consumer currently lacks well-defined error propagation for certain topic errors. This proposal aims to fix that.
Currently no error is propagated at all if an application subscribes to non-existent, unknown, or unauthorized topics: the consumer simply omits those topics from its JoinGroup request.
For each item in the subscription set, be it an exact topic name or a wildcard, if no available topics were matched, trigger a per-topic consumer error with the error code set to ERR_UNKNOWN_TOPIC_OR_PART for unknown topics, or ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics.
These errors will be standard consumer errors (RD_KAFKA_OP_CONSUMER_ERR), which are exposed as message objects returned from consumer_poll() (et al.) with the err field set.
After propagating errors for the unknown or unauthorized topics, any remaining valid topics in the subscription shall be consumed.
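The per-item check described above could be sketched roughly as follows. This is a standalone illustration, not librdkafka code: the sub_result_t codes, the cluster_topic_t struct, and both function names are hypothetical, and the rule that an item matching only unauthorized topics yields the authorization error is an assumption. Items starting with "^" are treated as regex patterns, following librdkafka's existing wildcard subscription convention.

```c
#include <regex.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical result codes standing in for "no error",
 * ERR_UNKNOWN_TOPIC_OR_PART and ERR_TOPIC_AUTHORIZATION_FAILED. */
typedef enum { SUB_OK, SUB_UNKNOWN_TOPIC, SUB_UNAUTHORIZED } sub_result_t;

/* Hypothetical view of one topic as reported by cluster metadata. */
typedef struct {
        const char *name;
        int authorized; /* 0 if the broker reported an authorization failure */
} cluster_topic_t;

/* Returns 1 if the subscription item matches the topic name.
 * Items starting with '^' are regex patterns, all others are exact names. */
static int sub_item_matches(const char *item, const char *topic) {
        regex_t re;
        int matched;

        if (item[0] != '^')
                return strcmp(item, topic) == 0;

        if (regcomp(&re, item, REG_EXTENDED | REG_NOSUB) != 0)
                return 0; /* invalid pattern: treat as no match */
        matched = regexec(&re, topic, 0, NULL, 0) == 0;
        regfree(&re);
        return matched;
}

/* Classifies one subscription item against the known cluster topics.
 * Assumption: a single authorized match makes the item valid; if only
 * unauthorized topics match, the authorization error is reported. */
sub_result_t check_sub_item(const char *item,
                            const cluster_topic_t *topics, size_t cnt) {
        int saw_unauthorized = 0;
        size_t i;

        for (i = 0; i < cnt; i++) {
                if (!sub_item_matches(item, topics[i].name))
                        continue;
                if (topics[i].authorized)
                        return SUB_OK;
                saw_unauthorized = 1;
        }
        return saw_unauthorized ? SUB_UNAUTHORIZED : SUB_UNKNOWN_TOPIC;
}
```

With a topic list containing an authorized "orders" and an unauthorized "secret", check_sub_item() would classify "orders" as valid, "secret" as unauthorized, and "missing" as unknown. Only the latter two would produce a consumer error.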
Sample application code:
while (run) {
        /* Block until a message or a consumer error is available. */
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, -1);

        if (!rkm)
                continue;

        if (!rkm->err)
                handle_msg(rkm);
        else if (rkm->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
                handle_unknown_topic(rd_kafka_topic_name(rkm->rkt));
        else if (rkm->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
                handle_unauth_topic(rd_kafka_topic_name(rkm->rkt));
        else
                handle_other_consumer_error(...);

        /* Messages and error events are both released the same way. */
        rd_kafka_message_destroy(rkm);
}
The periodic metadata update will reapply the subscription to the cluster topics every topic.metadata.refresh.interval.ms, but when at least one subscribed topic remains unknown or unauthorized we want to avoid triggering a new error on each such check.
Only when the topic changes state between unknown, unauthorized or valid shall the error suppression be cleared.
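One way to picture the suppression rule is a small per-item state tracker that is consulted on every metadata refresh. The sketch below is illustrative only: topic_state_t, sub_item_state_t, and sub_item_update() are hypothetical names, and starting each item in the valid state (so that the first failed check raises an error) is an assumption.

```c
#include <stddef.h>

/* Hypothetical per-topic states as seen at a metadata refresh. */
typedef enum { TOPIC_VALID, TOPIC_UNKNOWN, TOPIC_UNAUTHORIZED } topic_state_t;

/* Hypothetical bookkeeping for one subscription item. */
typedef struct {
        const char *item;         /* exact topic name or wildcard */
        topic_state_t last_state; /* state observed at the previous refresh */
} sub_item_state_t;

/* Records the state observed at this refresh.
 * Returns 1 if a consumer error should be raised, 0 if it is suppressed.
 * An error is raised only when the state changed and the new state is
 * an error state; repeating the same error state stays silent. */
int sub_item_update(sub_item_state_t *s, topic_state_t new_state) {
        int changed = s->last_state != new_state;

        s->last_state = new_state;
        return changed && new_state != TOPIC_VALID;
}
```

Under these assumptions, an item that stays unknown across several refreshes raises a single error. A transition from unknown to unauthorized raises a new one, and a topic that becomes valid and later disappears raises the unknown-topic error again.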