diff --git a/CHANGELOG.md b/CHANGELOG.md index f750b4b3b7..e5b56bb5a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,13 @@ librdkafka v1.8.0 is a security release: from the broker. +### Admin fixes + + * `DeleteRecords()` could crash if one of the underlying requests + (for a given partition leader) failed at the transport level (e.g., timeout). + (#3476). + + # librdkafka v1.7.0 diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index ded369ead3..a6591b77fd 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -320,7 +320,10 @@ rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, */ /** - * @brief Create a new admin_result op based on the request op \p rko_req + * @brief Create a new admin_result op based on the request op \p rko_req. + * + * @remark This moves the rko_req's admin_request.args list from \p rko_req + * to the returned rko. The \p rko_req args will be emptied. */ static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) { rd_kafka_op_t *rko_result; @@ -361,6 +364,12 @@ static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) { rd_kafka_confval_get_ptr(&rko_req->rko_u.admin_request. options.opaque); + /* Move request arguments (list) from request to result. + * This is mainly so that partial_response() knows what arguments + * were provided to the response's request it is merging. */ + rd_list_move(&rko_result->rko_u.admin_result.args, + &rko_req->rko_u.admin_request.args); + rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type; return rko_result; @@ -1712,7 +1721,7 @@ rd_kafka_CreateTopicsResponse_parse (rd_kafka_op_t *rko_req, * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.topic = terr->topic; - orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_NewTopic_cmp); if (orig_pos == -1) { rd_kafka_topic_result_destroy(terr); @@ -1930,7 +1939,7 @@ rd_kafka_DeleteTopicsResponse_parse (rd_kafka_op_t *rko_req, * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.topic = terr->topic; - orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_DeleteTopic_cmp); if (orig_pos == -1) { rd_kafka_topic_result_destroy(terr); @@ -2224,7 +2233,7 @@ rd_kafka_CreatePartitionsResponse_parse (rd_kafka_op_t *rko_req, * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.topic = terr->topic; - orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_NewPartitions_cmp); if (orig_pos == -1) { rd_kafka_topic_result_destroy(terr); @@ -2796,7 +2805,7 @@ rd_kafka_AlterConfigsResponse_parse (rd_kafka_op_t *rko_req, * does not maintain ordering unfortunately. */ skel.restype = config->restype; skel.name = config->name; - orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_ConfigResource_cmp); if (orig_pos == -1) { rd_kafka_ConfigResource_destroy(config); @@ -3101,7 +3110,7 @@ rd_kafka_DescribeConfigsResponse_parse (rd_kafka_op_t *rko_req, * does not maintain ordering unfortunately. */ skel.restype = config->restype; skel.name = config->name; - orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_ConfigResource_cmp); if (orig_pos == -1) rd_kafka_buf_parse_fail( @@ -3257,13 +3266,40 @@ rd_kafka_DeleteRecords_response_merge (rd_kafka_op_t *rko_fanout, rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_DELETERECORDS_RESULT); - /* Partitions from the DeleteRecordsResponse */ - partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); - - /* Partitions (offsets) from the DeleteRecords() call */ + /* All partitions (offsets) from the DeleteRecords() call */ respartitions = rd_list_elem(&rko_fanout->rko_u.admin_request. fanout.results, 0); + if (rko_partial->rko_err) { + /* If there was a request-level error, set the error on + * all requested partitions for this request. */ + const rd_kafka_topic_partition_list_t *reqpartitions; + rd_kafka_topic_partition_t *reqpartition; + + /* Partitions (offsets) from this DeleteRecordsRequest */ + reqpartitions = rd_list_elem(&rko_partial->rko_u. + admin_result.args, 0); + + RD_KAFKA_TPLIST_FOREACH(reqpartition, reqpartitions) { + rd_kafka_topic_partition_t *respart; + + /* Find result partition */ + respart = rd_kafka_topic_partition_list_find( + respartitions, + reqpartition->topic, + reqpartition->partition); + + rd_assert(respart || !*"respart not found"); + + respart->err = rko_partial->rko_err; + } + + return; + } + + /* Partitions from the DeleteRecordsResponse */ + partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); + RD_KAFKA_TPLIST_FOREACH(partition, partitions) { rd_kafka_topic_partition_t *respart; @@ -3898,8 +3934,7 @@ rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req, rd_kafka_op_t *rko_result; int16_t ErrorCode; rd_kafka_topic_partition_list_t *partitions = NULL; - const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets = - rd_list_elem(&rko_req->rko_u.admin_request.args, 0); + const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets; rd_kafka_buf_read_i16(reply, &ErrorCode); if (ErrorCode) { @@ -3924,6 +3959,8 @@ rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req, /* Create result op and group_result_t */ rko_result = rd_kafka_admin_result_new(rko_req); + del_grpoffsets = rd_list_elem(&rko_result->rko_u.admin_result.args, 0); + rd_list_init(&rko_result->rko_u.admin_result.results, 1, rd_kafka_group_result_free); rd_list_add(&rko_result->rko_u.admin_result.results, diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 5a0e6f4cca..8a0ee0d289 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -480,6 +480,12 @@ struct rd_kafka_op_s { rd_kafka_op_type_t reqtype; /**< Request op type, * used for logging. */ + rd_list_t args; /**< Args moved from the request op + * when the result op is created. + * + * Type depends on request. + */ + char *errstr; /**< Error string, if rko_err * is set, else NULL. */ diff --git a/src/rdlist.c b/src/rdlist.c index 2b8aecaf9b..5ac224a149 100644 --- a/src/rdlist.c +++ b/src/rdlist.c @@ -492,6 +492,22 @@ void *rd_list_copy_preallocated (const void *elem, void *opaque) { } + +void rd_list_move (rd_list_t *dst, rd_list_t *src) { + rd_list_init_copy(dst, src); + + if (src->rl_flags & RD_LIST_F_FIXED_SIZE) { + rd_list_copy_preallocated0(dst, src); + } else { + memcpy(dst->rl_elems, src->rl_elems, + src->rl_cnt * sizeof(*src->rl_elems)); + dst->rl_cnt = src->rl_cnt; + } + + src->rl_cnt = 0; +} + + /** * @name Misc helpers for common list types * @{ diff --git a/src/rdlist.h b/src/rdlist.h index 7e5469568e..b7bfa4276a 100644 --- a/src/rdlist.h +++ b/src/rdlist.h @@ -358,6 +358,15 @@ void *rd_list_string_copy (const void *elem, void *opaque) { +/** + * @brief Move elements from \p src to \p dst. + * + * @remark \p dst will be initialized first. + * @remark \p src will be emptied. + */ +void rd_list_move (rd_list_t *dst, rd_list_t *src); + + /** * @name Misc helpers for common list types * @{