Skip to content

Commit

Permalink
Fix timestamp extraction for compressed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Yunjing Xu committed Oct 26, 2016
1 parent 702d402 commit 5576b5a
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -3471,6 +3471,17 @@ static char *rd_kafka_snappy_java_decompress (rd_kafka_broker_t *rkb,
#endif


/**
* Message header
*/
struct msg_hdr {
int64_t Offset;
int32_t MessageSize;
uint32_t Crc;
int8_t MagicByte; /* MsgVersion */
int8_t Attributes;
int64_t Timestamp;
};

/**
* Parses a MessageSet and enqueues internal ops on the local
Expand All @@ -3480,6 +3491,7 @@ static rd_kafka_resp_err_t
rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_q_t *rkq,
struct msg_hdr *blk_hdr,
struct rd_kafka_toppar_ver *tver,
int16_t ApiVersion,
rd_kafka_buf_t *rkbuf_orig,
Expand All @@ -3502,14 +3514,6 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rktp->rktp_partition);

while (rd_kafka_buf_remain(rkbuf) > 0) {
struct {
int64_t Offset;
int32_t MessageSize;
uint32_t Crc;
int8_t MagicByte; /* MsgVersion */
int8_t Attributes;
int64_t Timestamp;
} hdr;
rd_kafkap_bytes_t Key;
rd_kafkap_bytes_t Value;
int32_t Value_len;
Expand All @@ -3520,6 +3524,7 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
int relative_offsets;
rd_kafka_resp_err_t err RD_UNUSED = RD_KAFKA_RESP_ERR_NO_ERROR;

struct msg_hdr hdr;
rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSize);
rd_kafka_buf_read_i32(rkbuf, &hdr.Crc);
Expand Down Expand Up @@ -3601,8 +3606,15 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rd_kafka_buf_keep(rkbuf_orig);

if (hdr.MagicByte >= 1 && hdr.Timestamp) {
rkm->rkm_timestamp = hdr.Timestamp;
if (hdr.Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)
int8_t attributes;
if (blk_hdr && hdr.Timestamp == -1) {
rkm->rkm_timestamp = blk_hdr->Timestamp;
attributes = blk_hdr->Attributes;
} else {
rkm->rkm_timestamp = hdr.Timestamp;
attributes = hdr.Attributes;
}
if (attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
else
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
Expand Down Expand Up @@ -3787,7 +3799,7 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
/* Now parse the contained Messages */
rd_kafka_messageset_handle(rkb, rktp,
relative_offsets ?
&relq : rkq, tver,
&relq : rkq, &hdr, tver,
ApiVersion,
rkbufz, outbuf, outlen);

Expand Down Expand Up @@ -4076,7 +4088,7 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,

/* Parse and handle the message set */
err2 = rd_kafka_messageset_handle(
rkb, rktp, &tmp_opq, tver,
rkb, rktp, &tmp_opq, NULL, tver,
request->rkbuf_reqhdr.ApiVersion,
rkbuf, rkbuf->rkbuf_rbuf+rkbuf->rkbuf_of,
hdr.MessageSetSize);
Expand Down

0 comments on commit 5576b5a

Please sign in to comment.