From fe373242b855c9e021f61a4a5c3e13724aa8b7a0 Mon Sep 17 00:00:00 2001
From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Date: Wed, 6 Mar 2024 12:42:55 +0530
Subject: [PATCH] [KIP-848] Added Heartbeat error codes, OffsetCommit Request,
 OffsetCommit Response and fixed multiple issues including segfaults

- Added error handling to the ConsumerGroupHeartbeat API
- Added two new errors - UNRELEASED_INSTANCE_ID and UNSUPPORTED_ASSIGNOR
- Added partial acknowledgement flow
- Upgraded OffsetCommit Request and Response to v9
- Fixed metadata being requested with duplicate topic ids
- Fixed next_target_assignment not being reset to NULL
- Fixed member getting stuck if fenced while rebalancing
- Fixed segfault with current and target assignments while resetting the
  consumer group
- Fixed segfault due to a deleted topic in metadata
- Fixed the leave group request not being sent when a consumer without any
  assignment leaves
---
 INTRODUCTION.md         |  76 ++++++-------
 examples/consumer.c     |  93 ++++++++++++++++
 src/rdkafka.c           |   6 +
 src/rdkafka.h           |  15 ++-
 src/rdkafka_cgrp.c      | 239 ++++++++++++++++++++++++++++++++++------
 src/rdkafka_cgrp.h      |  12 +-
 src/rdkafka_int.h       |  11 +-
 src/rdkafka_metadata.c  |   8 ++
 src/rdkafka_partition.c |  31 +++++-
 src/rdkafka_partition.h |   7 ++
 src/rdkafka_proto.h     |   9 ++
 src/rdkafka_request.c   | 115 +++++++------------
 src/rdkafka_request.h   |   2 +
 src/rdkafka_topic.c     |  35 +++++-
 src/rdkafka_topic.h     |  11 ++
 15 files changed, 510 insertions(+), 160 deletions(-)
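
Note for reviewers, not part of the commit message: the two new broker-side
errors are mapped to fatal consumer errors (see the RD_KAFKA_ERR_ACTION_FATAL
branch in rd_kafka_cgrp_handle_ConsumerGroupHeartbeat below). A minimal
sketch of how an application could observe them through the existing public
fatal-error API; the handle `rk` is assumed to be a consumer created with
group.protocol=consumer:

    char errstr[512];
    rd_kafka_resp_err_t fatal_err;

    /* After a poll call returns, check whether the ConsumerGroupHeartbeat
     * raised a fatal error such as RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID
     * or RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR. */
    fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
    if (fatal_err) {
            fprintf(stderr, "%% Fatal consumer error: %s: %s\n",
                    rd_kafka_err2name(fatal_err), errstr);
            rd_kafka_consumer_close(rk);
            rd_kafka_destroy(rk);
    }
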
diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index b0e2bd38b0..aa2419e907 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -1972,44 +1972,44 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 release of librdkafka.
 
 
-| ApiKey  | Request name                   | Kafka max   | librdkafka max          |
-| ------- | ------------------------------| ----------- | ----------------------- |
-| 0       | Produce                        | 9           | 7                       |
-| 1       | Fetch                          | 15          | 11                      |
-| 2       | ListOffsets                    | 8           | 7                       |
-| 3       | Metadata                       | 12          | 12                      |
-| 8       | OffsetCommit                   | 8           | 7                       |
-| 9       | OffsetFetch                    | 8           | 7                       |
-| 10      | FindCoordinator                | 4           | 2                       |
-| 11      | JoinGroup                      | 9           | 5                       |
-| 12      | Heartbeat                      | 4           | 3                       |
-| 13      | LeaveGroup                     | 5           | 1                       |
-| 14      | SyncGroup                      | 5           | 3                       |
-| 15      | DescribeGroups                 | 5           | 4                       |
-| 16      | ListGroups                     | 4           | 4                       |
-| 17      | SaslHandshake                  | 1           | 1                       |
-| 18      | ApiVersions                    | 3           | 3                       |
-| 19      | CreateTopics                   | 7           | 4                       |
-| 20      | DeleteTopics                   | 6           | 1                       |
-| 21      | DeleteRecords                  | 2           | 1                       |
-| 22      | InitProducerId                 | 4           | 4                       |
-| 23      | OffsetForLeaderEpoch           | 4           | 2                       |
-| 24      | AddPartitionsToTxn             | 4           | 0                       |
-| 25      | AddOffsetsToTxn                | 3           | 0                       |
-| 26      | EndTxn                         | 3           | 1                       |
-| 28      | TxnOffsetCommit                | 3           | 3                       |
-| 29      | DescribeAcls                   | 3           | 1                       |
-| 30      | CreateAcls                     | 3           | 1                       |
-| 31      | DeleteAcls                     | 3           | 1                       |
-| 32      | DescribeConfigs                | 4           | 1                       |
-| 33      | AlterConfigs                   | 2           | 2                       |
-| 36      | SaslAuthenticate               | 2           | 1                       |
-| 37      | CreatePartitions               | 3           | 0                       |
-| 42      | DeleteGroups                   | 2           | 1                       |
-| 44      | IncrementalAlterConfigs        | 1           | 1                       |
-| 47      | OffsetDelete                   | 0           | 0                       |
-| 50      | DescribeUserScramCredentials   | 0           | 0                       |
-| 51      | AlterUserScramCredentials      | 0           | 0                       |
+| ApiKey  | Request name                   | Kafka max | librdkafka max |
+| ------- | ------------------------------|-----------|----------------|
+| 0       | Produce                        | 9         | 7              |
+| 1       | Fetch                          | 15        | 11             |
+| 2       | ListOffsets                    | 8         | 7              |
+| 3       | Metadata                       | 12        | 12             |
+| 8       | OffsetCommit                   | 9         | 9              |
+| 9       | OffsetFetch                    | 9         | 9              |
+| 10      | FindCoordinator                | 4         | 2              |
+| 11      | JoinGroup                      | 9         | 5              |
+| 12      | Heartbeat                      | 4         | 3              |
+| 13      | LeaveGroup                     | 5         | 1              |
+| 14      | SyncGroup                      | 5         | 3              |
+| 15      | DescribeGroups                 | 5         | 4              |
+| 16      | ListGroups                     | 4         | 4              |
+| 17      | SaslHandshake                  | 1         | 1              |
+| 18      | ApiVersions                    | 3         | 3              |
+| 19      | CreateTopics                   | 7         | 4              |
+| 20      | DeleteTopics                   | 6         | 1              |
+| 21      | DeleteRecords                  | 2         | 1              |
+| 22      | InitProducerId                 | 4         | 4              |
+| 23      | OffsetForLeaderEpoch           | 4         | 2              |
+| 24      | AddPartitionsToTxn             | 4         | 0              |
+| 25      | AddOffsetsToTxn                | 3         | 0              |
+| 26      | EndTxn                         | 3         | 1              |
+| 28      | TxnOffsetCommit                | 3         | 3              |
+| 29      | DescribeAcls                   | 3         | 1              |
+| 30      | CreateAcls                     | 3         | 1              |
+| 31      | DeleteAcls                     | 3         | 1              |
+| 32      | DescribeConfigs                | 4         | 1              |
+| 33      | AlterConfigs                   | 2         | 2              |
+| 36      | SaslAuthenticate               | 2         | 1              |
+| 37      | CreatePartitions               | 3         | 0              |
+| 42      | DeleteGroups                   | 2         | 1              |
+| 44      | IncrementalAlterConfigs        | 1         | 1              |
+| 47      | OffsetDelete                   | 0         | 0              |
+| 50      | DescribeUserScramCredentials   | 0         | 0              |
+| 51      | AlterUserScramCredentials      | 0         | 0              |
 
 
 # Recommendations for language binding developers
 
diff --git a/examples/consumer.c b/examples/consumer.c
index 8ce6f77f4d..ab8a6fb5c7 100644
--- a/examples/consumer.c
+++ b/examples/consumer.c
@@ -69,6 +69,60 @@ static int is_printable(const char *buf, size_t size) {
         return 1;
 }
 
+static void
+print_partition_list(FILE *fp,
+                     const rd_kafka_topic_partition_list_t *partitions) {
+        int i;
+        for (i = 0; i < partitions->cnt; i++) {
+                fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64,
+                        i > 0 ? "," : "", partitions->elems[i].topic,
+                        partitions->elems[i].partition,
+                        partitions->elems[i].offset);
+        }
+        fprintf(fp, "\n");
+}
+
+static void rebalance_cb(rd_kafka_t *rk,
+                         rd_kafka_resp_err_t err,
+                         rd_kafka_topic_partition_list_t *partitions,
+                         void *opaque) {
+        rd_kafka_error_t *error     = NULL;
+        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+        fprintf(stderr, "%% Consumer group rebalanced: ");
+
+        switch (err) {
+        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+                fprintf(stderr, "assigned (%s):\n",
+                        rd_kafka_rebalance_protocol(rk));
+                print_partition_list(stderr, partitions);
+
+                error = rd_kafka_incremental_assign(rk, partitions);
+                break;
+
+        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+                fprintf(stderr, "revoked (%s):\n",
+                        rd_kafka_rebalance_protocol(rk));
+                print_partition_list(stderr, partitions);
+
+                error = rd_kafka_incremental_unassign(rk, partitions);
+                break;
+
+        default:
+                fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
+                rd_kafka_assign(rk, NULL);
+                break;
+        }
+
+        if (error) {
+                fprintf(stderr, "incremental assign failure: %s\n",
+                        rd_kafka_error_string(error));
+                rd_kafka_error_destroy(error);
+        } else if (ret_err) {
+                fprintf(stderr, "assign failure: %s\n",
+                        rd_kafka_err2str(ret_err));
+        }
+}
 
 int main(int argc, char **argv) {
         rd_kafka_t *rk; /* Consumer instance handle */
@@ -127,6 +181,45 @@ int main(int argc, char **argv) {
                 return 1;
         }
 
+        if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
+                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+
+        if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) !=
+            RD_KAFKA_CONF_OK) {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+
+        if (rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr,
+                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+
+        if (rd_kafka_conf_set(conf, "max.poll.interval.ms", "20000", errstr,
+                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+
+        /* Callback called on partition assignment changes */
+        rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
+
+
+        // if (rd_kafka_conf_set(conf, "debug", "all", 
errstr, + // sizeof(errstr)) != RD_KAFKA_CONF_OK) { + // fprintf(stderr, "%s\n", errstr); + // rd_kafka_conf_destroy(conf); + // return 1; + // } + /* If there is no previously committed offset for a partition * the auto.offset.reset strategy will be used to decide where * in the partition to start fetching messages. diff --git a/src/rdkafka.c b/src/rdkafka.c index d124d0e413..d66c94cafb 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -703,6 +703,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = { _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"), _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH, "Broker: The member epoch is fenced by the group coordinator"), + _ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID, + "Broker: The instance ID is still used by another member in the " + "consumer group"), + _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR, + "Broker: The assignor or its version range is not supported by " + "the consumer group"), _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH, "Broker: The member epoch is stale"), _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)}; diff --git a/src/rdkafka.h b/src/rdkafka.h index 737f890681..02a227757b 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -633,9 +633,20 @@ typedef enum { RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97, /** Unknown Topic Id */ RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100, - /** The member epoch is fenced by the group coordinator */ + /** The member epoch is fenced by the group coordinator. The member must + * abandon all its partitions and rejoin. */ RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110, - /** The member epoch is stale */ + /** The instance ID is still used by another member in the + * consumer group. That member must leave first. + */ + RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111, + /** The assignor or its version range is not supported by + * the consumer group. + */ + RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112, + /** The member epoch is stale. + * The member must retry after receiving its updated member epoch + * via the ConsumerGroupHeartbeat API. 
*/
         RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113,
         RD_KAFKA_RESP_ERR_END_ALL,
 } rd_kafka_resp_err_t;
 
diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c
index 301d4aa0d7..46ef318ca1 100644
--- a/src/rdkafka_cgrp.c
+++ b/src/rdkafka_cgrp.c
@@ -892,6 +892,7 @@ static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) {
         rd_kafka_topic_partition_list_destroy(rkcg->rkcg_current_assignment);
         RD_IF_FREE(rkcg->rkcg_target_assignment,
                    rd_kafka_topic_partition_list_destroy);
+        rkcg->rkcg_target_assignment = NULL;
         RD_IF_FREE(rkcg->rkcg_next_target_assignment,
                    rd_kafka_topic_partition_list_destroy);
         rkcg->rkcg_current_assignment = rd_kafka_topic_partition_list_new(0);
@@ -2591,28 +2592,34 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg,
         return rd_true;
 }
 
-static rd_kafka_op_res_t
-rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) {
+static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment(
+    rd_kafka_cgrp_t *rkcg,
+    rd_kafka_topic_partition_list_t *new_target_assignments) {
         rd_bool_t is_assignment_different = rd_false;
         if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK)
                 return RD_KAFKA_OP_RES_HANDLED;
 
         if (rkcg->rkcg_target_assignment) {
                 is_assignment_different = rd_kafka_topic_partition_list_cmp(
-                    rkcg->rkcg_next_target_assignment,
-                    rkcg->rkcg_target_assignment,
+                    new_target_assignments, rkcg->rkcg_target_assignment,
                     rd_kafka_topic_partition_by_id_cmp);
         } else {
                 is_assignment_different = rd_kafka_topic_partition_list_cmp(
-                    rkcg->rkcg_next_target_assignment,
-                    rkcg->rkcg_current_assignment,
+                    new_target_assignments, rkcg->rkcg_current_assignment,
                     rd_kafka_topic_partition_by_id_cmp);
         }
+        /*
+         * TODO: What happens in other states? 
+ */ if (!is_assignment_different) { - RD_IF_FREE(rkcg->rkcg_next_target_assignment, - rd_kafka_topic_partition_list_destroy); - rkcg->rkcg_next_target_assignment = NULL; + if (rkcg->rkcg_next_target_assignment && + (new_target_assignments->cnt == + rkcg->rkcg_next_target_assignment->cnt)) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } } else if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK; @@ -2621,7 +2628,16 @@ rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_target_assignment); } rkcg->rkcg_target_assignment = - rkcg->rkcg_next_target_assignment; + rd_kafka_topic_partition_list_copy(new_target_assignments); + + if (rkcg->rkcg_next_target_assignment && + (new_target_assignments->cnt == + rkcg->rkcg_next_target_assignment->cnt)) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { char rkcg_target_assignment_str[512] = "NULL"; @@ -2635,7 +2651,6 @@ rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { "assignment \"%s\"", rkcg_target_assignment_str); } - rkcg->rkcg_next_target_assignment = NULL; rd_kafka_cgrp_handle_assignment(rkcg, rkcg->rkcg_target_assignment); } @@ -2650,8 +2665,14 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { - int i, j, found = 0; + /* + * FIXME: Using next_target_assignment is not correct as other heartbeat + * call can change it. + */ + int i, j; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + rd_kafka_op_res_t assignment_handle_ret; + rd_kafka_topic_partition_list_t *new_target_assignments; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ @@ -2660,15 +2681,15 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, return RD_KAFKA_OP_RES_HANDLED; /* Update topic name for all the assignments given by topic id - * KIP848TODO: Improve complexity. + * TODO: Improve complexity. + */ + /* + * TODO: Checking local metadata cache is an improvement which we + * can do later. 
*/ + new_target_assignments = rd_kafka_topic_partition_list_new( + rkcg->rkcg_next_target_assignment->cnt); for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = - &rkcg->rkcg_next_target_assignment->elems[i]; - if (rktpar->topic) { - found++; - continue; - } rd_kafka_Uuid_t request_topic_id = rd_kafka_topic_partition_get_topic_id( &rkcg->rkcg_next_target_assignment->elems[i]); @@ -2677,17 +2698,34 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rko->rko_u.metadata.mdi->topics[j].topic_id; if (!rd_kafka_Uuid_cmp(request_topic_id, compare_topic_id)) { - rktpar->topic = rd_strdup( - rko->rko_u.metadata.md->topics[j].topic); - found++; + if (rko->rko_u.metadata.md->topics[j].err == + RD_KAFKA_RESP_ERR_NO_ERROR) + rd_kafka_topic_partition_list_add_with_topic_name_and_id( + new_target_assignments, + request_topic_id, + rko->rko_u.metadata.md->topics[j] + .topic, + rkcg->rkcg_next_target_assignment + ->elems[i] + .partition); + else + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Metadata not found for the " + "assigned topic id - %s due to: " + "%s: " + "Continuing without it", + rd_kafka_Uuid_base64str( + &request_topic_id), + rd_kafka_err2str( + rko->rko_u.metadata.md + ->topics[j] + .err)); break; } } } - if (found < rkcg->rkcg_next_target_assignment->cnt) - return RD_KAFKA_OP_RES_HANDLED; - if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { char rkcg_next_target_assignment_str[512] = "NULL"; @@ -2702,7 +2740,10 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rkcg_next_target_assignment_str); } - return rd_kafka_cgrp_consumer_handle_next_assignment(rkcg); + assignment_handle_ret = rd_kafka_cgrp_consumer_handle_next_assignment( + rkcg, new_target_assignments); + rd_kafka_topic_partition_list_destroy(new_target_assignments); + return assignment_handle_ret; } void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( @@ -2712,22 +2753,29 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( rd_kafka_op_t *rko; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; rd_kafka_Uuid_t topic_id; + rd_kafka_Uuid_t prev_topic_id = RD_KAFKA_UUID_ZERO; rd_list_t *topic_ids; int i; if (!rkcg->rkcg_next_target_assignment->cnt) { /* No metadata to request, continue with handle_next_assignment. 
*/ - rd_kafka_cgrp_consumer_handle_next_assignment(rkcg); + rd_kafka_topic_partition_list_t *new_target_assignment = + rd_kafka_topic_partition_list_new(0); + rd_kafka_cgrp_consumer_handle_next_assignment( + rkcg, new_target_assignment); + rd_kafka_topic_partition_list_destroy(new_target_assignment); return; } topic_ids = rd_list_new(1, rd_list_Uuid_destroy); - for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { topic_id = rd_kafka_topic_partition_get_topic_id( &rkcg->rkcg_next_target_assignment->elems[i]); - rd_list_add(topic_ids, rd_kafka_Uuid_copy(&topic_id)); + if (rd_kafka_Uuid_cmp(prev_topic_id, topic_id) && + !rd_list_find(topic_ids, &topic_id, rd_list_Uuid_cmp)) + rd_list_add(topic_ids, rd_kafka_Uuid_copy(&topic_id)); + prev_topic_id = topic_id; } rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, @@ -2751,6 +2799,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; const int log_decode_errors = LOG_ERR; int16_t error_code = 0; + int actions = 0; rd_kafkap_str_t error_str; rd_kafkap_str_t member_id; int32_t member_epoch; @@ -2869,7 +2918,100 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, err: rkcg->rkcg_last_heartbeat_err = err; rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; - rkcg->rkcg_last_heartbeat_err = err; + + switch (err) { + case RD_KAFKA_RESP_ERR__DESTROY: + /* quick cleanup */ + return; + + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "loading in progress: %s: " + "retrying", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + actions = RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: + case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + /* Remain in joined state and keep querying for coordinator */ + actions = RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR__TRANSPORT: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? 
rd_kafka_broker_name(rkcg->rkcg_curr_coord)
+                                 : "none",
+                             rd_kafka_err2str(err));
+                /* Remain in joined state and keep querying for coordinator */
+                actions = RD_KAFKA_ERR_ACTION_REFRESH;
+                rkcg->rkcg_consumer_flags |=
+                    RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
+                break;
+
+        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
+        case RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH:
+                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
+                             "Heartbeat failed due to: %s: "
+                             "will rejoin the group",
+                             rd_kafka_err2str(err));
+                rkcg->rkcg_consumer_flags |=
+                    RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN;
+                return;
+
+        case RD_KAFKA_RESP_ERR_INVALID_REQUEST:
+        case RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED:
+        case RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR:
+        case RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION:
+        case RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID:
+        case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
+                actions = RD_KAFKA_ERR_ACTION_FATAL;
+                break;
+
+        default:
+                actions = rd_kafka_err_action(rkb, err, request,
+                                              RD_KAFKA_ERR_ACTION_END);
+                break;
+        }
+
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                /* Re-query for coordinator */
+                rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
+        }
+
+        if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
+            rd_kafka_buf_retry(rkb, request)) {
+                /* Retry */
+                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+                return;
+        }
+
+        if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
+                rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
+                                         "Fatal consumer error: %s",
+                                         rd_kafka_err2str(err));
+                rd_kafka_cgrp_revoke_all_rejoin_maybe(
+                    rkcg, rd_true, /*assignments lost*/
+                    rd_true,       /*initiating*/
+                    "Fatal error in ConsumerGroupHeartbeat API response");
+        }
 }
 
@@ -5526,14 +5668,29 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
 
 void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) {
         rd_ts_t now            = rd_clock();
-        rd_bool_t full_request = rd_false;
-        rd_bool_t send_ack     = rd_false;
+        rd_bool_t full_request = rkcg->rkcg_consumer_flags &
+                                 RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
+        rd_bool_t send_ack     = rd_false;
 
         if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk)))
                 return;
 
+        if (unlikely(rkcg->rkcg_consumer_flags &
+                     RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN)) {
+                if (RD_KAFKA_CGRP_REBALANCING(rkcg))
+                        return;
+                rkcg->rkcg_consumer_flags &=
+                    ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN;
+                rkcg->rkcg_consumer_flags |=
+                    RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE;
+                rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_true, rd_true,
+                                                "member fenced - rejoining");
+        }
+
         switch (rkcg->rkcg_join_state) {
         case RD_KAFKA_CGRP_JOIN_STATE_INIT:
+                rkcg->rkcg_consumer_flags &=
+                    ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE;
                 full_request = rd_true;
                 break;
         case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
@@ -5552,10 +5709,15 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) {
         }
 
         if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
+            !(rkcg->rkcg_consumer_flags &
+              RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE) &&
             rd_interval(&rkcg->rkcg_heartbeat_intvl,
-                        rkcg->rkcg_heartbeat_intvl_ms * 1000, now) > 0)
+                        rkcg->rkcg_heartbeat_intvl_ms * 1000, now) > 0) {
                 rd_kafka_cgrp_consumer_group_heartbeat(rkcg, full_request,
                                                        send_ack);
+                rkcg->rkcg_consumer_flags &=
+                    ~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
+        }
 }
 
 /**
@@ -5831,6 +5993,15 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
                 /* FALLTHRU */
         case RD_KAFKA_CGRP_JOIN_STATE_INIT:
+                /*
+                 * There may be a case when no assignments are
+                 * assigned to this consumer. 
In this case, while terminating + * the consumer can be in STEADY or INIT state and won't go + * to intermediate state. In this scenario, last leave call is + * done from here. + */ + rd_kafka_cgrp_leave_maybe(rkcg); + /* Check if cgrp is trying to terminate, which is safe to do * in these two states. Otherwise we'll need to wait for * the current state to decommission. */ diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 322c808fdf..2cd5a59a3b 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -279,9 +279,15 @@ typedef struct rd_kafka_cgrp_s { #define RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK 0x1 /* TODO: write */ #define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2 /* TODO: write */ #define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION \ - 0x4 /* TODO: write \ - */ -#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 /* TODO: write */ + 0x4 /* TODO: write \ + */ +#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 /* TODO: write */ +#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10 /* TODO: write */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN \ + 0x20 /* Member is fenced, need to rejoin */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE \ + 0x40 /* Member is fenced, rejoining */ + /** Rejoin the group following a currently in-progress * incremental unassign. */ diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 36947785ab..b7edf9bce7 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -959,10 +959,15 @@ static RD_INLINE RD_UNUSED rd_kafka_resp_err_t rd_kafka_fatal_error_code(rd_kafka_t *rk) { /* This is an optimization to avoid an atomic read which are costly * on some platforms: - * Fatal errors are currently only raised by the idempotent producer - * and static consumers (group.instance.id). */ + * Fatal errors are currently raised by: + * 1) the idempotent producer + * 2) static consumers (group.instance.id) + * 3) Group using consumer protocol (Introduced in KIP-848). See exact + * errors in rd_kafka_cgrp_handle_ConsumerGroupHeartbeat() */ if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) || - (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id)) + (rk->rk_type == RD_KAFKA_CONSUMER && + (rk->rk_conf.group_instance_id || + rk->rk_conf.group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER))) return rd_atomic32_get(&rk->rk_fatal.err); return RD_KAFKA_RESP_ERR_NO_ERROR; diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index c4bb2fcfc9..e39bbc14d4 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -831,6 +831,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], &mdi->topics[i]); + // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { rd_list_free_cb(missing_topics, rd_list_remove_cmp(missing_topics, @@ -857,6 +858,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } } + // TODO: Should be done for missing_topic_ids as well. /* Requested topics not seen in metadata? Propogate to topic code. */ if (missing_topics) { char *topic; @@ -958,6 +960,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_cache_expiry_start(rk); } + + // TODO: Should be done for requested_topic_ids as well. /* Remove cache hints for the originally requested topics. */ if (requested_topics) rd_kafka_metadata_cache_purge_hints(rk, requested_topics); @@ -989,6 +993,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } done: + + // TODO: Should be done for requested_topic_ids as well. 
if (missing_topics) rd_list_destroy(missing_topics); @@ -1005,6 +1011,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, err_parse: err = rkbuf->rkbuf_err; err: + // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { /* Failed requests shall purge cache hints for * the requested topics. */ @@ -1013,6 +1020,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_wrunlock(rkb->rkb_rk); } + // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); rd_tmpabuf_destroy(&tbuf); diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 357c137db8..5383d44d18 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -2916,6 +2916,23 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_with_topic_id( return rktpar; } + +rd_kafka_topic_partition_t * +rd_kafka_topic_partition_list_add_with_topic_name_and_id( + rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id, + const char *topic, + int32_t partition) { + rd_kafka_topic_partition_t *rktpar; + rktpar = rd_kafka_topic_partition_list_add0( + __FUNCTION__, __LINE__, rktparlist, topic, partition, NULL, NULL); + rd_kafka_topic_partition_private_t *parpriv = + rd_kafka_topic_partition_get_private(rktpar); + parpriv->topic_id = topic_id; + return rktpar; +} + + /** * Adds a consecutive list of partitions to a list */ @@ -4068,11 +4085,16 @@ const char *rd_kafka_topic_partition_list_str( int i; size_t of = 0; + if (!rktparlist->cnt) + dest[0] = '\0'; for (i = 0; i < rktparlist->cnt; i++) { const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; char errstr[128]; char offsetstr[32]; + const char *topic_id_str = NULL; + const rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id(rktpar); int r; if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR)) @@ -4090,14 +4112,19 @@ const char *rd_kafka_topic_partition_list_str( else offsetstr[0] = '\0'; + + if (!RD_KAFKA_UUID_IS_ZERO(topic_id)) + topic_id_str = rd_kafka_Uuid_base64str(&topic_id); + r = rd_snprintf(&dest[of], dest_size - of, "%s" - "%s[%" PRId32 + "%s(%s)[%" PRId32 "]" "%s" "%s", of == 0 ? 
"" : ", ", rktpar->topic, - rktpar->partition, offsetstr, errstr); + topic_id_str, rktpar->partition, offsetstr, + errstr); if ((size_t)r >= dest_size - of) { rd_snprintf(&dest[dest_size - 4], 4, "..."); diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index 56b4a76138..c28023e915 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -712,6 +712,13 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_with_topic_id( rd_kafka_Uuid_t topic_id, int32_t partition); +rd_kafka_topic_partition_t * +rd_kafka_topic_partition_list_add_with_topic_name_and_id( + rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id, + const char *topic, + int32_t partition); + rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 04ce3a1d4d..0afcb886e7 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -616,6 +616,9 @@ rd_kafka_Uuid_t rd_kafka_Uuid_random(); const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid); +#define RD_KAFKA_UUID_IS_ZERO(uuid) \ + (!rd_kafka_Uuid_cmp(uuid, RD_KAFKA_UUID_ZERO)) + /** * @brief UUID copier for rd_list_copy() */ @@ -627,6 +630,12 @@ static RD_INLINE RD_UNUSED void rd_list_Uuid_destroy(void *uuid) { rd_kafka_Uuid_destroy((rd_kafka_Uuid_t *)uuid); } +static RD_INLINE RD_UNUSED int rd_list_Uuid_cmp(const void *uuid1, + const void *uuid2) { + return rd_kafka_Uuid_cmp(*((rd_kafka_Uuid_t *)uuid1), + *((rd_kafka_Uuid_t *)uuid2)); +} + /** * @name Producer ID and Epoch for the Idempotent Producer diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 04574a5705..0fc2094f3c 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -219,6 +219,7 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( int32_t TopicArrayCnt; rd_kafka_topic_partition_list_t *parts = NULL; + // TODO: check topic array to be null case. rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); parts = rd_kafka_topic_partition_list_new( @@ -278,6 +279,10 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: rd_assert(!*"metadata not implemented"); break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: + rd_assert( + !*"timestamp not implemented"); + break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: /* Fallback */ case RD_KAFKA_TOPIC_PARTITION_FIELD_END: @@ -453,6 +458,12 @@ int rd_kafka_buf_write_topic_partitions( rkbuf, rktpar->metadata, rktpar->metadata_size); break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: + /* Field specifically added for OffsetCommit. + * Update it if it is used somewhere else as + * well. 
*/ + rd_kafka_buf_write_i64(rkbuf, -1); + break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: break; case RD_KAFKA_TOPIC_PARTITION_FIELD_END: @@ -1051,7 +1062,7 @@ void rd_kafka_OffsetForLeaderEpochRequest( RD_KAFKA_TOPIC_PARTITION_FIELD_END}; rd_kafka_buf_write_topic_partitions( rkbuf, parts, rd_false /*include invalid offsets*/, - rd_false /*skip valid offsets */, rd_false /*don't use topic id*/, + rd_false /*skip valid offsets */, rd_false /* don't use topic id */, rd_true /*use topic name*/, fields); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -1596,7 +1607,7 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, if (rd_kafka_buf_ApiVersion(rkbuf) >= 3) rd_kafka_buf_read_throttle_time(rkbuf); - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); for (i = 0; i < TopicArrayCnt; i++) { rd_kafkap_str_t topic; char *topic_str; @@ -1604,7 +1615,8 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, int j; rd_kafka_buf_read_str(rkbuf, &topic); - rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); RD_KAFKAP_STR_DUPA(&topic_str, &topic); @@ -1615,6 +1627,7 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, rd_kafka_buf_read_i32(rkbuf, &partition); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + rd_kafka_buf_skip_tags(rkbuf); rktpar = rd_kafka_topic_partition_list_find( offsets, topic_str, partition); @@ -1638,8 +1651,11 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, partcnt++; } + rd_kafka_buf_skip_tags(rkbuf); } + rd_kafka_buf_skip_tags(rkbuf); + /* If all partitions failed use error code * from last partition as the global error. */ if (offsets && err && errcnt == partcnt) @@ -1706,23 +1722,19 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, void *opaque, const char *reason) { rd_kafka_buf_t *rkbuf; - ssize_t of_TopicCnt = -1; - int TopicCnt = 0; - const char *last_topic = NULL; - ssize_t of_PartCnt = -1; - int PartCnt = 0; - int tot_PartCnt = 0; + int tot_PartCnt = 0; int i; int16_t ApiVersion; int features; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetCommit, 0, 7, &features); + rkb, RD_KAFKAP_OffsetCommit, 0, 9, &features); rd_kafka_assert(NULL, offsets != NULL); - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, 1, - 100 + (offsets->cnt * 128)); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_OffsetCommit, 1, + 100 + (offsets->cnt * 128), + ApiVersion >= 8); /* ConsumerGroup */ rd_kafka_buf_write_str(rkbuf, cgmetadata->group_id, -1); @@ -1747,61 +1759,23 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, /* Sort offsets by topic */ rd_kafka_topic_partition_list_sort_by_topic(offsets); - /* TopicArrayCnt: Will be updated when we know the number of topics. */ - of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); - - for (i = 0; i < offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; - - /* Skip partitions with invalid offset. 
*/
-                if (rktpar->offset < 0)
-                        continue;
-
-                if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
-                        /* New topic */
-
-                        /* Finalize previous PartitionCnt */
-                        if (PartCnt > 0)
-                                rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
-                                                        PartCnt);
-
-                        /* TopicName */
-                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
-                        /* PartitionCnt, finalized later */
-                        of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-                        PartCnt    = 0;
-                        last_topic = rktpar->topic;
-                        TopicCnt++;
-                }
-
-                /* Partition */
-                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
-                PartCnt++;
-                tot_PartCnt++;
-
-                /* Offset */
-                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
-
-                /* v6: KIP-101 CommittedLeaderEpoch */
-                if (ApiVersion >= 6)
-                        rd_kafka_buf_write_i32(
-                            rkbuf,
-                            rd_kafka_topic_partition_get_leader_epoch(rktpar));
-
-                /* v1: TimeStamp */
-                if (ApiVersion == 1)
-                        rd_kafka_buf_write_i64(rkbuf, -1);
-
-                /* Metadata */
-                /* Java client 0.9.0 and broker <0.10.0 can't parse
-                 * Null metadata fields, so as a workaround we send an
-                 * empty string if it's Null. */
-                if (!rktpar->metadata)
-                        rd_kafka_buf_write_str(rkbuf, "", 0);
-                else
-                        rd_kafka_buf_write_str(rkbuf, rktpar->metadata,
-                                               rktpar->metadata_size);
-        }
+        /* Write partition list, filtering out partitions with invalid
+         * offsets */
+        rd_kafka_topic_partition_field_t fields[5];
+        i = 0;
+        fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION;
+        fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET;
+        if (ApiVersion >= 6)
+                fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH;
+        else if (ApiVersion == 1)
+                fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP;
+        fields[i++] = RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA;
+        fields[i]   = RD_KAFKA_TOPIC_PARTITION_FIELD_END;
+
+        tot_PartCnt = rd_kafka_buf_write_topic_partitions(
+            rkbuf, offsets, rd_true /*skip invalid offsets*/,
+            rd_false /*don't skip valid offsets*/,
+            rd_false /*don't use topic id*/, rd_true /*use topic name*/,
+            fields);
 
         if (tot_PartCnt == 0) {
                 /* No topic+partitions had valid offsets to commit. */
@@ -1810,13 +1784,6 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb,
                 return 0;
         }
 
-        /* Finalize previous PartitionCnt */
-        if (PartCnt > 0)
-                rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);
-
-        /* Finalize TopicCnt */
-        rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
-
         rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
 
         rd_rkb_dbg(rkb, TOPIC, "OFFSET",
diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h
index 39121a44a0..4d98ce51cb 100644
--- a/src/rdkafka_request.h
+++ b/src/rdkafka_request.h
@@ -79,6 +79,8 @@ typedef enum {
         RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA,
         /** Noop, useful for ternary ifs */
         RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP,
+        /** Read/write timestamp */
+        RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP,
 } rd_kafka_topic_partition_field_t;
 
 rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions(
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index 5a161db9ac..dac0e15f73 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -189,6 +189,22 @@ rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func,
         return rkt;
 }
 
+/**
+ * Same semantics as ..find() but takes a Uuid instead.
+ */
+rd_kafka_topic_t *rd_kafka_topic_find_by_topic_id(rd_kafka_t *rk,
+                                                  rd_kafka_Uuid_t topic_id) {
+        rd_kafka_topic_t *rkt;
+
+        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
+                if (!rd_kafka_Uuid_cmp(rkt->rkt_topic_id, topic_id)) {
+                        rd_kafka_topic_keep(rkt);
+                        break;
+                }
+        }
+
+        return rkt;
+}
 
 /**
  * @brief rd_kafka_topic_t comparator. 
@@ -1298,8 +1314,11 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, /* Set topic state. * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */ + // TODO: TopicId: Update Unknown Topic Id exception while rebasing from + // master. if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ || - mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART || + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID) rd_kafka_topic_set_notexists(rkt, mdt->err); else if (mdt->partition_cnt > 0) rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS); @@ -1311,7 +1330,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { upd += rd_kafka_topic_partition_cnt_update(rkt, mdt->partition_cnt); - + if (!rd_kafka_Uuid_cmp(rkt->rkt_topic_id, RD_KAFKA_UUID_ZERO)) + rkt->rkt_topic_id = mdit->topic_id; /* If the metadata times out for a topic (because all brokers * are down) the state will transition to S_UNKNOWN. * When updated metadata is eventually received there might @@ -1419,8 +1439,15 @@ int rd_kafka_topic_metadata_update2( int r; rd_kafka_wrlock(rkb->rkb_rk); - if (!(rkt = - rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/))) { + + if (likely(mdt->topic != NULL)) { + rkt = rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/); + } else { + rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk, + mdit->topic_id); + } + + if (!rkt) { rd_kafka_wrunlock(rkb->rkb_rk); return -1; /* Ignore topics that we dont have locally. */ } diff --git a/src/rdkafka_topic.h b/src/rdkafka_topic.h index b8c0b66c99..6e25e7f74e 100644 --- a/src/rdkafka_topic.h +++ b/src/rdkafka_topic.h @@ -109,6 +109,16 @@ typedef struct rd_kafka_partition_leader_epoch_s { int32_t leader_epoch; } rd_kafka_partition_leader_epoch_t; +/** + * Finds and returns a topic based on its topic_id, or NULL if not found. + * The 'rkt' refcount is increased by one and the caller must call + * rd_kafka_topic_destroy() when it is done with the topic to decrease + * the refcount. + * + * Locality: any thread + */ +rd_kafka_topic_t *rd_kafka_topic_find_by_topic_id(rd_kafka_t *rk, + rd_kafka_Uuid_t topic_id); /* * @struct Internal representation of a topic. @@ -124,6 +134,7 @@ struct rd_kafka_topic_s { rwlock_t rkt_lock; rd_kafkap_str_t *rkt_topic; + rd_kafka_Uuid_t rkt_topic_id; rd_kafka_toppar_t *rkt_ua; /**< Unassigned partition (-1) */ rd_kafka_toppar_t **rkt_p; /**< Partition array */
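
Usage sketch, not part of the patch: the KIP-848 protocol exercised by this
change is opt-in. Mirroring the examples/consumer.c hunk above, a consumer
selects it via the "group.protocol" configuration property ("test-group" and
"localhost:9092" below are placeholders):

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    int main(void) {
            char errstr[512];
            rd_kafka_conf_t *conf = rd_kafka_conf_new();
            rd_kafka_t *rk;

            /* "consumer" selects the new KIP-848 group protocol;
             * "classic" (the default) keeps the JoinGroup/SyncGroup flow. */
            if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
                                  sizeof(errstr)) != RD_KAFKA_CONF_OK ||
                rd_kafka_conf_set(conf, "group.id", "test-group", errstr,
                                  sizeof(errstr)) != RD_KAFKA_CONF_OK ||
                rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%s\n", errstr);
                    rd_kafka_conf_destroy(conf);
                    return 1;
            }

            /* On success rd_kafka_new() takes ownership of conf. */
            rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
            if (!rk) {
                    fprintf(stderr, "%s\n", errstr);
                    rd_kafka_conf_destroy(conf);
                    return 1;
            }

            rd_kafka_destroy(rk);
            return 0;
    }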