diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index fd3a17536..caaa41bc1 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -1192,7 +1192,7 @@ rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt, (rkt->rkt_rk->rk_conf.metadata_propagation_max_ms * 1000)) - rkt->rkt_ts_metadata; - if (!permanent && rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN && + if (!permanent && (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN || rkt->rkt_state == RD_KAFKA_TOPIC_S_EXISTS) && remains_us > 0) { /* Still allowing topic metadata to propagate. */ rd_kafka_dbg( @@ -1326,7 +1326,10 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ || mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART || mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID) - rd_kafka_topic_set_notexists(rkt, mdt->err); + { + if (!rd_kafka_topic_set_notexists(rkt, mdt->err)) + goto cleanup; + } else if (mdt->partition_cnt > 0) rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS); else if (mdt->err) @@ -1437,6 +1440,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, rkt, mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); +cleanup: rd_kafka_topic_wrunlock(rkt); /* Loose broker references */