Skip to content

Commit

Permalink
Handle per topic+partition produce errors (issue #40)
Browse files Browse the repository at this point in the history
Produce errors were previously only handled if the entire request failed.
  • Loading branch information
edenhill committed Dec 27, 2013
1 parent 20e1b93 commit a52f585
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,48 @@ void rd_kafka_dr_msgq (rd_kafka_t *rk,
}



/**
* Parses a Produce reply.
* Returns 0 on success or an error code on failure.
*/
static rd_kafka_resp_err_t
rd_kafka_produce_reply_handle (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
char *buf = rkbuf->rkbuf_buf2;
size_t size = rkbuf->rkbuf_len;
size_t of = 0;
int32_t TopicArrayCnt;
rd_kafkap_str_t *topic;
int32_t PartitionArrayCnt;
struct {
int32_t Partition;
int16_t ErrorCode;
int64_t Offset;
} RD_PACKED *hdr;

_READ_I32(&TopicArrayCnt);
if (TopicArrayCnt != 1)
goto err;

/* Since we only produce to one single topic+partition in each
* request we assume that the reply only contains one topic+partition
* and that it is the same that we requested.
* If not the broker is buggy. */
_READ_STR(topic);
_READ_I32(&PartitionArrayCnt);

if (PartitionArrayCnt != 1)
goto err;

_READ_REF(hdr, sizeof(*hdr));

return ntohs(hdr->ErrorCode);

err:
return RD_KAFKA_RESP_ERR__BAD_MSG;
}


/**
* Locality: io thread
*/
Expand All @@ -1526,6 +1568,11 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb,
"MessageSet with %i message(s) %sdelivered",
request->rkbuf_msgq.rkmq_msg_cnt, err ? "not ": "");

/* Parse Produce reply (unless the request errored) */
if (!err)
err = rd_kafka_produce_reply_handle(rkb, reply);


if (err) {
rd_rkb_dbg(rkb, MSG, "MSGSET", "MessageSet with %i message(s) "
"encountered error: %s",
Expand All @@ -1535,6 +1582,10 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb,
switch (err)
{
case RD_KAFKA_RESP_ERR__DESTROY:
case RD_KAFKA_RESP_ERR_INVALID_MSG:
case RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE:
case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
/* Fatal errors: no message transmission retries */
break;

case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
Expand Down

0 comments on commit a52f585

Please sign in to comment.