Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for an undesired partition migration with stale leader epoch #4680

Merged
merged 4 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ librdkafka v2.4.0 is a feature release:
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
* Fix to metadata refresh interruption (#4679).
* Fix for an undesired partition migration with stale leader epoch (#4680).


## Upgrade considerations
Expand Down Expand Up @@ -63,6 +64,11 @@ librdkafka v2.4.0 is a feature release:
Metadata refreshes without partition leader change could lead to a loop of
metadata calls at fixed intervals. Solved by stopping metadata refresh when
all existing metadata is non-stale. Happening since 2.3.0 (#4679).
* Issues: #4687.
A partition migration could happen, using stale metadata, when the partition
was undergoing a validation and being retried because of an error.
Solved by doing a partition migration only with a non-stale leader epoch.
Happening since 2.1.0 (#4680).

### Consumer fixes

Expand Down
72 changes: 72 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,39 @@ rd_kafka_mock_partition_assign_replicas(rd_kafka_mock_partition_t *mpart,
mpart, mpart->replicas[rd_jitter(0, replica_cnt - 1)]);
}

/**
 * @brief Append a mocked leader response made of \p leader_id and
 *        \p leader_epoch to the tail of the leader response queue
 *        of \p mpart .
 *
 * The new entry is zero-allocated here and stays linked in
 * \c mpart->leader_responses until it is unlinked and freed elsewhere.
 */
static void
rd_kafka_mock_partition_push_leader_response0(rd_kafka_mock_partition_t *mpart,
                                              int32_t leader_id,
                                              int32_t leader_epoch) {
        rd_kafka_mock_partition_leader_t *entry = rd_calloc(1, sizeof(*entry));

        entry->leader_epoch = leader_epoch;
        entry->leader_id    = leader_id;

        /* Tail insertion: entries are kept in push order. */
        TAILQ_INSERT_TAIL(&mpart->leader_responses, entry, link);
}

/**
 * @brief Peek at the head of the leader response queue of \p mpart .
 *
 * @returns the first queued leader response, or NULL if none is queued.
 *          The entry is left linked in the queue.
 */
rd_kafka_mock_partition_leader_t *
rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart) {
        rd_kafka_mock_partition_leader_t *head =
            TAILQ_FIRST(&mpart->leader_responses);

        return head;
}

/**
* @brief Unlink the leader response \p mpart_leader from the
* leader response queue of \p mpart and free it.
*
* @param mpart Partition whose \c leader_responses queue holds the entry.
* @param mpart_leader Entry to remove; it is freed by this call and
* must not be used afterwards.
*/
void rd_kafka_mock_partition_leader_destroy(
rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_partition_leader_t *mpart_leader) {
/* Unlink before freeing: the list macro accesses the entry's link. */
TAILQ_REMOVE(&mpart->leader_responses, mpart_leader, link);
rd_free(mpart_leader);
}

/**
* @brief Unlink and destroy committed offset
Expand Down Expand Up @@ -546,13 +578,18 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) {
rd_kafka_mock_msgset_t *mset, *tmp;
rd_kafka_mock_committed_offset_t *coff, *tmpcoff;
rd_kafka_mock_partition_leader_t *mpart_leader, *tmp_mpart_leader;

TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp)
rd_kafka_mock_msgset_destroy(mpart, mset);

TAILQ_FOREACH_SAFE(coff, &mpart->committed_offsets, link, tmpcoff)
rd_kafka_mock_committed_offset_destroy(mpart, coff);

TAILQ_FOREACH_SAFE(mpart_leader, &mpart->leader_responses, link,
tmp_mpart_leader)
rd_kafka_mock_partition_leader_destroy(mpart, mpart_leader);

rd_list_destroy(&mpart->pidstates);

rd_free(mpart->replicas);
Expand All @@ -579,6 +616,7 @@ static void rd_kafka_mock_partition_init(rd_kafka_mock_topic_t *mtopic,
mpart->update_follower_end_offset = rd_true;

TAILQ_INIT(&mpart->committed_offsets);
TAILQ_INIT(&mpart->leader_responses);

rd_list_init(&mpart->pidstates, 0, rd_free);

Expand Down Expand Up @@ -2096,6 +2134,23 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

/**
 * @brief Build a PART_PUSH_LEADER_RESPONSE mock command for the given
 *        \p topic and \p partition, send it as a request on the
 *        \p mcluster ops queue (RD_POLL_INFINITE timeout) and return
 *        the error code extracted from the reply.
 *
 * The topic name is duplicated into the op.
 */
rd_kafka_resp_err_t
rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster,
                                             const char *topic,
                                             int partition,
                                             int32_t leader_id,
                                             int32_t leader_epoch) {
        rd_kafka_op_t *rko;

        rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);

        /* Command and target partition */
        rko->rko_u.mock.cmd  = RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE;
        rko->rko_u.mock.name = rd_strdup(topic);
        rko->rko_u.mock.partition = partition;

        /* Leader response payload */
        rko->rko_u.mock.leader_epoch = leader_epoch;
        rko->rko_u.mock.leader_id    = leader_id;

        return rd_kafka_op_err_destroy(
            rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster,
int32_t broker_id) {
Expand Down Expand Up @@ -2379,6 +2434,23 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
mpart->update_follower_end_offset = rd_false;
}
break;
case RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE:
mpart = rd_kafka_mock_partition_get(
mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition);
if (!mpart)
return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Push %s [%" PRId32 "] leader response: (%" PRId32
", %" PRId32 ")",
rko->rko_u.mock.name, rko->rko_u.mock.partition,
rko->rko_u.mock.leader_id,
rko->rko_u.mock.leader_epoch);

rd_kafka_mock_partition_push_leader_response0(
mpart, rko->rko_u.mock.leader_id,
rko->rko_u.mock.leader_epoch);
break;

/* Broker commands */
case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:
Expand Down
18 changes: 18 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,24 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster,
int64_t lo,
int64_t hi);

/**
* @brief Push a Metadata leader response onto the leader response queue
* of the given \p topic and \p partition.
*
* Pushed responses are queued in push order. Each time the partition is
* written to a Metadata response the first queued entry, if any, is used
* for the partition's leader id and leader epoch instead of the
* partition's current leader, and is then removed from the queue.
* When the queue is empty the partition's current leader is reported.
*
* @param mcluster Mock cluster the topic belongs to
* @param topic Topic to change
* @param partition Partition to change in \p topic
* @param leader_id Broker id of the leader node
* @param leader_epoch Leader epoch corresponding to the given \p leader_id
*
* @return Push operation error code,
* RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART if the partition
* could not be looked up.
*/
RD_EXPORT
rd_kafka_resp_err_t
rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
int partition,
int32_t leader_id,
int32_t leader_epoch);

/**
* @brief Disconnects the broker and disallows any new connections.
Expand Down
51 changes: 39 additions & 12 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,8 @@ static int rd_kafka_mock_handle_ApiVersion(rd_kafka_mock_connection_t *mconn,
* @param mtopic may be NULL
*/
static void
rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp,
rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_buf_t *resp,
int16_t ApiVersion,
const char *topic,
const rd_kafka_mock_topic_t *mtopic,
Expand All @@ -880,20 +881,46 @@ rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp,
rd_kafka_buf_write_arraycnt(resp, partition_cnt);

for (i = 0; mtopic && i < partition_cnt; i++) {
const rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i];
rd_kafka_mock_partition_leader_t *mpart_leader;
rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i];
int r;

/* Response: ..Partitions.ErrorCode */
rd_kafka_buf_write_i16(resp, 0);
/* Response: ..Partitions.PartitionIndex */
rd_kafka_buf_write_i32(resp, mpart->id);
/* Response: ..Partitions.Leader */
rd_kafka_buf_write_i32(resp,
mpart->leader ? mpart->leader->id : -1);

if (ApiVersion >= 7) {
/* Response: ..Partitions.LeaderEpoch */
rd_kafka_buf_write_i32(resp, mpart->leader_epoch);
mpart_leader =
rd_kafka_mock_partition_next_leader_response(mpart);
if (mpart_leader) {
rd_kafka_dbg(
mcluster->rk, MOCK, "MOCK",
"MetadataRequest: using next leader response "
"(%" PRId32 ", %" PRId32 ")",
mpart_leader->leader_id,
mpart_leader->leader_epoch);

/* Response: ..Partitions.Leader */
rd_kafka_buf_write_i32(resp, mpart_leader->leader_id);

if (ApiVersion >= 7) {
/* Response: ..Partitions.LeaderEpoch */
rd_kafka_buf_write_i32(
resp, mpart_leader->leader_epoch);
}
rd_kafka_mock_partition_leader_destroy(mpart,
mpart_leader);
mpart_leader = NULL;
} else {
/* Response: ..Partitions.Leader */
rd_kafka_buf_write_i32(
resp, mpart->leader ? mpart->leader->id : -1);

if (ApiVersion >= 7) {
/* Response: ..Partitions.LeaderEpoch */
rd_kafka_buf_write_i32(resp,
mpart->leader_epoch);
}
}

/* Response: ..Partitions.#ReplicaNodes */
Expand Down Expand Up @@ -1010,8 +1037,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,

TAILQ_FOREACH(mtopic, &mcluster->topics, link) {
rd_kafka_mock_buf_write_Metadata_Topic(
resp, rkbuf->rkbuf_reqhdr.ApiVersion, mtopic->name,
mtopic, mtopic->err);
mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion,
mtopic->name, mtopic, mtopic->err);
}

} else if (requested_topics) {
Expand All @@ -1033,8 +1060,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

rd_kafka_mock_buf_write_Metadata_Topic(
resp, rkbuf->rkbuf_reqhdr.ApiVersion, rktpar->topic,
mtopic, err ? err : mtopic->err);
mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion,
rktpar->topic, mtopic, err ? err : mtopic->err);
}

} else {
Expand Down
21 changes: 21 additions & 0 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ typedef struct rd_kafka_mock_committed_offset_s {
rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */
} rd_kafka_mock_committed_offset_t;

/**
* @brief Leader id and epoch to return in a Metadata call.
*
* Entries are linked into a partition's \c leader_responses queue.
*/
typedef struct rd_kafka_mock_partition_leader_s {
/** Link to prev/next entries */
TAILQ_ENTRY(rd_kafka_mock_partition_leader_s) link;
int32_t leader_id; /**< Leader id */
int32_t leader_epoch; /**< Leader epoch */
} rd_kafka_mock_partition_leader_t;


TAILQ_HEAD(rd_kafka_mock_msgset_tailq_s, rd_kafka_mock_msgset_s);

Expand Down Expand Up @@ -276,6 +286,10 @@ typedef struct rd_kafka_mock_partition_s {
int32_t follower_id; /**< Preferred replica/follower */

struct rd_kafka_mock_topic_s *topic;

/**< Leader responses */
TAILQ_HEAD(, rd_kafka_mock_partition_leader_s)
leader_responses;
} rd_kafka_mock_partition_t;


Expand Down Expand Up @@ -477,6 +491,13 @@ int64_t rd_kafka_mock_partition_offset_for_leader_epoch(
const rd_kafka_mock_partition_t *mpart,
int32_t leader_epoch);

/**
* @brief Return the first queued leader response of \p mpart ,
* if available, without removing it from the queue.
*/
rd_kafka_mock_partition_leader_t *
rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart);

/**
* @brief Unlink \p mpart_leader from the leader response queue of
* \p mpart and free it.
*/
void rd_kafka_mock_partition_leader_destroy(
rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_partition_leader_t *mpart_leader);


/**
* @returns true if the ApiVersion is supported, else false.
Expand Down
12 changes: 11 additions & 1 deletion src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ struct rd_kafka_op_s {
RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE,
RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
RD_KAFKA_MOCK_CMD_BROKER_SET_RTT,
RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
Expand All @@ -580,14 +581,17 @@ struct rd_kafka_op_s {
* PART_SET_FOLLOWER
* PART_SET_FOLLOWER_WMARKS
* BROKER_SET_RACK
* COORD_SET (key_type) */
* COORD_SET (key_type)
* PART_PUSH_LEADER_RESPONSE
*/
char *str; /**< For:
* COORD_SET (key) */
int32_t partition; /**< For:
* PART_SET_FOLLOWER
* PART_SET_FOLLOWER_WMARKS
* PART_SET_LEADER
* APIVERSION_SET (ApiKey)
* PART_PUSH_LEADER_RESPONSE
*/
int32_t broker_id; /**< For:
* PART_SET_FOLLOWER
Expand All @@ -607,6 +611,12 @@ struct rd_kafka_op_s {
* PART_SET_FOLLOWER_WMARKS
* APIVERSION_SET (maxver)
*/
int32_t leader_id; /**< Leader id, for:
* PART_PUSH_LEADER_RESPONSE
*/
int32_t leader_epoch; /**< Leader epoch, for:
* PART_PUSH_LEADER_RESPONSE
*/
} mock;

struct {
Expand Down
Loading