Skip to content

Commit

Permalink
Less strict message.max.bytes check for individual messages (#993)
Browse files Browse the repository at this point in the history
Since the final request size can't be known at produce() time
we allow ProduceRequests larger than message.max.bytes (overshot by at
most one message) and instead rely on the broker enforcing the
MessageSet size.
  • Loading branch information
edenhill committed Oct 14, 2019
1 parent a12b909 commit b1b4d14
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 49 deletions.
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ builtin.features | * | | gzip, snappy,
client.id | * | | rdkafka | low | Client identifier. <br>*Type: string*
metadata.broker.list | * | | | high | Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string*
bootstrap.servers | * | | | high | Alias for `metadata.broker.list`: Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string*
message.max.bytes | * | 1000 .. 1000000000 | 1000000 | medium | Maximum Kafka protocol request message size. <br>*Type: integer*
message.max.bytes | * | 1000 .. 1000000000 | 1000000 | medium | Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests; the broker will enforce the topic's `max.message.bytes` limit (see Apache Kafka documentation). <br>*Type: integer*
message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | low | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs. <br>*Type: integer*
receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. <br>*Type: integer*
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
Expand Down
1 change: 0 additions & 1 deletion src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,6 @@ static void rd_kafka_buf_finalize (rd_kafka_t *rk, rd_kafka_buf_t *rkbuf) {

/* Calculate total request buffer length. */
totsize = rd_buf_len(&rkbuf->rkbuf_buf) - 4;
rd_assert(totsize <= (size_t)rk->rk_conf.max_msg_size);

/* Set up a buffer reader for sending the buffer. */
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
Expand Down
11 changes: 8 additions & 3 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,14 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{ _RK_GLOBAL|_RK_HIGH, "bootstrap.servers", _RK_C_ALIAS, 0,
"See metadata.broker.list",
.sdef = "metadata.broker.list" },
{ _RK_GLOBAL|_RK_MED, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
"Maximum Kafka protocol request message size.",
1000, 1000000000, 1000000 },
{ _RK_GLOBAL|_RK_MED, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
"Maximum Kafka protocol request message size. "
"Due to differing framing overhead between protocol versions the "
"producer is unable to reliably enforce a strict max message limit "
"at produce time and may exceed the maximum size by one message in "
"protocol ProduceRequests, the broker will enforce the the topic's "
"`max.message.bytes` limit (see Apache Kafka documentation).",
1000, 1000000000, 1000000 },
{ _RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT,
_RK(msg_copy_max_size),
"Maximum size for message to be copied to buffer. "
Expand Down
16 changes: 8 additions & 8 deletions src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
if (hdrs)
hdrs_size = rd_kafka_headers_serialized_size(hdrs);

if (unlikely(len + keylen + hdrs_size >
(size_t)rkt->rkt_rk->rk_conf.max_msg_size ||
keylen > INT32_MAX)) {
*errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
if (errnop)
*errnop = EMSGSIZE;
return NULL;
}
if (unlikely(len > INT32_MAX || keylen > INT32_MAX ||
rd_kafka_msg_max_wire_size(keylen, len, hdrs_size) >
(size_t)rkt->rkt_rk->rk_conf.max_msg_size)) {
*errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
if (errnop)
*errnop = EMSGSIZE;
return NULL;
}

if (msgflags & RD_KAFKA_MSG_F_BLOCK)
*errp = rd_kafka_curr_msgs_add(
Expand Down
13 changes: 13 additions & 0 deletions src/rdkafka_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ size_t rd_kafka_msg_wire_size (const rd_kafka_msg_t *rkm, int MsgVersion) {
}


/**
 * @brief Worst-case total on-wire size of a single message, independent
 *        of which MsgVersion will eventually be used for serialization.
 *
 * @param keylen   serialized key length in bytes.
 * @param valuelen serialized value (payload) length in bytes.
 * @param hdrslen  serialized headers length in bytes.
 *
 * @returns the upper-bound wire size of the message.
 *
 * @remark Only the per-message framing overhead is included here; the
 *         enclosing ProduceRequest, et al., envelope is not accounted for.
 */
static RD_INLINE RD_UNUSED
size_t rd_kafka_msg_max_wire_size (size_t keylen, size_t valuelen,
                                   size_t hdrslen) {
        /* MsgVersion 2 framing is used as the worst case. */
        size_t payload = keylen + valuelen + hdrslen;
        return RD_KAFKAP_MESSAGE_V2_OVERHEAD + payload;
}

/**
* @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t
* wrapped rd_kafka_message_t.
Expand Down
16 changes: 12 additions & 4 deletions src/rdkafka_msgset_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -848,10 +848,19 @@ rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
break;
}

/* Check if there is enough space in the current messageset
* to add this message.
* Since calculating the total size of a request at produce()
* time is tricky (we don't know the protocol version or
* MsgVersion that will be used), we allow a messageset to
* overshoot the message.max.bytes limit by one message to
* avoid getting stuck here.
* The actual messageset size is enforced by the broker. */
if (unlikely(msgcnt == msetw->msetw_msgcntmax ||
len + rd_kafka_msg_wire_size(rkm, msetw->
msetw_MsgVersion) >
max_msg_size)) {
(msgcnt > 0 &&
len + rd_kafka_msg_wire_size(rkm, msetw->
msetw_MsgVersion) >
max_msg_size))) {
rd_rkb_dbg(rkb, MSG, "PRODUCE",
"%.*s [%"PRId32"]: "
"No more space in current MessageSet "
Expand Down Expand Up @@ -886,7 +895,6 @@ rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
len += rd_kafka_msgset_writer_write_msg(msetw, rkm, msgcnt, 0,
NULL);

rd_dassert(len <= max_msg_size);
msgcnt++;

} while ((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)));
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_zstd.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ rd_kafka_zstd_decompress (rd_kafka_broker_t *rkb,
rd_rkb_dbg(rkb, MSG, "ZSTD",
"Unable to decompress ZSTD "
"(input buffer %"PRIusz", output buffer %llu): "
"output would exceed receive.message.max.bytes (%d)",
"output would exceed message.max.bytes (%d)",
inlen, out_bufsize, rkb->rkb_rk->rk_conf.max_msg_size);

return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
Expand Down
83 changes: 52 additions & 31 deletions tests/0003-msgmaxsize.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,22 @@ int main_0003_msgmaxsize (int argc, char **argv) {
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char errstr[512];
char *msg;
static const int msgsize = 100000;
int msgcnt = 10;

static const struct {
ssize_t keylen;
ssize_t len;
rd_kafka_resp_err_t exp_err;
} sizes[] = {
/* message.max.bytes is including framing */
{ -1, 5000, RD_KAFKA_RESP_ERR_NO_ERROR },
{ 0, 99900, RD_KAFKA_RESP_ERR_NO_ERROR },
{ 0, 100000, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
{ 100000, 0, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
{ 1000, 100000, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
{ 0, 101000, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE },
{ 99000, -1, RD_KAFKA_RESP_ERR_NO_ERROR },
{ -1, -1, RD_KAFKA_RESP_ERR__END }
};
int i;

test_conf_init(&conf, &topic_conf, 10);
Expand All @@ -97,37 +110,47 @@ int main_0003_msgmaxsize (int argc, char **argv) {
TEST_FAIL("Failed to create topic: %s\n",
rd_strerror(errno));

msg = calloc(1, msgsize);

/* Produce 'msgcnt' messages, size odd ones larger than max.bytes,
* and even ones smaller than max.bytes. */
for (i = 0 ; i < msgcnt ; i++) {
for (i = 0 ; sizes[i].exp_err != RD_KAFKA_RESP_ERR__END ; i++) {
void *value = sizes[i].len != -1 ?
calloc(1, sizes[i].len) : NULL;
size_t len = sizes[i].len != -1 ? sizes[i].len : 0;
void *key = sizes[i].keylen != -1 ?
calloc(1, sizes[i].keylen) : NULL;
size_t keylen = sizes[i].keylen != -1 ? sizes[i].keylen : 0;
int *msgidp = malloc(sizeof(*msgidp));
size_t len;
int toobig = i & 1;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

*msgidp = i;
if (toobig) {
/* Too big */
len = 200000;
} else {
/* Good size */
len = 5000;
msgs_wait |= (1 << i);
}

rd_snprintf(msg, msgsize, "%s test message #%i", argv[0], i);
r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
msg, len, NULL, 0, msgidp);

if (toobig) {
if (r != -1)
TEST_FAIL("Succeeded to produce too "
"large message #%i\n", i);
free(msgidp);
} else if (r == -1)
TEST_FAIL("Failed to produce message #%i: %s\n",
i, rd_strerror(errno));
value, len,
key, keylen,
msgidp);
if (r == -1)
err = rd_kafka_last_error();

if (err != sizes[i].exp_err) {
TEST_FAIL("Msg #%d produce(len=%"PRIdsz
", keylen=%"PRIdsz"): got %s, expected %s",
i,
sizes[i].len,
sizes[i].keylen,
rd_kafka_err2name(err),
rd_kafka_err2name(sizes[i].exp_err));
} else {
TEST_SAY("Msg #%d produce() returned expected %s "
"for value size %"PRIdsz
" and key size %"PRIdsz"\n",
i,
rd_kafka_err2name(err),
sizes[i].len,
sizes[i].keylen);

if (!sizes[i].exp_err)
msgs_wait |= (1 << i);
else
free(msgidp);
}
}

/* Wait for messages to be delivered. */
Expand All @@ -137,8 +160,6 @@ int main_0003_msgmaxsize (int argc, char **argv) {
if (msgs_wait != 0)
TEST_FAIL("Still waiting for messages: 0x%x\n", msgs_wait);

free(msg);

/* Destroy topic */
rd_kafka_topic_destroy(rkt);

Expand Down

0 comments on commit b1b4d14

Please sign in to comment.