Commit
Store offset commit metadata when calling rd_kafka_offsets_store (#4171)

Store metadata when committing stored offsets, in both the C and C++ APIs.

---------
Co-authored-by: Mathis <mathis.pesch@datadoghq.com>
emasab authored May 16, 2023
1 parent bc933a0 commit 68455af
Showing 14 changed files with 257 additions and 20 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -1,7 +1,8 @@
# librdkafka v2.1.2
# librdkafka v2.2.0

librdkafka v2.1.2 is a maintenance release:
librdkafka v2.2.0 is a feature release:

* Store offset commit metadata in `rd_kafka_offsets_store` (@mathispesch, #4084).
* Fix a bug that happens when skipping tags, causing buffer underflow in
MetadataResponse (#4278).

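As a usage illustration (not part of the commit itself): after this change, metadata attached to a partition-list entry before calling `rd_kafka_offsets_store()` is kept with the stored offset and sent with the next commit. A minimal C sketch, assuming an already-assigned consumer `rk` with `enable.auto.offset.store=false`; the topic, offset and metadata values are illustrative:

#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

/* Sketch: store an offset together with commit metadata for partition 0
 * of `topic`, then commit it synchronously. */
static void store_offset_with_metadata(rd_kafka_t *rk, const char *topic,
                                       int64_t next_offset) {
        const char *md = "example metadata";
        rd_kafka_topic_partition_list_t *parts =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_t *p =
            rd_kafka_topic_partition_list_add(parts, topic, 0);

        p->offset        = next_offset;         /* offset to commit next */
        p->metadata_size = strlen(md);
        p->metadata      = malloc(p->metadata_size);
        memcpy(p->metadata, md, p->metadata_size);

        rd_kafka_offsets_store(rk, parts);      /* stores offset + metadata */
        rd_kafka_commit(rk, NULL, 0 /*sync*/);  /* commits the stored values */

        rd_kafka_topic_partition_list_destroy(parts);
}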
10 changes: 10 additions & 0 deletions src-cpp/HandleImpl.cpp
@@ -391,6 +391,12 @@ rd_kafka_topic_partition_list_t *partitions_to_c_parts(
rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
c_parts, tpi->topic_.c_str(), tpi->partition_);
rktpar->offset = tpi->offset_;
if (tpi->metadata_.size()) {
void *metadata_p = mem_malloc(tpi->metadata_.size());
memcpy(metadata_p, tpi->metadata_.data(), tpi->metadata_.size());
rktpar->metadata = metadata_p;
rktpar->metadata_size = tpi->metadata_.size();
}
if (tpi->leader_epoch_ != -1)
rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_);
}
@@ -417,6 +423,10 @@ void update_partitions_from_c_parts(
pp->offset_ = p->offset;
pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p);
if (p->metadata_size) {
unsigned char *metadata = (unsigned char *)p->metadata;
pp->metadata_.assign(metadata, metadata + p->metadata_size);
}
}
}
}
6 changes: 6 additions & 0 deletions src-cpp/rdkafkacpp.h
@@ -1986,6 +1986,12 @@ class RD_EXPORT TopicPartition {

/** @brief Set partition leader epoch. */
virtual void set_leader_epoch(int32_t leader_epoch) = 0;

/** @brief Get partition metadata. */
virtual std::vector<unsigned char> get_metadata() = 0;

/** @brief Set partition metadata. */
virtual void set_metadata(std::vector<unsigned char> &metadata) = 0;
};


14 changes: 13 additions & 1 deletion src-cpp/rdkafkacpp_int.h
@@ -1260,7 +1260,10 @@ class TopicPartitionImpl : public TopicPartition {
offset_ = c_part->offset;
err_ = static_cast<ErrorCode>(c_part->err);
leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part);
// FIXME: metadata
if (c_part->metadata_size > 0) {
unsigned char *metadata = (unsigned char *)c_part->metadata;
metadata_.assign(metadata, metadata + c_part->metadata_size);
}
}

static void destroy(std::vector<TopicPartition *> &partitions);
@@ -1292,6 +1295,14 @@
leader_epoch_ = leader_epoch;
}

std::vector<unsigned char> get_metadata() {
return metadata_;
}

void set_metadata(std::vector<unsigned char> &metadata) {
metadata_ = metadata;
}

std::ostream &operator<<(std::ostream &ostrm) const {
return ostrm << topic_ << " [" << partition_ << "]";
}
@@ -1301,6 +1312,7 @@
int64_t offset_;
ErrorCode err_;
int32_t leader_epoch_;
std::vector<unsigned char> metadata_;
};


10 changes: 6 additions & 4 deletions src/rdkafka_assignment.c
@@ -341,15 +341,17 @@ static int rd_kafka_assignment_serve_removals(rd_kafka_t *rk) {
* so it will be committed below. */
rd_kafka_topic_partition_set_from_fetch_pos(
rktpar, rktp->rktp_stored_pos);
rd_kafka_topic_partition_set_metadata_from_rktp_stored(rktpar,
rktp);
valid_offsets += !RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset);

/* Reset the stored offset to invalid so that
* a manual offset-less commit() or the auto-committer
* will not commit a stored offset from a previous
* assignment (issue #2782). */
rd_kafka_offset_store0(
rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1),
rd_true, RD_DONT_LOCK);
rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1), NULL,
0, rd_true, RD_DONT_LOCK);

/* Partition is no longer desired */
rd_kafka_toppar_desired_del(rktp);
@@ -745,8 +747,8 @@ rd_kafka_assignment_add(rd_kafka_t *rk,
/* Reset the stored offset to INVALID to avoid the race
* condition described in rdkafka_offset.h */
rd_kafka_offset_store0(
rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1),
rd_true /* force */, RD_DONT_LOCK);
rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1), NULL,
0, rd_true /* force */, RD_DONT_LOCK);

rd_kafka_toppar_unlock(rktp);
}
17 changes: 10 additions & 7 deletions src/rdkafka_offset.c
@@ -557,8 +557,10 @@ rd_kafka_offset_broker_commit(rd_kafka_toppar_t *rktp, const char *reason) {
offsets = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);

rd_kafka_topic_partition_set_from_fetch_pos(rktpar,
rktp->rktp_committing_pos);
rd_kafka_topic_partition_set_metadata_from_rktp_stored(rktpar, rktp);

rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSETCMT",
"%.*s [%" PRId32 "]: committing %s: %s",
@@ -654,8 +656,8 @@ rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *app_rkt,
}
rd_kafka_topic_rdunlock(rkt);

err = rd_kafka_offset_store0(rktp, pos, rd_false /* Don't force */,
RD_DO_LOCK);
err = rd_kafka_offset_store0(rktp, pos, NULL, 0,
rd_false /* Don't force */, RD_DO_LOCK);

rd_kafka_toppar_destroy(rktp);

@@ -691,7 +693,8 @@ rd_kafka_offsets_store(rd_kafka_t *rk,
rd_kafka_topic_partition_get_leader_epoch(rktpar);

rktpar->err = rd_kafka_offset_store0(
rktp, pos, rd_false /* don't force */, RD_DO_LOCK);
rktp, pos, rktpar->metadata, rktpar->metadata_size,
rd_false /* don't force */, RD_DO_LOCK);
rd_kafka_toppar_destroy(rktp);

if (rktpar->err)
@@ -725,8 +728,8 @@ rd_kafka_error_t *rd_kafka_offset_store_message(rd_kafka_message_t *rkmessage) {

pos = RD_KAFKA_FETCH_POS(rkmessage->offset + 1,
rkm->rkm_u.consumer.leader_epoch);
err = rd_kafka_offset_store0(rktp, pos, rd_false /* Don't force */,
RD_DO_LOCK);
err = rd_kafka_offset_store0(rktp, pos, NULL, 0,
rd_false /* Don't force */, RD_DO_LOCK);

if (err == RD_KAFKA_RESP_ERR__STATE)
return rd_kafka_error_new(err, "Partition is not assigned");
@@ -1440,7 +1443,7 @@ rd_kafka_resp_err_t rd_kafka_offset_store_stop(rd_kafka_toppar_t *rktp) {
rktp,
RD_KAFKA_FETCH_POS(rktp->rktp_offsets_fin.eof_offset,
rktp->rktp_leader_epoch),
rd_true /* force */, RD_DONT_LOCK);
NULL, 0, rd_true /* force */, RD_DONT_LOCK);

/* Commit offset to backing store.
* This might be an async operation. */
@@ -1538,7 +1541,7 @@ void rd_kafka_update_app_pos(rd_kafka_t *rk,

rktp->rktp_app_pos = pos;
if (rk->rk_conf.enable_auto_offset_store)
rd_kafka_offset_store0(rktp, pos,
rd_kafka_offset_store0(rktp, pos, NULL, 0,
/* force: ignore assignment state */
rd_true, RD_DONT_LOCK);

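Note that `rd_kafka_offsets_store()` keeps its existing per-partition error reporting; only the metadata forwarding in the rd_kafka_offsets_store() hunk above is new. A hedged sketch of checking those per-partition results after a store call (fragment; `rk` and `parts` as in the earlier example, requires <stdio.h>):

rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk, parts);
if (err) /* e.g. RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if nothing was stored */
        fprintf(stderr, "offsets_store: %s\n", rd_kafka_err2str(err));

for (int i = 0; i < parts->cnt; i++) {
        const rd_kafka_topic_partition_t *p = &parts->elems[i];
        if (p->err) /* e.g. RD_KAFKA_RESP_ERR__STATE for an unassigned partition */
                fprintf(stderr, "%s [%d]: offset not stored: %s\n", p->topic,
                        (int)p->partition, rd_kafka_err2str(p->err));
}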
16 changes: 15 additions & 1 deletion src/rdkafka_offset.h
@@ -73,6 +73,8 @@ const char *rd_kafka_offset2str(int64_t offset);
*
* @param pos Offset and leader epoch to set, may be an absolute offset
* or .._INVALID.
* @param metadata Metadata to be set (optional).
* @param metadata_size Size of the metadata to be set.
* @param force Forcibly set \p offset regardless of assignment state.
* @param do_lock Whether to lock the \p rktp or not (already locked by caller).
*
@@ -84,6 +86,8 @@ const char *rd_kafka_offset2str(int64_t offset);
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_offset_store0(rd_kafka_toppar_t *rktp,
const rd_kafka_fetch_pos_t pos,
void *metadata,
size_t metadata_size,
rd_bool_t force,
rd_dolock_t do_lock) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -96,7 +100,17 @@ rd_kafka_offset_store0(rd_kafka_toppar_t *rktp,
!rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk))) {
err = RD_KAFKA_RESP_ERR__STATE;
} else {
rktp->rktp_stored_pos = pos;
if (rktp->rktp_stored_metadata) {
rd_free(rktp->rktp_stored_metadata);
rktp->rktp_stored_metadata = NULL;
}
rktp->rktp_stored_pos = pos;
rktp->rktp_stored_metadata_size = metadata_size;
if (metadata) {
rktp->rktp_stored_metadata = rd_malloc(metadata_size);
memcpy(rktp->rktp_stored_metadata, metadata,
rktp->rktp_stored_metadata_size);
}
}

if (do_lock)
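With the new parameters, `rd_kafka_offset_store0()` frees any previously stored buffer and then copies the caller's metadata into `rktp_stored_metadata`, so callers keep ownership of the buffer they pass in, and a `NULL, 0` call clears earlier stored metadata. The two call shapes used by this commit look like this (sketch only; `rktp`, `pos` and `rktpar` are context variables as in the rdkafka_offset.c hunks above):

/* No metadata: also drops any previously stored metadata for the toppar. */
rd_kafka_offset_store0(rktp, pos, NULL, 0, rd_false /* don't force */,
                       RD_DO_LOCK);

/* With metadata: the buffer is copied, the caller's copy stays caller-owned. */
rd_kafka_offset_store0(rktp, pos, rktpar->metadata, rktpar->metadata_size,
                       rd_false /* don't force */, RD_DO_LOCK);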
18 changes: 18 additions & 0 deletions src/rdkafka_partition.c
@@ -349,6 +349,7 @@ void rd_kafka_toppar_destroy_final(rd_kafka_toppar_t *rktp) {

rd_refcnt_destroy(&rktp->rktp_refcnt);

rd_free(rktp->rktp_stored_metadata);
rd_free(rktp);
}

@@ -2697,6 +2698,21 @@ void rd_kafka_topic_partition_set_from_fetch_pos(
fetchpos.leader_epoch);
}

/**
* @brief Set partition metadata from rktp stored one.
*/
void rd_kafka_topic_partition_set_metadata_from_rktp_stored(
rd_kafka_topic_partition_t *rktpar,
const rd_kafka_toppar_t *rktp) {
rktpar->metadata_size = rktp->rktp_stored_metadata_size;
if (rktp->rktp_stored_metadata) {
rktpar->metadata = rd_malloc(rktp->rktp_stored_metadata_size);
memcpy(rktpar->metadata, rktp->rktp_stored_metadata,
rktpar->metadata_size);
}
}


/**
* @brief Destroy all partitions in list.
*
@@ -3212,6 +3228,8 @@ int rd_kafka_topic_partition_list_set_offsets(
verb = "setting stored";
rd_kafka_topic_partition_set_from_fetch_pos(
rktpar, rktp->rktp_stored_pos);
rd_kafka_topic_partition_set_metadata_from_rktp_stored(
rktpar, rktp);
} else {
rktpar->offset = RD_KAFKA_OFFSET_INVALID;
}
9 changes: 9 additions & 0 deletions src/rdkafka_partition.h
@@ -338,6 +338,11 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
/** Last stored offset, but maybe not yet committed. */
rd_kafka_fetch_pos_t rktp_stored_pos;

/* Last stored metadata, but
* maybe not committed yet. */
void *rktp_stored_metadata;
size_t rktp_stored_metadata_size;

/** Offset currently being committed */
rd_kafka_fetch_pos_t rktp_committing_pos;

@@ -892,6 +897,10 @@ void rd_kafka_topic_partition_set_from_fetch_pos(
rd_kafka_topic_partition_t *rktpar,
const rd_kafka_fetch_pos_t fetchpos);

void rd_kafka_topic_partition_set_metadata_from_rktp_stored(
rd_kafka_topic_partition_t *rktpar,
const rd_kafka_toppar_t *rktp);

static RD_UNUSED rd_kafka_fetch_pos_t rd_kafka_topic_partition_get_fetch_pos(
const rd_kafka_topic_partition_t *rktpar) {
rd_kafka_fetch_pos_t fetchpos = {
60 changes: 55 additions & 5 deletions tests/0130-store_offsets.c
@@ -30,8 +30,8 @@


/**
* Verify that offsets_store() is not allowed for unassigned partitions,
* and that those offsets are not committed.
* Verify that offsets_store() commits the right offsets and metadata,
* and is not allowed for unassigned partitions.
*/
static void do_test_store_unassigned(void) {
const char *topic = test_mk_topic_name("0130_store_unassigned", 1);
@@ -40,6 +40,7 @@ static void do_test_store_unassigned(void) {
rd_kafka_topic_partition_list_t *parts;
rd_kafka_resp_err_t err;
rd_kafka_message_t *rkmessage;
char metadata[] = "metadata";
const int64_t proper_offset = 900, bad_offset = 300;

SUB_TEST_QUICK();
@@ -60,8 +61,13 @@
TEST_SAY("Consume one message\n");
test_consumer_poll_once(c, NULL, tmout_multip(3000));

parts->elems[0].offset = proper_offset;
TEST_SAY("Storing offset %" PRId64 " while assigned: should succeed\n",
parts->elems[0].offset = proper_offset;
parts->elems[0].metadata_size = sizeof metadata;
parts->elems[0].metadata = malloc(parts->elems[0].metadata_size);
memcpy(parts->elems[0].metadata, metadata,
parts->elems[0].metadata_size);
TEST_SAY("Storing offset %" PRId64
" with metadata while assigned: should succeed\n",
parts->elems[0].offset);
TEST_CALL_ERR__(rd_kafka_offsets_store(c, parts));

@@ -71,7 +77,10 @@
TEST_SAY("Unassigning partitions and trying to store again\n");
TEST_CALL_ERR__(rd_kafka_assign(c, NULL));

parts->elems[0].offset = bad_offset;
parts->elems[0].offset = bad_offset;
parts->elems[0].metadata_size = 0;
rd_free(parts->elems[0].metadata);
parts->elems[0].metadata = NULL;
TEST_SAY("Storing offset %" PRId64 " while unassigned: should fail\n",
parts->elems[0].offset);
err = rd_kafka_offsets_store(c, parts);
@@ -108,9 +117,50 @@
"offset %" PRId64 ", not %" PRId64,
proper_offset, rkmessage->offset);

TEST_SAY(
"Retrieving committed offsets to verify committed offset "
"metadata\n");
rd_kafka_topic_partition_list_t *committed_toppar;
committed_toppar = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(committed_toppar, topic, 0);
TEST_CALL_ERR__(
rd_kafka_committed(c, committed_toppar, tmout_multip(3000)));
TEST_ASSERT(committed_toppar->elems[0].offset == proper_offset,
"Expected committed offset to be %" PRId64 ", not %" PRId64,
proper_offset, committed_toppar->elems[0].offset);
TEST_ASSERT(committed_toppar->elems[0].metadata != NULL,
"Expected metadata to not be NULL");
TEST_ASSERT(strcmp(committed_toppar->elems[0].metadata, metadata) == 0,
"Expected metadata to be %s, not %s", metadata,
(char *)committed_toppar->elems[0].metadata);

TEST_SAY("Storing next offset without metadata\n");
parts->elems[0].offset = proper_offset + 1;
TEST_CALL_ERR__(rd_kafka_offsets_store(c, parts));

TEST_SAY("Committing\n");
TEST_CALL_ERR__(rd_kafka_commit(c, NULL, rd_false /*sync*/));

TEST_SAY(
"Retrieving committed offset to verify empty committed offset "
"metadata\n");
rd_kafka_topic_partition_list_t *committed_toppar_empty;
committed_toppar_empty = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(committed_toppar_empty, topic, 0);
TEST_CALL_ERR__(
rd_kafka_committed(c, committed_toppar_empty, tmout_multip(3000)));
TEST_ASSERT(committed_toppar_empty->elems[0].offset ==
proper_offset + 1,
"Expected committed offset to be %" PRId64 ", not %" PRId64,
proper_offset, committed_toppar_empty->elems[0].offset);
TEST_ASSERT(committed_toppar_empty->elems[0].metadata == NULL,
"Expected metadata to be NULL");

rd_kafka_message_destroy(rkmessage);

rd_kafka_topic_partition_list_destroy(parts);
rd_kafka_topic_partition_list_destroy(committed_toppar);
rd_kafka_topic_partition_list_destroy(committed_toppar_empty);

rd_kafka_consumer_close(c);
rd_kafka_destroy(c);
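Outside the test framework, committed metadata can be read back the same way the test does, via `rd_kafka_committed()`. A minimal sketch, assuming a consumer `rk` in the same consumer group and an illustrative `topic` (requires <stdio.h> and <inttypes.h>):

rd_kafka_topic_partition_list_t *committed =
    rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(committed, topic, 0);

if (!rd_kafka_committed(rk, committed, 5000 /* timeout (ms) */)) {
        const rd_kafka_topic_partition_t *p = &committed->elems[0];
        printf("%s [%d]: committed offset %" PRId64 "\n", p->topic,
               (int)p->partition, p->offset);
        if (p->metadata) /* NULL when the commit carried no metadata */
                printf("metadata: %.*s\n", (int)p->metadata_size,
                       (const char *)p->metadata);
}
rd_kafka_topic_partition_list_destroy(committed);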
