diff --git a/rdkafka_broker.c b/rdkafka_broker.c index faea7de6d7..95bc84b73d 100644 --- a/rdkafka_broker.c +++ b/rdkafka_broker.c @@ -1512,6 +1512,48 @@ void rd_kafka_dr_msgq (rd_kafka_t *rk, } + +/** + * Parses a Produce reply. + * Returns 0 on success or an error code on failure. + */ +static rd_kafka_resp_err_t +rd_kafka_produce_reply_handle (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) { + char *buf = rkbuf->rkbuf_buf2; + size_t size = rkbuf->rkbuf_len; + size_t of = 0; + int32_t TopicArrayCnt; + rd_kafkap_str_t *topic; + int32_t PartitionArrayCnt; + struct { + int32_t Partition; + int16_t ErrorCode; + int64_t Offset; + } RD_PACKED *hdr; + + _READ_I32(&TopicArrayCnt); + if (TopicArrayCnt != 1) + goto err; + + /* Since we only produce to one single topic+partition in each + * request we assume that the reply only contains one topic+partition + * and that it is the same that we requested. + * If not the broker is buggy. */ + _READ_STR(topic); + _READ_I32(&PartitionArrayCnt); + + if (PartitionArrayCnt != 1) + goto err; + + _READ_REF(hdr, sizeof(*hdr)); + + return ntohs(hdr->ErrorCode); + +err: + return RD_KAFKA_RESP_ERR__BAD_MSG; +} + + /** * Locality: io thread */ @@ -1526,6 +1568,11 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb, "MessageSet with %i message(s) %sdelivered", request->rkbuf_msgq.rkmq_msg_cnt, err ? "not ": ""); + /* Parse Produce reply (unless the request errored) */ + if (!err) + err = rd_kafka_produce_reply_handle(rkb, reply); + + if (err) { rd_rkb_dbg(rkb, MSG, "MSGSET", "MessageSet with %i message(s) " "encountered error: %s", @@ -1535,6 +1582,10 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb, switch (err) { case RD_KAFKA_RESP_ERR__DESTROY: + case RD_KAFKA_RESP_ERR_INVALID_MSG: + case RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE: + case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE: + /* Fatal errors: no message transmission retries */ break; case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: