diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 55c9811c8f..852941c2a3 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1967,50 +1967,50 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf ### Supported protocol versions -"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.5.0, while +"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.7.0, while "librdkafka max" is the maximum ApiVersion supported in the latest release of librdkafka. -| ApiKey | Request name | Kafka max | librdkafka max | -| ------- | ------------------------------ |-----------|----------------| -| 0 | Produce | 10 | 8 | -| 1 | Fetch | 16 | 15 | -| 2 | ListOffsets | 8 | 7 | -| 3 | Metadata | 12 | 12 | -| 8 | OffsetCommit | 8 | 7 | -| 9 | OffsetFetch | 8 | 7 | -| 10 | FindCoordinator | 4 | 2 | -| 11 | JoinGroup | 9 | 5 | -| 12 | Heartbeat | 4 | 3 | -| 13 | LeaveGroup | 5 | 1 | -| 14 | SyncGroup | 5 | 3 | -| 15 | DescribeGroups | 5 | 4 | -| 16 | ListGroups | 4 | 4 | -| 17 | SaslHandshake | 1 | 1 | -| 18 | ApiVersions | 3 | 3 | -| 19 | CreateTopics | 7 | 4 | -| 20 | DeleteTopics | 6 | 1 | -| 21 | DeleteRecords | 2 | 1 | -| 22 | InitProducerId | 4 | 4 | -| 23 | OffsetForLeaderEpoch | 4 | 2 | -| 24 | AddPartitionsToTxn | 4 | 0 | -| 25 | AddOffsetsToTxn | 3 | 0 | -| 26 | EndTxn | 3 | 1 | -| 28 | TxnOffsetCommit | 3 | 3 | -| 29 | DescribeAcls | 3 | 1 | -| 30 | CreateAcls | 3 | 1 | -| 31 | DeleteAcls | 3 | 1 | -| 32 | DescribeConfigs | 4 | 1 | -| 33 | AlterConfigs | 2 | 2 | -| 36 | SaslAuthenticate | 2 | 1 | -| 37 | CreatePartitions | 3 | 0 | -| 42 | DeleteGroups | 2 | 1 | -| 44 | IncrementalAlterConfigs | 1 | 1 | -| 47 | OffsetDelete | 0 | 0 | -| 50 | DescribeUserScramCredentials | 0 | 0 | -| 51 | AlterUserScramCredentials | 0 | 0 | - +| ApiKey | Request name | Kafka max | librdkafka max | +| ------- | ----------------------------- | ---------- |----------------| +| 0 | Produce | 10 | 8 | +| 1 | Fetch | 16 | 15 | +| 2 | ListOffsets | 
8 | 7 | +| 3 | Metadata | 12 | 12 | +| 8 | OffsetCommit | 9 | 9 | +| 9 | OffsetFetch | 9 | 9 | +| 10 | FindCoordinator | 4 | 2 | +| 11 | JoinGroup | 9 | 5 | +| 12 | Heartbeat | 4 | 3 | +| 13 | LeaveGroup | 5 | 1 | +| 14 | SyncGroup | 5 | 3 | +| 15 | DescribeGroups | 5 | 4 | +| 16 | ListGroups | 4 | 4 | +| 17 | SaslHandshake | 1 | 1 | +| 18 | ApiVersions | 3 | 3 | +| 19 | CreateTopics | 7 | 4 | +| 20 | DeleteTopics | 6 | 1 | +| 21 | DeleteRecords | 2 | 1 | +| 22 | InitProducerId | 4 | 4 | +| 23 | OffsetForLeaderEpoch | 4 | 2 | +| 24 | AddPartitionsToTxn | 4 | 0 | +| 25 | AddOffsetsToTxn | 3 | 0 | +| 26 | EndTxn | 3 | 1 | +| 28 | TxnOffsetCommit | 3 | 3 | +| 29 | DescribeAcls | 3 | 1 | +| 30 | CreateAcls | 3 | 1 | +| 31 | DeleteAcls | 3 | 1 | +| 32 | DescribeConfigs | 4 | 1 | +| 33 | AlterConfigs | 2 | 2 | +| 36 | SaslAuthenticate | 2 | 1 | +| 37 | CreatePartitions | 3 | 0 | +| 42 | DeleteGroups | 2 | 1 | +| 44 | IncrementalAlterConfigs | 1 | 1 | +| 47 | OffsetDelete | 0 | 0 | +| 50 | DescribeUserScramCredentials | 0 | 0 | +| 51 | AlterUserScramCredentials | 0 | 0 | +| 68 | ConsumerGroupHeartbeat | 0 | 0 | # Recommendations for language binding developers diff --git a/examples/consumer.c b/examples/consumer.c index 8ce6f77f4d..dad3efc43b 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -258,4 +258,4 @@ int main(int argc, char **argv) { rd_kafka_destroy(rk); return 0; -} +} \ No newline at end of file diff --git a/src/rdkafka.c b/src/rdkafka.c index 5057fe030d..d210081bc1 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -707,6 +707,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = { _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"), _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH, "Broker: The member epoch is fenced by the group coordinator"), + _ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID, + "Broker: The instance ID is still used by another member in the " + "consumer group"), + 
_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR, + "Broker: The assignor or its version range is not supported by " + "the consumer group"), _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH, "Broker: The member epoch is stale"), _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)}; diff --git a/src/rdkafka.h b/src/rdkafka.h index 7775b84316..8c24566347 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -638,6 +638,12 @@ typedef enum { RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100, /** The member epoch is fenced by the group coordinator */ RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110, + /** The instance ID is still used by another member in the + * consumer group */ + RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111, + /** The assignor or its version range is not supported by the consumer + * group */ + RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112, /** The member epoch is stale */ RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113, RD_KAFKA_RESP_ERR_END_ALL, diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 301d4aa0d7..d969d63927 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -421,7 +421,6 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, const rd_kafkap_str_t *group_id, const rd_kafkap_str_t *client_id) { rd_kafka_cgrp_t *rkcg; - rkcg = rd_calloc(1, sizeof(*rkcg)); rkcg->rkcg_rk = rk; @@ -892,6 +891,7 @@ static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) { rd_kafka_topic_partition_list_destroy(rkcg->rkcg_current_assignment); RD_IF_FREE(rkcg->rkcg_target_assignment, rd_kafka_topic_partition_list_destroy); + rkcg->rkcg_target_assignment = NULL; RD_IF_FREE(rkcg->rkcg_next_target_assignment, rd_kafka_topic_partition_list_destroy); rkcg->rkcg_current_assignment = rd_kafka_topic_partition_list_new(0); @@ -2591,28 +2591,44 @@ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, return rd_true; } -static rd_kafka_op_res_t -rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { - rd_bool_t is_assignment_different = 
rd_false; - if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) - return RD_KAFKA_OP_RES_HANDLED; - +static rd_bool_t rd_kafka_cgrp_consumer_is_new_assignment_different( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *new_target_assignment) { + int is_assignment_different; if (rkcg->rkcg_target_assignment) { is_assignment_different = rd_kafka_topic_partition_list_cmp( - rkcg->rkcg_next_target_assignment, - rkcg->rkcg_target_assignment, + new_target_assignment, rkcg->rkcg_target_assignment, rd_kafka_topic_partition_by_id_cmp); } else { is_assignment_different = rd_kafka_topic_partition_list_cmp( - rkcg->rkcg_next_target_assignment, - rkcg->rkcg_current_assignment, + new_target_assignment, rkcg->rkcg_current_assignment, rd_kafka_topic_partition_by_id_cmp); } + return is_assignment_different ? rd_true : rd_false; +} + +static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *new_target_assignment, + rd_bool_t clear_next_assignment) { + rd_bool_t is_assignment_different = rd_false; + if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK) + return RD_KAFKA_OP_RES_HANDLED; + + is_assignment_different = + rd_kafka_cgrp_consumer_is_new_assignment_different( + rkcg, new_target_assignment); + /* Starts reconciliation only when the group is in state + * INIT or state STEADY, keeps it as next target assignment + * otherwise. 
*/ if (!is_assignment_different) { - RD_IF_FREE(rkcg->rkcg_next_target_assignment, - rd_kafka_topic_partition_list_destroy); - rkcg->rkcg_next_target_assignment = NULL; + if (rkcg->rkcg_next_target_assignment && + clear_next_assignment) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } } else if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK; @@ -2621,7 +2637,15 @@ rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_target_assignment); } rkcg->rkcg_target_assignment = - rkcg->rkcg_next_target_assignment; + rd_kafka_topic_partition_list_copy(new_target_assignment); + + if (rkcg->rkcg_next_target_assignment && + clear_next_assignment) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { char rkcg_target_assignment_str[512] = "NULL"; @@ -2635,7 +2659,6 @@ rd_kafka_cgrp_consumer_handle_next_assignment(rd_kafka_cgrp_t *rkcg) { "assignment \"%s\"", rkcg_target_assignment_str); } - rkcg->rkcg_next_target_assignment = NULL; rd_kafka_cgrp_handle_assignment(rkcg, rkcg->rkcg_target_assignment); } @@ -2650,8 +2673,15 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { - int i, j, found = 0; + /* + * FIXME: Using next_target_assignment is not correct as other heartbeat + * call can change it. 
+ */ + int i, j; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + rd_kafka_op_res_t assignment_handle_ret; + rd_kafka_topic_partition_list_t *new_target_assignment; + rd_bool_t all_partition_metadata_available; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ @@ -2660,15 +2690,15 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, return RD_KAFKA_OP_RES_HANDLED; /* Update topic name for all the assignments given by topic id - * KIP848TODO: Improve complexity. + * TODO: Improve complexity. + */ + /* + * TODO: Checking local metadata cache is an improvement which we + * can do later. */ + new_target_assignment = rd_kafka_topic_partition_list_new( + rkcg->rkcg_next_target_assignment->cnt); for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = - &rkcg->rkcg_next_target_assignment->elems[i]; - if (rktpar->topic) { - found++; - continue; - } rd_kafka_Uuid_t request_topic_id = rd_kafka_topic_partition_get_topic_id( &rkcg->rkcg_next_target_assignment->elems[i]); @@ -2677,32 +2707,59 @@ rd_kafka_cgrp_consumer_handle_Metadata_op(rd_kafka_t *rk, rko->rko_u.metadata.mdi->topics[j].topic_id; if (!rd_kafka_Uuid_cmp(request_topic_id, compare_topic_id)) { - rktpar->topic = rd_strdup( - rko->rko_u.metadata.md->topics[j].topic); - found++; + if (rko->rko_u.metadata.md->topics[j].err == + RD_KAFKA_RESP_ERR_NO_ERROR) + rd_kafka_topic_partition_list_add_with_topic_name_and_id( + new_target_assignment, + request_topic_id, + rko->rko_u.metadata.md->topics[j] + .topic, + rkcg->rkcg_next_target_assignment + ->elems[i] + .partition); + else + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Metadata not found for the " + "assigned topic id - %s due to: " + "%s: " + "Continuing without it", + rd_kafka_Uuid_base64str( + &request_topic_id), + rd_kafka_err2str( + rko->rko_u.metadata.md + ->topics[j] + .err)); break; } } } - if (found < rkcg->rkcg_next_target_assignment->cnt) - return 
RD_KAFKA_OP_RES_HANDLED; + all_partition_metadata_available = + new_target_assignment->cnt == rkcg->rkcg_next_target_assignment->cnt + ? rd_true + : rd_false; if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { - char rkcg_next_target_assignment_str[512] = "NULL"; + char new_target_assignment_str[512] = "NULL"; rd_kafka_topic_partition_list_str( - rkcg->rkcg_next_target_assignment, - rkcg_next_target_assignment_str, - sizeof(rkcg_next_target_assignment_str), 0); + new_target_assignment, new_target_assignment_str, + sizeof(new_target_assignment_str), 0); rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "HEARTBEAT", - "Metadata available for next target assignment \"%s\"", - rkcg_next_target_assignment_str); + "Metadata available for %d/%d next target assignment " + "which are: \"%s\"", + new_target_assignment->cnt, + rkcg->rkcg_next_target_assignment->cnt, + new_target_assignment_str); } - return rd_kafka_cgrp_consumer_handle_next_assignment(rkcg); + assignment_handle_ret = rd_kafka_cgrp_consumer_handle_next_assignment( + rkcg, new_target_assignment, all_partition_metadata_available); + rd_kafka_topic_partition_list_destroy(new_target_assignment); + return assignment_handle_ret; } void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( @@ -2712,22 +2769,29 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( rd_kafka_op_t *rko; rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; rd_kafka_Uuid_t topic_id; + rd_kafka_Uuid_t prev_topic_id = RD_KAFKA_UUID_ZERO; rd_list_t *topic_ids; int i; if (!rkcg->rkcg_next_target_assignment->cnt) { /* No metadata to request, continue with handle_next_assignment. 
*/ - rd_kafka_cgrp_consumer_handle_next_assignment(rkcg); + rd_kafka_topic_partition_list_t *new_target_assignment = + rd_kafka_topic_partition_list_new(0); + rd_kafka_cgrp_consumer_handle_next_assignment( + rkcg, new_target_assignment, rd_true); + rd_kafka_topic_partition_list_destroy(new_target_assignment); return; } topic_ids = rd_list_new(1, rd_list_Uuid_destroy); - for (i = 0; i < rkcg->rkcg_next_target_assignment->cnt; i++) { topic_id = rd_kafka_topic_partition_get_topic_id( &rkcg->rkcg_next_target_assignment->elems[i]); - rd_list_add(topic_ids, rd_kafka_Uuid_copy(&topic_id)); + if (rd_kafka_Uuid_cmp(prev_topic_id, topic_id) && + !rd_list_find(topic_ids, &topic_id, rd_list_Uuid_cmp)) + rd_list_add(topic_ids, rd_kafka_Uuid_copy(&topic_id)); + prev_topic_id = topic_id; } rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, @@ -2751,6 +2815,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; const int log_decode_errors = LOG_ERR; int16_t error_code = 0; + int actions = 0; rd_kafkap_str_t error_str; rd_kafkap_str_t member_id; int32_t member_epoch; @@ -2797,7 +2862,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rkbuf, rd_true, rd_false /* Don't use Topic Name */, 0, assignments_fields); - if (rd_rkb_is_dbg(rkb, CGRP)) { + if (rd_kafka_is_dbg(rk, CGRP)) { char assigned_topic_partitions_str[512] = "NULL"; if (assigned_topic_partitions) { @@ -2807,17 +2872,34 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, sizeof(assigned_topic_partitions_str), 0); } - rd_rkb_dbg(rkb, CGRP, "HEARTBEAT", - "Heartbeat response received target " - "assignment \"%s\"", - assigned_topic_partitions_str); + rd_kafka_dbg( + rk, CGRP, "HEARTBEAT", + "ConsumerGroupHeartbeat response received target " + "assignment \"%s\"", + assigned_topic_partitions_str); } if (assigned_topic_partitions) { RD_IF_FREE(rkcg->rkcg_next_target_assignment, rd_kafka_topic_partition_list_destroy); - 
rkcg->rkcg_next_target_assignment = - assigned_topic_partitions; + rkcg->rkcg_next_target_assignment = NULL; + if (rd_kafka_cgrp_consumer_is_new_assignment_different( + rkcg, assigned_topic_partitions)) { + /* We don't update the next_target_assignment + * in two cases: + * 1) If target assignment is present and the + * new assignment is same as target assignment, + * then we are already in process of adding that + * target assignment. We can ignore this new + * assignment. + * 2) If target assignment is not present then + * if the current assignment is present and the + * new assignment is same as current assignment, + * then we are already at correct assignment. We + * can ignore this new assignment. */ + rkcg->rkcg_next_target_assignment = + assigned_topic_partitions; + } } } @@ -2869,7 +2951,96 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, err: rkcg->rkcg_last_heartbeat_err = err; rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; - rkcg->rkcg_last_heartbeat_err = err; + + switch (err) { + case RD_KAFKA_RESP_ERR__DESTROY: + /* quick cleanup */ + return; + + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ConsumerGroupHeartbeat failed due to coordinator (%s) " + "loading in progress: %s: " + "retrying", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + actions = RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: + case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR__TRANSPORT: + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ConsumerGroupHeartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? 
rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + /* Remain in joined state and keep querying for coordinator */ + actions = RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + case RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ConsumerGroupHeartbeat failed due to: %s: " + "will rejoin the group", + rd_kafka_err2str(err)); + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; + return; + + case RD_KAFKA_RESP_ERR_INVALID_REQUEST: + case RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED: + case RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR: + case RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION: + case RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID: + case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: + actions = RD_KAFKA_ERR_ACTION_FATAL; + break; + + default: + actions = rd_kafka_err_action(rkb, err, request, + RD_KAFKA_ERR_ACTION_END); + break; + } + + if (actions & RD_KAFKA_ERR_ACTION_FATAL) { + rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, + "Fatal consumer error: %s", + rd_kafka_err2str(err)); + rd_kafka_cgrp_revoke_all_rejoin_maybe( + rkcg, rd_true, /*assignments lost*/ + rd_true, /*initiating*/ + "Fatal error in ConsumerGroupHeartbeat API response"); + return; + } + + if (!rkcg->rkcg_heartbeat_intvl_ms) { + /* When an error happens on first HB, it should be always + * retried, unless fatal, to avoid entering a tight loop + * and to use exponential backoff. 
*/ + actions |= RD_KAFKA_ERR_ACTION_RETRY; + } + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); + } + + if (actions & RD_KAFKA_ERR_ACTION_RETRY && + rd_kafka_buf_retry(rkb, request)) { + /* Retry */ + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + } } @@ -5526,14 +5697,29 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg, void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { rd_ts_t now = rd_clock(); - rd_bool_t full_request = rd_false; - rd_bool_t send_ack = rd_false; + rd_bool_t full_request = rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + rd_bool_t send_ack = rd_false; if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk))) return; + if (unlikely(rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN)) { + if (RD_KAFKA_CGRP_REBALANCING(rkcg)) + return; + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE; + rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_true, rd_true, + "member fenced - rejoining"); + } + switch (rkcg->rkcg_join_state) { case RD_KAFKA_CGRP_JOIN_STATE_INIT: + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE; full_request = rd_true; break; case RD_KAFKA_CGRP_JOIN_STATE_STEADY: @@ -5552,10 +5738,15 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { } if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && + !(rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE) && rd_interval(&rkcg->rkcg_heartbeat_intvl, - rkcg->rkcg_heartbeat_intvl_ms * 1000, now) > 0) + rkcg->rkcg_heartbeat_intvl_ms * 1000, now) > 0) { rd_kafka_cgrp_consumer_group_heartbeat(rkcg, full_request, send_ack); + rkcg->rkcg_consumer_flags &= + 
~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + } } /** @@ -5831,6 +6022,15 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) { /* FALLTHRU */ case RD_KAFKA_CGRP_JOIN_STATE_INIT: + /* + * There may be a case when no assignments are + * assigned to this consumer. In this case, while terminating + * the consumer can be in STEADY or INIT state and won't go + * to intermediate state. In this scenario, last leave call is + * done from here. + */ + rd_kafka_cgrp_leave_maybe(rkcg); + /* Check if cgrp is trying to terminate, which is safe to do * in these two states. Otherwise we'll need to wait for * the current state to decommission. */ diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 322c808fdf..2cd5a59a3b 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -279,9 +279,15 @@ typedef struct rd_kafka_cgrp_s { #define RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK 0x1 /* TODO: write */ #define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2 /* TODO: write */ #define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION \ - 0x4 /* TODO: write \ - */ -#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 /* TODO: write */ + 0x4 /* TODO: write \ + */ +#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8 /* TODO: write */ +#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10 /* TODO: write */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN \ + 0x20 /* Member is fenced, need to rejoin */ +#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE \ + 0x40 /* Member is fenced, rejoining */ + /** Rejoin the group following a currently in-progress * incremental unassign. 
*/ diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 81bf06d637..82b7837d80 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -959,10 +959,15 @@ static RD_INLINE RD_UNUSED rd_kafka_resp_err_t rd_kafka_fatal_error_code(rd_kafka_t *rk) { /* This is an optimization to avoid an atomic read which are costly * on some platforms: - * Fatal errors are currently only raised by the idempotent producer - * and static consumers (group.instance.id). */ + * Fatal errors are currently raised by: + * 1) the idempotent producer + * 2) static consumers (group.instance.id) + * 3) Group using consumer protocol (Introduced in KIP-848). See exact + * errors in rd_kafka_cgrp_handle_ConsumerGroupHeartbeat() */ if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) || - (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id)) + (rk->rk_type == RD_KAFKA_CONSUMER && + (rk->rk_conf.group_instance_id || + rk->rk_conf.group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER))) return rd_atomic32_get(&rk->rk_fatal.err); return RD_KAFKA_RESP_ERR_NO_ERROR; diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 5436a8ba7e..7365be645f 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -833,6 +833,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], &mdi->topics[i]); + // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { rd_list_free_cb(missing_topics, rd_list_remove_cmp(missing_topics, @@ -859,6 +860,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } } + // TODO: Should be done for missing_topic_ids as well. /* Requested topics not seen in metadata? Propogate to topic code. */ if (missing_topics) { char *topic; @@ -960,6 +962,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_cache_expiry_start(rk); } + + // TODO: Should be done for requested_topic_ids as well. /* Remove cache hints for the originally requested topics. 
*/ if (requested_topics) rd_kafka_metadata_cache_purge_hints(rk, requested_topics); @@ -991,6 +995,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } done: + + // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); @@ -1007,6 +1013,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, err_parse: err = rkbuf->rkbuf_err; err: + // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { /* Failed requests shall purge cache hints for * the requested topics. */ @@ -1015,6 +1022,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_wrunlock(rkb->rkb_rk); } + // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); rd_tmpabuf_destroy(&tbuf); diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 047f890f5e..4d338bab6d 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -759,10 +759,10 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, /* FIXME: also check that partitions are assigned to member */ } - rd_kafka_buf_read_i32(rkbuf, &TopicsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX); /* Response: #Topics */ - rd_kafka_buf_write_i32(resp, TopicsCnt); + rd_kafka_buf_write_arraycnt(resp, TopicsCnt); while (TopicsCnt-- > 0) { rd_kafkap_str_t Topic; @@ -770,14 +770,15 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_topic_t *mtopic; rd_kafka_buf_read_str(rkbuf, &Topic); - rd_kafka_buf_read_i32(rkbuf, &PartitionCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt, + RD_KAFKAP_PARTITIONS_MAX); mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic); /* Response: Topic */ rd_kafka_buf_write_kstr(resp, &Topic); /* Response: #Partitions */ - rd_kafka_buf_write_i32(resp, PartitionCnt); + rd_kafka_buf_write_arraycnt(resp, PartitionCnt); while (PartitionCnt-- > 0) { int32_t 
Partition; @@ -817,6 +818,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, } rd_kafka_buf_read_str(rkbuf, &Metadata); + rd_kafka_buf_skip_tags(rkbuf); if (!err) rd_kafka_mock_commit_offset(mpart, &GroupId, @@ -825,7 +827,10 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, /* Response: ErrorCode */ rd_kafka_buf_write_i16(resp, err); + rd_kafka_buf_write_tags(resp); } + rd_kafka_buf_skip_tags(rkbuf); + rd_kafka_buf_write_tags(resp); } rd_kafka_mock_connection_send_response(mconn, resp); @@ -2128,7 +2133,7 @@ const struct rd_kafka_mock_api_handler [RD_KAFKAP_Fetch] = {0, 11, -1, rd_kafka_mock_handle_Fetch}, [RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets}, [RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch}, - [RD_KAFKAP_OffsetCommit] = {0, 8, 8, rd_kafka_mock_handle_OffsetCommit}, + [RD_KAFKAP_OffsetCommit] = {0, 9, 8, rd_kafka_mock_handle_OffsetCommit}, [RD_KAFKAP_ApiVersion] = {0, 2, 3, rd_kafka_mock_handle_ApiVersion}, [RD_KAFKAP_Metadata] = {0, 9, 9, rd_kafka_mock_handle_Metadata}, [RD_KAFKAP_FindCoordinator] = {0, 3, 3, diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index fc665398f6..0affd72e60 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -2930,6 +2930,23 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_with_topic_id( return rktpar; } + +rd_kafka_topic_partition_t * +rd_kafka_topic_partition_list_add_with_topic_name_and_id( + rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id, + const char *topic, + int32_t partition) { + rd_kafka_topic_partition_t *rktpar; + rktpar = rd_kafka_topic_partition_list_add0( + __FUNCTION__, __LINE__, rktparlist, topic, partition, NULL, NULL); + rd_kafka_topic_partition_private_t *parpriv = + rd_kafka_topic_partition_get_private(rktpar); + parpriv->topic_id = topic_id; + return rktpar; +} + + /** * Adds a consecutive list of partitions to a list */ @@ 
-4103,11 +4120,16 @@ const char *rd_kafka_topic_partition_list_str( int i; size_t of = 0; + if (!rktparlist->cnt) + dest[0] = '\0'; for (i = 0; i < rktparlist->cnt; i++) { const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; char errstr[128]; char offsetstr[32]; + const char *topic_id_str = NULL; + const rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id(rktpar); int r; if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR)) @@ -4125,14 +4147,19 @@ const char *rd_kafka_topic_partition_list_str( else offsetstr[0] = '\0'; + + if (!RD_KAFKA_UUID_IS_ZERO(topic_id)) + topic_id_str = rd_kafka_Uuid_base64str(&topic_id); + r = rd_snprintf(&dest[of], dest_size - of, "%s" - "%s[%" PRId32 + "%s(%s)[%" PRId32 "]" "%s" "%s", of == 0 ? "" : ", ", rktpar->topic, - rktpar->partition, offsetstr, errstr); + topic_id_str, rktpar->partition, offsetstr, + errstr); if ((size_t)r >= dest_size - of) { rd_snprintf(&dest[dest_size - 4], 4, "..."); diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index cdb023d87a..d104c9b6f7 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -712,6 +712,13 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_with_topic_id( rd_kafka_Uuid_t topic_id, int32_t partition); +rd_kafka_topic_partition_t * +rd_kafka_topic_partition_list_add_with_topic_name_and_id( + rd_kafka_topic_partition_list_t *rktparlist, + rd_kafka_Uuid_t topic_id, + const char *topic, + int32_t partition); + rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 48f93ebc71..7f202a5e29 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -631,6 +631,12 @@ static RD_INLINE RD_UNUSED void rd_list_Uuid_destroy(void *uuid) { rd_kafka_Uuid_destroy((rd_kafka_Uuid_t *)uuid); } +static RD_INLINE RD_UNUSED int rd_list_Uuid_cmp(const void *uuid1, + const void *uuid2) { + return 
rd_kafka_Uuid_cmp(*((rd_kafka_Uuid_t *)uuid1), + *((rd_kafka_Uuid_t *)uuid2)); +} + /** * @name Producer ID and Epoch for the Idempotent Producer diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 54dadd8efc..8179008dd9 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -220,6 +220,9 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( int32_t TopicArrayCnt; rd_kafka_topic_partition_list_t *parts = NULL; + /* We assume here that the topic partition list is not NULL. + * FIXME: check NULL topic array case, if required in future. */ + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); parts = rd_kafka_topic_partition_list_new( @@ -279,6 +282,10 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: rd_assert(!*"metadata not implemented"); break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: + rd_assert( + !*"timestamp not implemented"); + break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: /* Fallback */ case RD_KAFKA_TOPIC_PARTITION_FIELD_END: @@ -454,6 +461,11 @@ int rd_kafka_buf_write_topic_partitions( rkbuf, rktpar->metadata, rktpar->metadata_size); break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP: + /* Current implementation is just + * sending a NULL value */ + rd_kafka_buf_write_i64(rkbuf, -1); + break; case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: break; case RD_KAFKA_TOPIC_PARTITION_FIELD_END: @@ -1052,7 +1064,7 @@ void rd_kafka_OffsetForLeaderEpochRequest( RD_KAFKA_TOPIC_PARTITION_FIELD_END}; rd_kafka_buf_write_topic_partitions( rkbuf, parts, rd_false /*include invalid offsets*/, - rd_false /*skip valid offsets */, rd_false /*don't use topic id*/, + rd_false /*skip valid offsets*/, rd_false /*don't use topic id*/, rd_true /*use topic name*/, fields); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -1584,12 +1596,16 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, rd_kafka_buf_t *request, rd_kafka_topic_partition_list_t 
*offsets, rd_bool_t ignore_cgrp) { - const int log_decode_errors = LOG_ERR; - int32_t TopicArrayCnt; - int errcnt = 0; - int partcnt = 0; - int i; - int actions = 0; + const int log_decode_errors = LOG_ERR; + int errcnt = 0; + int partcnt = 0; + int actions = 0; + rd_kafka_topic_partition_list_t *partitions = NULL; + rd_kafka_topic_partition_t *partition = NULL; + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; if (err) goto err; @@ -1597,49 +1613,37 @@ rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, if (rd_kafka_buf_ApiVersion(rkbuf) >= 3) rd_kafka_buf_read_throttle_time(rkbuf); - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); - for (i = 0; i < TopicArrayCnt; i++) { - rd_kafkap_str_t topic; - char *topic_str; - int32_t PartArrayCnt; - int j; - - rd_kafka_buf_read_str(rkbuf, &topic); - rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); - - RD_KAFKAP_STR_DUPA(&topic_str, &topic); + partitions = rd_kafka_buf_read_topic_partitions( + rkbuf, rd_false /*don't use topic_id*/, rd_true /*use topic name*/, + 0, fields); - for (j = 0; j < PartArrayCnt; j++) { - int32_t partition; - int16_t ErrorCode; - rd_kafka_topic_partition_t *rktpar; - - rd_kafka_buf_read_i32(rkbuf, &partition); - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + if (!partitions) + goto err_parse; - rktpar = rd_kafka_topic_partition_list_find( - offsets, topic_str, partition); + partcnt = partitions->cnt; + RD_KAFKA_TPLIST_FOREACH(partition, partitions) { + rd_kafka_topic_partition_t *rktpar; - if (!rktpar) { - /* Received offset for topic/partition we didn't - * ask for, this shouldn't really happen. */ - continue; - } + rktpar = rd_kafka_topic_partition_list_find( + offsets, partition->topic, partition->partition); - rktpar->err = ErrorCode; - if (ErrorCode) { - err = ErrorCode; - errcnt++; - - /* Accumulate actions for per-partition - * errors. 
*/ - actions |= rd_kafka_handle_OffsetCommit_error( - rkb, request, rktpar); - } + if (!rktpar) { + /* Received offset for topic/partition we didn't + * ask for, this shouldn't really happen. */ + continue; + } - partcnt++; + if (partition->err) { + rktpar->err = partition->err; + err = partition->err; + errcnt++; + /* Accumulate actions for per-partition + * errors. */ + actions |= rd_kafka_handle_OffsetCommit_error( + rkb, request, partition); } } + rd_kafka_topic_partition_list_destroy(partitions); /* If all partitions failed use error code * from last partition as the global error. */ @@ -1707,23 +1711,18 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, void *opaque, const char *reason) { rd_kafka_buf_t *rkbuf; - ssize_t of_TopicCnt = -1; - int TopicCnt = 0; - const char *last_topic = NULL; - ssize_t of_PartCnt = -1; - int PartCnt = 0; - int tot_PartCnt = 0; - int i; + int tot_PartCnt = 0; int16_t ApiVersion; int features; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetCommit, 0, 7, &features); + rkb, RD_KAFKAP_OffsetCommit, 0, 9, &features); rd_kafka_assert(NULL, offsets != NULL); - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, 1, - 100 + (offsets->cnt * 128)); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_OffsetCommit, 1, + 100 + (offsets->cnt * 128), + ApiVersion >= 8); /* ConsumerGroup */ rd_kafka_buf_write_str(rkbuf, cgmetadata->group_id, -1); @@ -1748,61 +1747,23 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, /* Sort offsets by topic */ rd_kafka_topic_partition_list_sort_by_topic(offsets); - /* TopicArrayCnt: Will be updated when we know the number of topics. */ - of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); - - for (i = 0; i < offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; - - /* Skip partitions with invalid offset. 
*/ - if (rktpar->offset < 0) - continue; - - if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) { - /* New topic */ - - /* Finalize previous PartitionCnt */ - if (PartCnt > 0) - rd_kafka_buf_update_u32(rkbuf, of_PartCnt, - PartCnt); - - /* TopicName */ - rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); - /* PartitionCnt, finalized later */ - of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); - PartCnt = 0; - last_topic = rktpar->topic; - TopicCnt++; - } - - /* Partition */ - rd_kafka_buf_write_i32(rkbuf, rktpar->partition); - PartCnt++; - tot_PartCnt++; - - /* Offset */ - rd_kafka_buf_write_i64(rkbuf, rktpar->offset); + /* Write partition list, filtering out partitions with invalid + * offsets */ + rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, + ApiVersion >= 6 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + ApiVersion == 1 ? RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - /* v6: KIP-101 CommittedLeaderEpoch */ - if (ApiVersion >= 6) - rd_kafka_buf_write_i32( - rkbuf, - rd_kafka_topic_partition_get_leader_epoch(rktpar)); - - /* v1: TimeStamp */ - if (ApiVersion == 1) - rd_kafka_buf_write_i64(rkbuf, -1); - - /* Metadata */ - /* Java client 0.9.0 and broker <0.10.0 can't parse - * Null metadata fields, so as a workaround we send an - * empty string if it's Null. */ - if (!rktpar->metadata) - rd_kafka_buf_write_str(rkbuf, "", 0); - else - rd_kafka_buf_write_str(rkbuf, rktpar->metadata, - rktpar->metadata_size); - } + tot_PartCnt = rd_kafka_buf_write_topic_partitions( + rkbuf, offsets, rd_true /*skip invalid offsets*/, + rd_false /*include valid offsets*/, + rd_false /*don't use topic id*/, rd_true /*use topic name*/, + fields); if (tot_PartCnt == 0) { /* No topic+partitions had valid offsets to commit.
*/ @@ -1811,13 +1772,6 @@ int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, return 0; } - /* Finalize previous PartitionCnt */ - if (PartCnt > 0) - rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt); - - /* Finalize TopicCnt */ - rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt); - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); rd_rkb_dbg(rkb, TOPIC, "OFFSET", diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 39121a44a0..bbb3b747b9 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -75,6 +75,8 @@ typedef enum { RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH, /** Read/write int16_t for error code */ RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, + /** Read/write timestamp */ + RD_KAFKA_TOPIC_PARTITION_FIELD_TIMESTAMP, /** Read/write str for metadata */ RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, /** Noop, useful for ternary ifs */ diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 5c398db5b8..afdb3b5bf5 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -1314,9 +1314,10 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, rkt->rkt_ts_metadata = ts_age; /* Set topic state. 
- * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */ + * UNKNOWN_TOPIC_* may indicate that auto.create.topics failed */ if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ || - mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART || + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID) rd_kafka_topic_set_notexists(rkt, mdt->err); else if (mdt->partition_cnt > 0) rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS); @@ -1446,8 +1447,15 @@ int rd_kafka_topic_metadata_update2( int r; rd_kafka_wrlock(rkb->rkb_rk); - if (!(rkt = - rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/))) { + + if (likely(mdt->topic != NULL)) { + rkt = rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/); + } else { + rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk, + mdit->topic_id); + } + + if (!rkt) { rd_kafka_wrunlock(rkb->rkb_rk); return -1; /* Ignore topics that we dont have locally. */ }