Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-951] Review comments for the tests #4767

Merged
merged 5 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

librdkafka v2.5.0 is a feature release.

* [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client)
Leader discovery optimisations for the client (#4756, #4767).
* Fix segfault caused by an erased segment when using a long client id with flexver. (#4689)
* Fix for an idempotent producer error, with a message batch not reconstructed
identically when retried (#4750)
Expand All @@ -12,6 +14,10 @@ librdkafka v2.5.0 is a feature release.
* Update bundled lz4 (used when `./configure --disable-lz4-ext`) to
[v1.9.4](https://github.com/lz4/lz4/releases/tag/v1.9.4), which contains
bugfixes and performance improvements (#4726).
* [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client)
  With this KIP, leader updates are received through Produce and Fetch responses
  whenever errors corresponding to leader changes occur, and the partition
  migration happens before the metadata cache is refreshed (#4756, #4767).


## Fixes
Expand Down
9 changes: 5 additions & 4 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2044,7 +2044,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 | Supported |
| KIP-584 - Versioning scheme for features | WIP | Not supported |
| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
Expand All @@ -2053,8 +2053,9 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-654 - Aborted txns with non-flushed msgs should not be fatal | 2.7.0 | Supported |
| KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
| KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 (WIP) | Supported |
| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 | Supported |
| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early Access |
| KIP-951 - Leader discovery optimisations for the client | 3.7.0 | Supported |



Expand All @@ -2068,8 +2069,8 @@ release of librdkafka.

| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ----------------------------- | ---------- | -------------- |
| 0 | Produce | 10 | 9 |
| 1 | Fetch | 16 | 15 |
| 0 | Produce | 10 | 10 |
| 1 | Fetch | 16 | 16 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 9 | 9 |
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {

rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
"Partition %s(%s)[%" PRId32
"]: "
"]:"
" updated with leader %" PRId32
" and epoch %" PRId32,
topic, rd_kafka_Uuid_base64str(&topic_id),
Expand Down
217 changes: 203 additions & 14 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,58 @@



/**
 * @brief Write a partition-level tagged field to a mock ProduceResponse.
 *
 * Only tag 0 (CurrentLeader) is known: it carries the partition's current
 * leader id and leader epoch followed by an empty field-tags section.
 * Unknown tag types are ignored and nothing is written.
 *
 * @param rkbuf Response buffer being written.
 * @param tagtype Tag number to serialize.
 * @param mpart Mock partition whose leader information is written.
 */
void rd_kafka_mock_Produce_reply_tags_partition_write(
    rd_kafka_buf_t *rkbuf,
    int tagtype,
    rd_kafka_mock_partition_t *mpart) {
        if (tagtype != 0)
                return; /* Unknown tag: write nothing */

        /* CurrentLeader: leader id */
        rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
        /* CurrentLeader: leader epoch */
        rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
        /* Empty field tags terminating the CurrentLeader struct */
        rd_kafka_buf_write_tags_empty(rkbuf);
}

/**
 * @brief Write a top-level tagged field to a mock ProduceResponse.
 *
 * Only tag 0 (NodeEndpoints) is known: it serializes the array of brokers
 * that became leaders, each as (id, host, port, rack, empty tags).
 * Unknown tag types are ignored and nothing is written.
 *
 * @param rkbuf Response buffer being written.
 * @param tagtype Tag number to serialize.
 * @param changed_leaders Array of brokers that are new partition leaders.
 * @param changed_leader_cnt Number of elements in \p changed_leaders.
 */
void rd_kafka_mock_Produce_reply_tags_write(
    rd_kafka_buf_t *rkbuf,
    int tagtype,
    rd_kafka_mock_broker_t **changed_leaders,
    int changed_leader_cnt) {
        int i;
        switch (tagtype) {
        case 0: /* NodeEndpoints */
                /* #NodeEndpoints */
                rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt);
                for (i = 0; i < changed_leader_cnt; i++) {
                        rd_kafka_mock_broker_t *changed_leader =
                            changed_leaders[i];
                        /* Leader id */
                        rd_kafka_buf_write_i32(rkbuf, changed_leader->id);
                        /* Leader Hostname */
                        rd_kafka_buf_write_str(
                            rkbuf, changed_leader->advertised_listener, -1);

                        /* Leader Port number */
                        rd_kafka_buf_write_i32(rkbuf,
                                               (int32_t)changed_leader->port);

                        /* Leader Rack */
                        rd_kafka_buf_write_str(rkbuf, changed_leader->rack, -1);

                        /* Field tags */
                        rd_kafka_buf_write_tags_empty(rkbuf);
                }
                /* Was an implicit fallthrough into default: make the end of
                 * the case explicit so future cases can't be skipped into. */
                break;
        default:
                break;
        }
}

/**
* @brief Handle ProduceRequest
*/
Expand All @@ -55,6 +107,12 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
int16_t Acks;
int32_t TimeoutMs;
rd_kafka_resp_err_t all_err;
int32_t tags_to_write[1] = {0};
size_t tags_to_write_cnt = 0;
int changed_leaders_cnt = 0;
rd_kafka_mock_broker_t **changed_leaders =
rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders));


if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3)
rd_kafka_buf_read_str(rkbuf, &TransactionalId);
Expand All @@ -78,7 +136,6 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_str(rkbuf, &Topic);
rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt,
RD_KAFKAP_PARTITIONS_MAX);

mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic);

/* Response: Topic */
Expand All @@ -92,6 +149,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
rd_kafkap_bytes_t records;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int64_t BaseOffset = -1;
int32_t partition_tags_to_write[1] = {0};
size_t partition_tags_to_write_cnt = 0;

rd_kafka_buf_read_i32(rkbuf, &Partition);

Expand All @@ -100,10 +159,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
Partition);

rd_kafka_buf_read_kbytes(rkbuf, &records);

/* Partition Tags */
rd_kafka_buf_skip_tags(rkbuf);

/* Response: Partition */
rd_kafka_buf_write_i32(resp, Partition);

Expand Down Expand Up @@ -161,8 +218,38 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
/* Response: ErrorMessage */
rd_kafka_buf_write_str(resp, NULL, 0);
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 &&
err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) {
int changed_leader_idx;
/* See if this leader is already included */
for (changed_leader_idx = 0;
changed_leader_idx < changed_leaders_cnt;
changed_leader_idx++) {
if (changed_leaders[changed_leader_idx]
->id == mpart->leader->id)
break;
}
if (changed_leader_idx == changed_leaders_cnt) {
/* Add the new leader that wasn't
* present */
changed_leaders[changed_leaders_cnt] =
mpart->leader;
changed_leaders_cnt++;
}

partition_tags_to_write
[partition_tags_to_write_cnt] =
0 /* CurrentLeader */;
partition_tags_to_write_cnt++;
}

/* Response: Partition tags */
rd_kafka_buf_write_tags_empty(resp);
rd_kafka_buf_write_tags(
resp,
rd_kafka_mock_Produce_reply_tags_partition_write,
partition_tags_to_write,
partition_tags_to_write_cnt, mpart);
}

/* Topic tags */
Expand All @@ -177,17 +264,76 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
}

/* Response: Top level tags */
rd_kafka_buf_write_tags_empty(resp);
if (changed_leaders_cnt) {
tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */;
tags_to_write_cnt++;
}

rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
rd_kafka_buf_write_tags(resp, rd_kafka_mock_Produce_reply_tags_write,
tags_to_write, tags_to_write_cnt,
changed_leaders, changed_leaders_cnt);

rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
rd_free(changed_leaders);
return 0;

err_parse:
rd_free(changed_leaders);
rd_kafka_buf_destroy(resp);
return -1;
}

/**
 * @brief Write a partition-level tagged field to a mock FetchResponse.
 *
 * Only tag 1 (CurrentLeader) is known: it carries the partition's current
 * leader id and leader epoch followed by an empty field-tags section.
 * Unknown tag types are ignored and nothing is written.
 *
 * @param rkbuf Response buffer being written.
 * @param tagtype Tag number to serialize.
 * @param mpart Mock partition whose leader information is written.
 */
void rd_kafka_mock_Fetch_reply_tags_partition_write(
    rd_kafka_buf_t *rkbuf,
    int tagtype,
    rd_kafka_mock_partition_t *mpart) {
        if (tagtype != 1)
                return; /* Unknown tag: write nothing */

        /* CurrentLeader: leader id */
        rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
        /* CurrentLeader: leader epoch */
        rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
        /* Empty field tags terminating the CurrentLeader struct */
        rd_kafka_buf_write_tags_empty(rkbuf);
}

/**
 * @brief Write a top-level tagged field to a mock FetchResponse.
 *
 * Only tag 0 (NodeEndpoints) is known: it serializes the array of brokers
 * that became leaders, each as (id, host, port, rack, empty tags).
 * Unknown tag types are ignored and nothing is written.
 *
 * @param rkbuf Response buffer being written.
 * @param tagtype Tag number to serialize.
 * @param changed_leaders Array of brokers that are new partition leaders.
 * @param changed_leader_cnt Number of elements in \p changed_leaders.
 */
void rd_kafka_mock_Fetch_reply_tags_write(
    rd_kafka_buf_t *rkbuf,
    int tagtype,
    rd_kafka_mock_broker_t **changed_leaders,
    int changed_leader_cnt) {
        int i;
        switch (tagtype) {
        case 0: /* NodeEndpoints */
                /* #NodeEndpoints */
                rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt);
                for (i = 0; i < changed_leader_cnt; i++) {
                        rd_kafka_mock_broker_t *changed_leader =
                            changed_leaders[i];
                        /* Leader id */
                        rd_kafka_buf_write_i32(rkbuf, changed_leader->id);
                        /* Leader Hostname */
                        rd_kafka_buf_write_str(
                            rkbuf, changed_leader->advertised_listener, -1);

                        /* Leader Port number */
                        rd_kafka_buf_write_i32(rkbuf,
                                               (int32_t)changed_leader->port);

                        /* Leader Rack */
                        rd_kafka_buf_write_str(rkbuf, changed_leader->rack, -1);

                        /* Field tags */
                        rd_kafka_buf_write_tags_empty(rkbuf);
                }
                /* Was an implicit fallthrough into default: make the end of
                 * the case explicit so future cases can't be skipped into. */
                break;
        default:
                break;
        }
}


/**
Expand All @@ -204,6 +350,13 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
int8_t IsolationLevel;
size_t totsize = 0;

int32_t tags_to_write[1] = {0};
uint64_t tags_to_write_cnt = 0;

int changed_leaders_cnt = 0;
rd_kafka_mock_broker_t **changed_leaders =
rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders));

if (rkbuf->rkbuf_reqhdr.ApiVersion <= 14) {
rd_kafka_buf_read_i32(rkbuf, &ReplicaId);
}
Expand Down Expand Up @@ -281,8 +434,10 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
rd_kafka_mock_partition_t *mpart = NULL;
rd_kafka_resp_err_t err = all_err;
rd_bool_t on_follower;
size_t partsize = 0;
const rd_kafka_mock_msgset_t *mset = NULL;
size_t partsize = 0;
const rd_kafka_mock_msgset_t *mset = NULL;
int32_t partition_tags_to_write[1] = {0};
uint64_t partition_tags_to_write_cnt = 0;

rd_kafka_buf_read_i32(rkbuf, &Partition);

Expand Down Expand Up @@ -422,14 +577,39 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_write_arraycnt(resp, 0);
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 12 &&
err == RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER) {
int changed_leader_idx;
for (changed_leader_idx = 0;
changed_leader_idx < changed_leaders_cnt;
changed_leader_idx++) {
if (changed_leaders[changed_leader_idx]
->id == mpart->leader->id)
break;
}
if (changed_leader_idx == changed_leaders_cnt) {
changed_leaders[changed_leaders_cnt] =
mpart->leader;
changed_leaders_cnt++;
}
/* CurrentLeader */
partition_tags_to_write
[partition_tags_to_write_cnt] = 1;
partition_tags_to_write_cnt++;
}

/* Response: Partition tags */
rd_kafka_buf_write_tags_empty(resp);
rd_kafka_buf_write_tags(
resp,
rd_kafka_mock_Fetch_reply_tags_partition_write,
partition_tags_to_write,
partition_tags_to_write_cnt, mpart);
}

/* Response: Topic tags */
rd_kafka_buf_write_tags_empty(resp);
/* Topic tags */
rd_kafka_buf_skip_tags(rkbuf);
/* Response: Topic tags */
rd_kafka_buf_write_tags_empty(resp);
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 7) {
Expand Down Expand Up @@ -466,8 +646,15 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
/* Matt might do something sensible with this */
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && changed_leaders_cnt) {
tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */;
tags_to_write_cnt++;
}

/* Response: Top level tags */
rd_kafka_buf_write_tags_empty(resp);
rd_kafka_buf_write_tags(resp, rd_kafka_mock_Fetch_reply_tags_write,
tags_to_write, tags_to_write_cnt,
changed_leaders, changed_leaders_cnt);

/* If there was no data, delay up to MaxWait.
* This isn't strictly correct since we should cut the wait short
Expand All @@ -478,10 +665,12 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
resp->rkbuf_ts_retry = rd_clock() + (MaxWait * 1000);

rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
rd_free(changed_leaders);
return 0;

err_parse:
rd_kafka_buf_destroy(resp);
rd_free(changed_leaders);
return -1;
}

Expand Down Expand Up @@ -2306,8 +2495,8 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
const struct rd_kafka_mock_api_handler
rd_kafka_mock_api_handlers[RD_KAFKAP__NUM] = {
/* [request-type] = { MinVersion, MaxVersion, FlexVersion, callback } */
[RD_KAFKAP_Produce] = {0, 9, 9, rd_kafka_mock_handle_Produce},
[RD_KAFKAP_Fetch] = {0, 15, 12, rd_kafka_mock_handle_Fetch},
[RD_KAFKAP_Produce] = {0, 10, 9, rd_kafka_mock_handle_Produce},
[RD_KAFKAP_Fetch] = {0, 16, 12, rd_kafka_mock_handle_Fetch},
[RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets},
[RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch},
[RD_KAFKAP_OffsetCommit] = {0, 9, 8, rd_kafka_mock_handle_OffsetCommit},
Expand Down
Loading