KIP-320: Allow fetchers to detect and handle log truncation #1540

Merged: 12 commits, Apr 5, 2023
Changes from 2 commits
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
@@ -6,7 +6,7 @@ agent:
 global_job_config:
   env_vars:
     - name: LIBRDKAFKA_VERSION
-      value: v2.0.2
+      value: v2.1.0-RC1
   prologue:
     commands:
       - checkout
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,7 +1,7 @@
 env:
   global:
-    - LIBRDKAFKA_VERSION=v2.0.2
-    - LIBRDKAFKA_SRC_VERSION=v2.0.2
+    - LIBRDKAFKA_VERSION=v2.1.0-RC1
+    - LIBRDKAFKA_SRC_VERSION=v2.1.0-RC1

 jobs:
   include:
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -1,13 +1,18 @@
 # Confluent's Python client for Apache Kafka

-## vNext
+## v2.1.0

 - Added `set_sasl_credentials`. This new method (on the Producer, Consumer, and AdminClient) allows modifying the stored
   SASL PLAIN/SCRAM credentials that will be used for subsequent (new) connections to a broker (#1511).
 - Wheels for Linux / arm64 (#1496).
 - Added support for Default num_partitions in CreateTopics Admin API.
 - Added support for password protected private key in CachedSchemaRegistryClient.
 - Add reference support in Schema Registry client. (@RickTalken, #1304)
+- KIP-320: Add offset leader epoch methods to the TopicPartition and Message classes (#1540).

+confluent-kafka-python is based on librdkafka 2.1.0, see the
+[librdkafka v2.1.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.1.0)
+and later ones for a complete list of changes, enhancements, fixes and upgrade considerations.
+
 ## v2.0.2

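To make the KIP-320 changelog entry concrete, here is a minimal sketch of the new surface from Python. The broker address, group id, and topic name are placeholders, and an epoch is only reported when the broker supplies one:

```python
# Sketch of the leader-epoch API added by this PR (placeholders:
# localhost:9092, "kip320-demo", "mytopic").
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "kip320-demo",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["mytopic"])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    # New Message method: returns an int, or None when no epoch was reported.
    epoch = msg.leader_epoch()
    print(msg.topic(), msg.partition(), msg.offset(), epoch)

    if epoch is not None:
        # TopicPartition now carries an optional leader_epoch too, so
        # commits and seeks can pass it along for truncation detection.
        tp = TopicPartition(msg.topic(), msg.partition(),
                            msg.offset() + 1, leader_epoch=epoch)
        consumer.commit(offsets=[tp], asynchronous=False)

consumer.close()
```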
2 changes: 1 addition & 1 deletion examples/docker/Dockerfile.alpine
@@ -30,7 +30,7 @@ FROM alpine:3.12

 COPY . /usr/src/confluent-kafka-python

-ENV LIBRDKAFKA_VERSION v2.0.2
+ENV LIBRDKAFKA_VERSION v2.1.0-RC1
 ENV KAFKACAT_VERSION master


51 changes: 33 additions & 18 deletions src/confluent_kafka/src/Consumer.c
@@ -486,6 +486,7 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
         } else if (msg) {
                 Message *m;
                 PyObject *uo8;
+                rd_kafka_topic_partition_t *rktpar;

                 if (PyObject_Type((PyObject *)msg) !=
                     (PyObject *)&MessageType) {
@@ -497,9 +498,12 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
                 m = (Message *)msg;

                 c_offsets = rd_kafka_topic_partition_list_new(1);
-                rd_kafka_topic_partition_list_add(
-                        c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
-                        m->partition)->offset = m->offset + 1;
+                rktpar = rd_kafka_topic_partition_list_add(
+                        c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
+                        m->partition);
+                rktpar->offset = m->offset + 1;
+                rd_kafka_topic_partition_set_leader_epoch(rktpar,
+                                                          m->leader_epoch);
                 Py_XDECREF(uo8);

         } else {
@@ -612,6 +616,7 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
         } else {
                 Message *m;
                 PyObject *uo8;
+                rd_kafka_topic_partition_t *rktpar;

                 if (PyObject_Type((PyObject *)msg) !=
                     (PyObject *)&MessageType) {
@@ -623,9 +628,12 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
                 m = (Message *)msg;

                 c_offsets = rd_kafka_topic_partition_list_new(1);
-                rd_kafka_topic_partition_list_add(
+                rktpar = rd_kafka_topic_partition_list_add(
                         c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
-                        m->partition)->offset = m->offset + 1;
+                        m->partition);
+                rktpar->offset = m->offset + 1;
+                rd_kafka_topic_partition_set_leader_epoch(rktpar,
+                                                          m->leader_epoch);
                 Py_XDECREF(uo8);
         }

@@ -783,9 +791,11 @@ static PyObject *Consumer_resume (Handle *self, PyObject *args,
 static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) {

         TopicPartition *tp;
-        rd_kafka_resp_err_t err;
+        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
         static char *kws[] = { "partition", NULL };
-        rd_kafka_topic_t *rkt;
+        rd_kafka_topic_partition_list_t *seek_partitions;
+        rd_kafka_topic_partition_t *rktpar;
+        rd_kafka_error_t *error;

         if (!self->rk) {
                 PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
@@ -803,21 +813,26 @@ static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs)
                 return NULL;
         }

-        rkt = rd_kafka_topic_new(self->rk, tp->topic, NULL);
-        if (!rkt) {
-                cfl_PyErr_Format(rd_kafka_last_error(),
-                                 "Failed to get topic object for "
-                                 "topic \"%s\": %s",
-                                 tp->topic,
-                                 rd_kafka_err2str(rd_kafka_last_error()));
-                return NULL;
-        }
+        seek_partitions = rd_kafka_topic_partition_list_new(1);
+        rktpar = rd_kafka_topic_partition_list_add(seek_partitions,
+                                                   tp->topic, tp->partition);
+        rktpar->offset = tp->offset;
+        rd_kafka_topic_partition_set_leader_epoch(rktpar, tp->leader_epoch);

         Py_BEGIN_ALLOW_THREADS;
-        err = rd_kafka_seek(rkt, tp->partition, tp->offset, -1);
+        error = rd_kafka_seek_partitions(self->rk, seek_partitions, -1);
         Py_END_ALLOW_THREADS;

-        rd_kafka_topic_destroy(rkt);
+        if (error) {
+                err = rd_kafka_error_code(error);
+                rd_kafka_error_destroy(error);
+        }
+
+        if (!err && seek_partitions->elems[0].err) {
+                err = seek_partitions->elems[0].err;
+        }
+
+        rd_kafka_topic_partition_list_destroy(seek_partitions);

         if (err) {
                 cfl_PyErr_Format(err,
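The seek path above replaces the per-topic `rd_kafka_seek` with `rd_kafka_seek_partitions`, which takes a partition list and therefore lets the `leader_epoch` stored on the `TopicPartition` travel with the seek. From Python that looks roughly like the following sketch; the topic, offset, and epoch values are placeholders:

```python
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({"bootstrap.servers": "localhost:9092",
                     "group.id": "seek-demo"})
consumer.assign([TopicPartition("mytopic", 0)])
consumer.poll(1.0)  # let the assignment become active before seeking

# With this PR, the epoch set on the TopicPartition is forwarded to
# rd_kafka_seek_partitions(), so the broker can detect log truncation
# (KIP-320) instead of silently fetching from a divergent log.
consumer.seek(TopicPartition("mytopic", 0, 42, leader_epoch=7))
```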
88 changes: 68 additions & 20 deletions src/confluent_kafka/src/confluent_kafka.c
@@ -476,6 +476,13 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
         Py_RETURN_NONE;
 }

+static PyObject *Message_leader_epoch (Message *self, PyObject *ignore) {
+        if (self->leader_epoch >= 0)
+                return cfl_PyInt_FromInt(self->leader_epoch);
+        else
+                Py_RETURN_NONE;
+}
+

 static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
         return Py_BuildValue("iL",
@@ -571,6 +578,11 @@ static PyMethodDef Message_methods[] = {
           "  :rtype: int or None\n"
           "\n"
         },
+        { "leader_epoch", (PyCFunction)Message_leader_epoch, METH_NOARGS,
+          "  :returns: message offset leader epoch or None if not available.\n"
+          "  :rtype: int or None\n"
+          "\n"
+        },
         { "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
           "Retrieve timestamp type and timestamp from message.\n"
           "The timestamp type is one of:\n\n"
@@ -784,6 +796,7 @@ PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {

         self->partition = rkm->partition;
         self->offset = rkm->offset;
+        self->leader_epoch = rd_kafka_message_leader_epoch(rkm);

         self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);

@@ -825,12 +838,17 @@ static int TopicPartition_clear (TopicPartition *self) {

 static void TopicPartition_setup (TopicPartition *self, const char *topic,
                                   int partition, long long offset,
+                                  int32_t leader_epoch,
                                   const char *metadata,
                                   rd_kafka_resp_err_t err) {
         self->topic = strdup(topic);
         self->partition = partition;
         self->offset = offset;

+        if (leader_epoch < 0)
+                leader_epoch = -1;
+        self->leader_epoch = leader_epoch;
+
         if (metadata != NULL) {
                 self->metadata = strdup(metadata);
         } else {
@@ -854,23 +872,27 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
                                 PyObject *kwargs) {
         const char *topic;
         int partition = RD_KAFKA_PARTITION_UA;
+        int32_t leader_epoch = -1;
         long long offset = RD_KAFKA_OFFSET_INVALID;
         const char *metadata = NULL;

         static char *kws[] = { "topic",
                                "partition",
                                "offset",
                                "metadata",
+                               "leader_epoch",
                                NULL };

-        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLsi", kws,
                                          &topic, &partition, &offset,
-                                         &metadata)) {
+                                         &metadata,
+                                         &leader_epoch)) {
                 return -1;
         }

         TopicPartition_setup((TopicPartition *)self,
-                             topic, partition, offset, metadata, 0);
+                             topic, partition, offset,
+                             leader_epoch, metadata, 0);
         return 0;
 }

@@ -890,6 +912,13 @@ static int TopicPartition_traverse (TopicPartition *self,
         return 0;
 }

+static PyObject *TopicPartition_get_leader_epoch (TopicPartition *tp, void *closure) {
+        if (tp->leader_epoch >= 0) {
+                return cfl_PyInt_FromInt(tp->leader_epoch);
+        }
+        Py_RETURN_NONE;
+}
+

 static PyMemberDef TopicPartition_members[] = {
         { "topic", T_STRING, offsetof(TopicPartition, topic), READONLY,
@@ -913,15 +942,28 @@ static PyMemberDef TopicPartition_members[] = {
         { NULL }
 };

+static PyGetSetDef TopicPartition_getter_and_setters[] = {
+        { "leader_epoch", TopicPartition_get_leader_epoch,
+          NULL,
+          ":attribute leader_epoch: Offset leader epoch (int), or None"},
+};
+

 static PyObject *TopicPartition_str0 (TopicPartition *self) {
         PyObject *errstr = NULL;
         PyObject *errstr8 = NULL;
         const char *c_errstr = NULL;
         PyObject *ret;
         char offset_str[40];
+        char leader_epoch_str[40];

         snprintf(offset_str, sizeof(offset_str), "%"CFL_PRId64"", self->offset);
+        if (self->leader_epoch >= 0)
+                snprintf(leader_epoch_str, sizeof(leader_epoch_str),
+                         "%"CFL_PRId32"", self->leader_epoch);
+        else
+                snprintf(leader_epoch_str, sizeof(leader_epoch_str),
+                         "None");

         if (self->error != Py_None) {
                 errstr = cfl_PyObject_Unistr(self->error);
@@ -930,9 +972,10 @@ static PyObject *TopicPartition_str0 (TopicPartition *self) {

         ret = cfl_PyUnistr(
                 _FromFormat("TopicPartition{topic=%s,partition=%"CFL_PRId32
-                            ",offset=%s,error=%s}",
+                            ",offset=%s,leader_epoch=%s,error=%s}",
                             self->topic, self->partition,
                             offset_str,
+                            leader_epoch_str,
                             c_errstr ? c_errstr : "None"));
         Py_XDECREF(errstr8);
         Py_XDECREF(errstr);
@@ -1037,35 +1080,37 @@
         (traverseproc)TopicPartition_traverse, /* tp_traverse */
         (inquiry)TopicPartition_clear, /* tp_clear */
         (richcmpfunc)TopicPartition_richcompare, /* tp_richcompare */
-        0, /* tp_weaklistoffset */
-        0, /* tp_iter */
-        0, /* tp_iternext */
-        0, /* tp_methods */
-        TopicPartition_members,/* tp_members */
-        0, /* tp_getset */
-        0, /* tp_base */
-        0, /* tp_dict */
-        0, /* tp_descr_get */
-        0, /* tp_descr_set */
-        0, /* tp_dictoffset */
-        TopicPartition_init, /* tp_init */
-        0, /* tp_alloc */
-        TopicPartition_new /* tp_new */
+        0, /* tp_weaklistoffset */
+        0, /* tp_iter */
+        0, /* tp_iternext */
+        0, /* tp_methods */
+        TopicPartition_members, /* tp_members */
+        TopicPartition_getter_and_setters, /* tp_getset */
+        0, /* tp_base */
+        0, /* tp_dict */
+        0, /* tp_descr_get */
+        0, /* tp_descr_set */
+        0, /* tp_dictoffset */
+        TopicPartition_init, /* tp_init */
+        0, /* tp_alloc */
+        TopicPartition_new /* tp_new */
 };

 /**
  * @brief Internal factory to create a TopicPartition object.
  */
 static PyObject *TopicPartition_new0 (const char *topic, int partition,
-                                      long long offset, const char *metadata,
+                                      long long offset, int32_t leader_epoch,
+                                      const char *metadata,
                                       rd_kafka_resp_err_t err) {
         TopicPartition *self;

         self = (TopicPartition *)TopicPartitionType.tp_new(
                 &TopicPartitionType, NULL, NULL);

         TopicPartition_setup(self, topic, partition,
-                             offset, metadata, err);
+                             offset, leader_epoch,
+                             metadata, err);

         return (PyObject *)self;
 }
@@ -1090,6 +1135,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
                         TopicPartition_new0(
                                 rktpar->topic, rktpar->partition,
                                 rktpar->offset,
+                                rd_kafka_topic_partition_get_leader_epoch(rktpar),
                                 rktpar->metadata,
                                 rktpar->err));
         }
@@ -1133,6 +1179,8 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
                                 tp->topic,
                                 tp->partition);
                 rktpar->offset = tp->offset;
+                rd_kafka_topic_partition_set_leader_epoch(rktpar,
+                                                          tp->leader_epoch);
                 if (tp->metadata != NULL) {
                         rktpar->metadata_size = strlen(tp->metadata) + 1;
                         rktpar->metadata = strdup(tp->metadata);
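Taken together, the `confluent_kafka.c` changes make `leader_epoch` a first-class, read-only field on `TopicPartition`: it is accepted by the constructor, rendered in `str()`, and round-tripped through `c_parts_to_py`/`py_to_c_parts`. A small sketch of the resulting Python behavior, with illustrative values:

```python
from confluent_kafka import TopicPartition

tp = TopicPartition("mytopic", 3, 100, leader_epoch=5)
print(tp.leader_epoch)  # 5; read-only, since the getset entry has no setter
print(tp)               # ...offset=100,leader_epoch=5,error=None}

# When no epoch is given it is stored as -1 internally and reads as None.
tp2 = TopicPartition("mytopic", 3, 100)
print(tp2.leader_epoch)  # None (rendered as leader_epoch=None in str())
```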