Address review comments
1. Convert metadata back to binary.
2. Add a check to make sure it has no null bytes.
3. Fix tests
4. Changelog
milindl committed Aug 23, 2022
1 parent f484f14 commit fb703ac
Showing 4 changed files with 53 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,7 +1,7 @@
 # Confluent's Python client for Apache Kafka
 
 
-## v1.10.0
+## vNext
 
 - Add metadata to TopicPartition type and commit() (#1410).
 
64 changes: 48 additions & 16 deletions src/confluent_kafka/src/confluent_kafka.c
@@ -817,24 +817,25 @@ static int TopicPartition_clear (TopicPartition *self) {
                 self->error = NULL;
         }
         if (self->metadata) {
-                free(self->metadata);
+                Py_DECREF(self->metadata);
                 self->metadata = NULL;
         }
         return 0;
 }
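With metadata now stored as a Python bytes object rather than a malloc'd C string, tearing it down means dropping a reference instead of calling free(). A minimal sketch of that ownership pattern (the holder struct here is hypothetical):

```c
#include <Python.h>

typedef struct {
        PyObject *metadata;    /* owned reference, or NULL */
} holder_t;                    /* hypothetical holder struct */

static void holder_clear (holder_t *h) {
        /* Py_DECREF requires a non-NULL argument, hence the guard;
         * Py_XDECREF would fold the NULL check into the macro. */
        if (h->metadata) {
                Py_DECREF(h->metadata);
                h->metadata = NULL;
        }
}
```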

 static void TopicPartition_setup (TopicPartition *self, const char *topic,
                                   int partition, long long offset,
-                                  const char *metadata,
+                                  const char *metadata, size_t metadata_size,
                                   rd_kafka_resp_err_t err) {
         self->topic = strdup(topic);
         self->partition = partition;
         self->offset = offset;
 
-        if (metadata != NULL) {
-                self->metadata = strdup(metadata);
+        if (metadata != NULL && metadata_size > 0) {
+                self->metadata =
+                        cfl_PyBin(_FromStringAndSize(metadata, metadata_size));
         } else {
-                self->metadata = NULL;
+                self->metadata = NULL;
         }
 
         self->error = KafkaError_new_or_None(err, NULL);
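cfl_PyBin is the client's token-pasting helper from confluent_kafka.h; the sketch below shows the mechanism under the assumption that it pastes its argument onto the PyBytes API prefix, so cfl_PyBin(_FromStringAndSize(...)) expands to PyBytes_FromStringAndSize(...), which copies the buffer into a new bytes object:

```c
#include <Python.h>

/* Assumed definition, following confluent_kafka.h: paste the suffix
 * onto the binary (bytes) API prefix. */
#define cfl_PyBin(X) PyBytes ## X

static PyObject *metadata_to_bytes (const char *buf, size_t len) {
        /* Expands to PyBytes_FromStringAndSize(buf, len): returns a
         * new reference holding an owned copy of buf. */
        return cfl_PyBin(_FromStringAndSize(buf, (Py_ssize_t)len));
}
```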
@@ -855,23 +856,41 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
         const char *topic;
         int partition = RD_KAFKA_PARTITION_UA;
         long long offset = RD_KAFKA_OFFSET_INVALID;
-        const char *metadata = NULL;
+        Py_buffer metadata;
+        metadata.len = 0;
 
         static char *kws[] = { "topic",
                                "partition",
                                "offset",
                                "metadata",
                                NULL };
 
-        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLy*", kws,
                                          &topic, &partition, &offset,
                                          &metadata)) {
                 return -1;
 
         }
 
+        // To maintain compatibility with librdkafka, TopicPartition's metadata
+        // is `bytes`, but we expect it to be a utf-8 encoded string with no
+        // null characters embedded, as expected by the kafka protocol.
+        for (int i = 0; i < metadata.len; i++) {
+                if (((char*)metadata.buf)[i] == 0) {
+                        PyErr_SetString(PyExc_ValueError,
+                                        "embedded null character");
+                        PyBuffer_Release(&metadata);
+                        return -1;
+                }
+        }
+
         TopicPartition_setup((TopicPartition *)self,
-                             topic, partition, offset, metadata, 0);
+                             topic, partition, offset,
+                             metadata.buf, metadata.len, 0);
 
+        if (metadata.len > 0) {
+                PyBuffer_Release(&metadata);
+        }
+
         return 0;
 }
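The format change from s to y* is what moves the API from str to bytes: y* accepts any bytes-like object and fills a Py_buffer that the caller must release. Because the argument is optional here, the code pre-seeds metadata.len = 0 and only releases when something was actually bound. A standalone sketch of that pattern (the function itself is hypothetical):

```c
#include <Python.h>

/* Hypothetical method taking one optional bytes-like argument. */
static PyObject *takes_bytes (PyObject *self, PyObject *args) {
        Py_buffer view;
        view.len = 0;    /* sentinel: left untouched if the arg is omitted */

        if (!PyArg_ParseTuple(args, "|y*", &view))
                return NULL;

        if (view.len > 0) {
                /* view.buf/view.len stay valid until the release below;
                 * note the sentinel cannot tell b"" apart from "omitted". */
                PyBuffer_Release(&view);
        }
        Py_RETURN_NONE;
}
```

The embedded-NUL scan in the diff could equally use memchr(metadata.buf, 0, metadata.len); either way the point is to reject payloads that would not round-trip as the NUL-free utf-8 string the Kafka protocol expects.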

@@ -906,9 +925,9 @@ static PyMemberDef TopicPartition_members[] = {
           " :py:const:`OFFSET_STORED`,"
           " :py:const:`OFFSET_INVALID`\n"
         },
-        {"metadata", T_STRING, offsetof(TopicPartition, metadata), READONLY,
-         "attribute metadata: Optional application metadata committed with the "
-         "offset (string)"},
+        {"metadata", T_OBJECT, offsetof(TopicPartition, metadata), READONLY,
+         "attribute metadata: Optional utf-8 encoded application metadata committed "
+         "with the offset (bytes)"},
         { "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
           ":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
         { NULL }
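The descriptor switch from T_STRING to T_OBJECT is what changes the attribute's Python-side type: T_STRING builds a new str from a char * on every read (mapping NULL to None), while T_OBJECT hands back the stored object itself, likewise substituting None for NULL. A sketch with a hypothetical type:

```c
#include <Python.h>
#include <structmember.h>    /* PyMemberDef, T_OBJECT, READONLY */

typedef struct {
        PyObject_HEAD
        PyObject *metadata;    /* bytes or NULL */
} Example;                     /* hypothetical type */

static PyMemberDef Example_members[] = {
        /* Reads return the stored bytes object directly (or None when
         * the slot is NULL); T_STRING would instead build a str from
         * a char * field. */
        {"metadata", T_OBJECT, offsetof(Example, metadata), READONLY, NULL},
        {NULL}
};
```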
@@ -1059,14 +1078,15 @@ PyTypeObject TopicPartitionType = {
  */
 static PyObject *TopicPartition_new0 (const char *topic, int partition,
                                       long long offset, const char *metadata,
-                                      rd_kafka_resp_err_t err) {
+                                      size_t metadata_size, rd_kafka_resp_err_t err) {
         TopicPartition *self;
 
         self = (TopicPartition *)TopicPartitionType.tp_new(
                 &TopicPartitionType, NULL, NULL);
 
         TopicPartition_setup(self, topic, partition,
-                             offset, metadata, err);
+                             offset, metadata,
+                             metadata_size, err);
 
         return (PyObject *)self;
 }
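Threading an explicit metadata_size through the constructors mirrors librdkafka, where offset metadata is length-delimited rather than NUL-terminated; recovering the size with strlen() would silently truncate at the first zero byte. A short illustration:

```c
#include <stdio.h>
#include <string.h>

int main (void) {
        /* The length must travel with the pointer: strlen() stops at
         * the first NUL, so this 7-byte payload measures as 3. */
        const char payload[] = "abc\0def";
        printf("strlen=%zu actual=%zu\n", strlen(payload), sizeof(payload) - 1);
        return 0;
}
```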
@@ -1092,6 +1112,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
                         rktpar->topic, rktpar->partition,
                         rktpar->offset,
                         rktpar->metadata,
+                        rktpar->metadata_size,
                         rktpar->err));
}

@@ -1135,8 +1156,19 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
                                                            tp->partition);
                 rktpar->offset = tp->offset;
                 if (tp->metadata != NULL) {
-                        rktpar->metadata_size = strlen(tp->metadata) + 1;
-                        rktpar->metadata = strdup(tp->metadata);
+                        char* metadata = NULL;
+                        Py_ssize_t metadata_size = 0;
+
+                        if (cfl_PyBin(_AsStringAndSize(
+                                tp->metadata, &metadata, &metadata_size)) == -1) {
+                                // We don't need to set an error ourselves if
+                                // this call fails as it's already set.
+                                rd_kafka_topic_partition_list_destroy(c_parts);
+                                return NULL;
+                        }
+
+                        rktpar->metadata_size = metadata_size;
+                        rktpar->metadata = strndup(metadata, metadata_size);
                 }
         }
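Going the other way, cfl_PyBin(_AsStringAndSize(...)) resolves to PyBytes_AsStringAndSize under the same assumed macro; it yields a pointer into the bytes object's internal buffer plus its length, setting an exception on failure. That pointer is borrowed, so the code copies with strndup before handing it to librdkafka, which frees the partition list (and the metadata with it) independently of the Python object. A sketch of that copy-out step (helper name hypothetical):

```c
#include <Python.h>
#include <string.h>

/* Hypothetical helper: copy a bytes object into a heap buffer that a
 * C library is free to keep and free() on its own schedule. */
static int copy_bytes_out (PyObject *bytes, char **out, size_t *out_size) {
        char *buf;
        Py_ssize_t len;

        /* Borrowed view into the bytes object; raises TypeError and
         * returns -1 if `bytes` is not actually a bytes object. */
        if (PyBytes_AsStringAndSize(bytes, &buf, &len) == -1)
                return -1;

        *out = strndup(buf, (size_t)len);    /* owned, NUL-terminated copy */
        *out_size = (size_t)len;
        return *out != NULL ? 0 : -1;
}
```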

2 changes: 1 addition & 1 deletion src/confluent_kafka/src/confluent_kafka.h
@@ -352,7 +352,7 @@ typedef struct {
         char *topic;
         int partition;
         int64_t offset;
-        char *metadata;
+        PyObject *metadata;
         PyObject *error;
 } TopicPartition;

5 changes: 3 additions & 2 deletions tests/integration/consumer/test_consumer_topicpartition_metadata.py
@@ -20,10 +20,11 @@


 def commit_and_check(consumer, topic, metadata):
-    consumer.commit(offsets=[TopicPartition(topic, 0, 1, metadata)], asynchronous=False)
+    b_metadata = str.encode(metadata, 'utf8')
+    consumer.commit(offsets=[TopicPartition(topic, 0, 1, b_metadata)], asynchronous=False)
     offsets = consumer.committed([TopicPartition(topic, 0)], timeout=100)
     assert len(offsets) == 1
-    assert offsets[0].metadata == metadata
+    assert offsets[0].metadata == b_metadata
 
 
 def test_consumer_topicpartition_metadata(kafka_cluster):
