-
Notifications
You must be signed in to change notification settings - Fork 903
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add timestamp() to librdkafka messages #50
Conversation
It looks like @qix hasn't signed our Contributor License Agreement, yet. Appreciation of efforts, clabot |
CLA signed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems reasonable. One nit, and of course the timestamp type is still missing. @edenhill do you want to take a look?
@@ -326,6 +326,14 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) { | |||
} | |||
|
|||
|
|||
static PyObject *Message_timestamp (Message *self, PyObject *ignore) { | |||
if (self->timestamp > 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be >=
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should depend on the tstype, not the value of the timestamp.
E.g., even a seemingly invalid timestamp with a proper tstype is valid (because it might mean something in the future)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also needs tests:
- add to tests/test_Consumer.py (these are broker-less unit tests so just check that the API works and returns tstype none)
- add to integration tests in examples/integration_test.py (this should check that the returned timestamp actually makes sense (which requires a recent enough broker..), e.g. tstype is append and timestamp is reasonably within range to time.time()
@@ -362,6 +370,11 @@ static PyMethodDef Message_methods[] = { | |||
" :rtype: int or None\n" | |||
"\n" | |||
}, | |||
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs to return the timestamp type as well (none, append, create).
Im not sure whether it should be some class level constants or strings, probably the former. @ewencp?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add the timestamp type as a class level constant, e.g.:
TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_APPEND_TIME
Map it directly to the librdkafka counterparts.
@@ -495,6 +508,12 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) { | |||
self->partition = rkm->partition; | |||
self->offset = rkm->offset; | |||
|
|||
rd_kafka_timestamp_type_t tstype; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put variable declarations at the beginning of the scope to be C89 safe
@@ -326,6 +326,14 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) { | |||
} | |||
|
|||
|
|||
static PyObject *Message_timestamp (Message *self, PyObject *ignore) { | |||
if (self->timestamp > 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should depend on the tstype, not the value of the timestamp.
E.g., even a seemingly invalid timestamp with a proper tstype is valid (because it might mean something in the future)
Sorry about the delay, this was blocked on a librdkafka bug: confluentinc/librdkafka#858
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, almost there.
Some remaining actions:
- Change timestamp() to return a tstype, timestamp tuple.
- Define tstype constants
- Add unit-tests
- Improve documentation
@@ -327,13 +327,18 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) { | |||
|
|||
|
|||
static PyObject *Message_timestamp (Message *self, PyObject *ignore) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to have a single timestamp() method that returns a tuple: tstype, timestamp
@@ -362,6 +370,11 @@ static PyMethodDef Message_methods[] = { | |||
" :rtype: int or None\n" | |||
"\n" | |||
}, | |||
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add the timestamp type as a class level constant, e.g.:
TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_APPEND_TIME
Map it directly to the librdkafka counterparts.
@@ -362,6 +370,11 @@ static PyMethodDef Message_methods[] = { | |||
" :rtype: int or None\n" | |||
"\n" | |||
}, | |||
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS, | |||
" :returns: message timestamp or None if not available.\n" | |||
" :rtype: int or None\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should probably return a local timestamp in Python format (e.g float), and the doc string should be updated to reflect what is being returned.
The timestamp() method should also return the tstype (tstype,timestamp) tuple
Hi @edenhill, I've made most of the changes you requested, but don't have too much time to work on the very specifics especially if there's going to be a lot of back and forth. Feel free to make the remaining changes yourself if you want. Some thoughts/issues:
|
p.s. We released https://github.com/smyte/kafka_store today which uses a fork based on my original commit. As soon as we get this merged in and a new release is out I'll remove the fork and move it back over. |
Thanks a lot for your effort! |
PR #50 message timestamps - additional docs, etc
updated readme
tstype
still needs to be exposed somehow, not sure of the best way to do that.The whitespace in
confluent_kafka.c
is a mix of tabs/spaces as well as spaces in blank lines which my editor automatically removed.