Merge pull request #61 from confluentinc/PR50_msg_timestamps
PR #50 message timestamps - additional docs, etc
edenhill authored Nov 9, 2016
2 parents 745b6f3 + 11b8a20 commit 312bd1f
Showing 7 changed files with 90 additions and 28 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -115,7 +115,7 @@ Tests

**Run integration tests:**

$ examples/integration_test.py <kafka-broker>
$ examples/integration_test.py <kafka-broker> [<test-topic>]

**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.

37 changes: 36 additions & 1 deletion confluent_kafka/src/confluent_kafka.c
@@ -326,6 +326,13 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
}


static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
return Py_BuildValue("iL",
self->tstype,
self->timestamp);
}


static PyMethodDef Message_methods[] = {
{ "error", (PyCFunction)Message_error, METH_NOARGS,
" The message object is also used to propagate errors and events, "
@@ -362,6 +369,28 @@ static PyMethodDef Message_methods[] = {
" :rtype: int or None\n"
"\n"
},
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
" Retrieve timestamp type and timestamp from message.\n"
" The timestamp type is one of:\n"
" * :py:const:`TIMESTAMP_NOT_AVAILABLE`"
" - Timestamps not supported by broker\n"
" * :py:const:`TIMESTAMP_CREATE_TIME` "
" - Message creation time (or source / producer time)\n"
" * :py:const:`TIMESTAMP_LOG_APPEND_TIME` "
" - Broker receive time\n"
"\n"
" The returned timestamp should be ignored if the timestamp type is "
":py:const:`TIMESTAMP_NOT_AVAILABLE`.\n"
"\n"
" The timestamp is the number of milliseconds since the epoch (UTC).\n"
"\n"
" Timestamps require broker version 0.10.0.0 or later and \n"
" ``{'api.version.request': True}`` configured on the client.\n"
"\n"
" :returns: tuple of message timestamp type, and timestamp.\n"
" :rtype: (int, int)\n"
"\n"
},
{ NULL }
};
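
The docstring above gives the full contract of the new `timestamp()` method. As a rough usage sketch (the consumer setup and names are illustrative, not part of this commit; assumes a >=0.10.0.0 broker and `'api.version.request': True`):

    import datetime
    import confluent_kafka

    # `c` is an already-subscribed confluent_kafka.Consumer (setup omitted)
    msg = c.poll(timeout=1.0)
    if msg is not None and msg.error() is None:
        tstype, ts = msg.timestamp()
        if tstype != confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
            # ts is milliseconds since the UTC epoch
            dt = datetime.datetime.utcfromtimestamp(ts / 1000.0)
            kind = ('create' if tstype == confluent_kafka.TIMESTAMP_CREATE_TIME
                    else 'log append')
            print('%s time: %s' % (kind, dt))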

@@ -441,7 +470,7 @@ PyTypeObject MessageType = {
"An application must check with :py:func:`error()` to see if the "
"object is a proper message (error() returns None) or an "
"error/event.\n"
"\n"
"\n"
"This class is not user-instantiable.\n"
"\n"
".. py:function:: len()\n"
@@ -495,6 +524,8 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
self->partition = rkm->partition;
self->offset = rkm->offset;

self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);

return (PyObject *)self;
}

@@ -1484,6 +1515,10 @@ static PyObject *_init_cimpl (void) {
Py_INCREF(KafkaException);
PyModule_AddObject(m, "KafkaException", KafkaException);

PyModule_AddIntConstant(m, "TIMESTAMP_NOT_AVAILABLE", RD_KAFKA_TIMESTAMP_NOT_AVAILABLE);
PyModule_AddIntConstant(m, "TIMESTAMP_CREATE_TIME", RD_KAFKA_TIMESTAMP_CREATE_TIME);
PyModule_AddIntConstant(m, "TIMESTAMP_LOG_APPEND_TIME", RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME);

return m;
}

2 changes: 2 additions & 0 deletions confluent_kafka/src/confluent_kafka.h
@@ -220,6 +220,8 @@ typedef struct {
PyObject *error;
int32_t partition;
int64_t offset;
int64_t timestamp;
rd_kafka_timestamp_type_t tstype;
} Message;

extern PyTypeObject MessageType;
60 changes: 38 additions & 22 deletions examples/integration_test.py
@@ -33,8 +33,16 @@
with_progress = False

# Kafka bootstrap server(s)
bootstrap_servers = 'localhost'
bootstrap_servers = None

# Topic to use
topic = 'test'

# API version requests are only implemented in Kafka broker >=0.10
# but the client handles failed API version requests gracefully for older
# versions as well, except for 0.9.0.x which will stall for about 10s
# on each connect with this set to True.
api_version_request = True

# global variable to be set by stats_cb call back function
good_stats_cb_result = False
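
A hedged sketch of the trade-off described in the comment above (broker address is a placeholder; set the flag to False only when talking to 0.9.0.x brokers, to avoid the ~10s connect stall):

    import confluent_kafka

    conf = {'bootstrap.servers': 'localhost:9092',
            # True is required for message timestamps on >=0.10 brokers
            'api.version.request': True}
    p = confluent_kafka.Producer(**conf)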
@@ -83,29 +91,30 @@ def verify_producer():
# Producer config
conf = {'bootstrap.servers': bootstrap_servers,
'error_cb': error_cb,
'api.version.request': api_version_request,
'default.topic.config':{'produce.offset.report': True}}

# Create producer
p = confluent_kafka.Producer(**conf)
print('producer at %s' % p)

# Produce some messages
p.produce('test', 'Hello Python!')
p.produce('test', key='Just a key')
p.produce('test', partition=1, value='Strictly for partition 1',
p.produce(topic, 'Hello Python!')
p.produce(topic, key='Just a key')
p.produce(topic, partition=1, value='Strictly for partition 1',
key='mykey')

# Produce more messages, now with delivery report callbacks in various forms.
mydr = MyTestDr()
p.produce('test', value='This one has a dr callback',
p.produce(topic, value='This one has a dr callback',
callback=mydr.delivery)
p.produce('test', value='This one has a lambda',
p.produce(topic, value='This one has a lambda',
callback=lambda err, msg: MyTestDr._delivery(err, msg))
p.produce('test', value='This one has neither')
p.produce(topic, value='This one has neither')

# Produce even more messages
for i in range(0, 10):
p.produce('test', value='Message #%d' % i, key=str(i),
p.produce(topic, value='Message #%d' % i, key=str(i),
callback=mydr.delivery)
p.poll(0)

@@ -119,11 +128,11 @@ def verify_producer():
def verify_producer_performance(with_dr_cb=True):
""" Time how long it takes to produce and delivery X messages """
conf = {'bootstrap.servers': bootstrap_servers,
'api.version.request': api_version_request,
'error_cb': error_cb}

p = confluent_kafka.Producer(**conf)

topic = 'test'
msgcnt = 1000000
msgsize = 100
msg_pattern = 'test.py performance'
@@ -144,9 +153,9 @@ def verify_producer_performance(with_dr_cb=True):
for i in range(0, msgcnt):
try:
if with_dr_cb:
p.produce('test', value=msg_payload, callback=dr.delivery)
p.produce(topic, value=msg_payload, callback=dr.delivery)
else:
p.produce('test', value=msg_payload)
p.produce(topic, value=msg_payload)
except BufferError as e:
# Local queue is full (slow broker connection?)
msgs_backpressure += 1
@@ -213,6 +222,7 @@ def verify_consumer():
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'api.version.request': api_version_request,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
@@ -223,7 +233,7 @@ def verify_consumer():
c = confluent_kafka.Consumer(**conf)

# Subscribe to a list of topics
c.subscribe(["test"])
c.subscribe([topic])

max_msgcnt = 100
msgcnt = 0
@@ -245,10 +255,10 @@ def verify_consumer():
print('Consumer error: %s: ignoring' % msg.error())
break

if False:
print('%s[%d]@%d: key=%s, value=%s' % \
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value()))
tstype, timestamp = msg.timestamp()
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value(), tstype, timestamp))

if (msg.offset() % 5) == 0:
# Async commit
@@ -268,7 +278,7 @@ def verify_consumer():

# Start a new client and get the committed offsets
c = confluent_kafka.Consumer(**conf)
offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition("test", p), range(0,3))))
offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0,3))))
for tp in offsets:
print(tp)

@@ -302,7 +312,7 @@ def my_on_revoke (consumer, partitions):
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
consumer.unassign()

c.subscribe(["test"], on_assign=my_on_assign, on_revoke=my_on_revoke)
c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)

max_msgcnt = 1000000
bytecnt = 0
@@ -362,10 +372,11 @@ def verify_stats_cb():
def stats_cb(stats_json_str):
global good_stats_cb_result
stats_json = json.loads(stats_json_str)
if 'test' in stats_json['topics']:
app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
if topic in stats_json['topics']:
app_offset = stats_json['topics'][topic]['partitions']['0']['app_offset']
if app_offset > 0:
print("# app_offset stats for topic test partition 0: %d" % app_offset)
print("# app_offset stats for topic %s partition 0: %d" % \
(topic, app_offset))
good_stats_cb_result = True

conf = {'bootstrap.servers': bootstrap_servers,
@@ -379,7 +390,7 @@ def stats_cb(stats_json_str):
}}

c = confluent_kafka.Consumer(**conf)
c.subscribe(["test"])
c.subscribe([topic])

max_msgcnt = 1000000
bytecnt = 0
@@ -437,6 +448,11 @@ def stats_cb(stats_json_str):

if len(sys.argv) > 1:
bootstrap_servers = sys.argv[1]
if len(sys.argv) > 2:
topic = sys.argv[2]
else:
print('Usage: %s <broker> [<topic>]' % sys.argv[0])
sys.exit(1)

print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
print('Using librdkafka version %s (0x%x)' % confluent_kafka.libversion())
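
With the new argument handling the topic is optional on the command line, matching the README change above; for example (broker and topic names are placeholders):

    $ examples/integration_test.py mybroker:9092 my_test_topic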
5 changes: 4 additions & 1 deletion tests/test_Consumer.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python

from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException
from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE


def test_basic_api():
@@ -36,6 +36,9 @@ def dummy_assign_revoke (consumer, partitions):
else:
print('OK: consumed message')

if msg is not None:
assert msg.timestamp() == (TIMESTAMP_NOT_AVAILABLE, -1)

partitions = list(map(lambda p: TopicPartition("test", p), range(0,100,3)))
kc.assign(partitions)
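
A note on the new assertion: this unit test never reaches a broker, so no message carries a timestamp and timestamp() returns the (TIMESTAMP_NOT_AVAILABLE, -1) sentinel pair. Application code can apply the same guard, e.g. (a sketch, with `msg` being any consumed message):

    tstype, ts = msg.timestamp()
    if tstype == TIMESTAMP_NOT_AVAILABLE:
        ts = None  # -1 is meaningless here; ignore it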

6 changes: 3 additions & 3 deletions tests/test_docs.py
@@ -21,9 +21,9 @@ def test_verify_docs():
fails += 1
elif not re.search(r':', d):
print('Missing Doxygen tag for: %s (type %s)' % (n, type(o)))
if not isinstance(o, ModuleType):
# Ignore missing doc strings for the cimpl module itself and
# integer constants (which can't have a doc string)
if n != 'cimpl' and type(o) not in [int]:
fails += 1

assert fails == 0


6 changes: 6 additions & 0 deletions tests/test_enums.py
@@ -7,3 +7,9 @@ def test_enums():
KafkaError class without an instantiated object. """
print(confluent_kafka.KafkaError._NO_OFFSET)
print(confluent_kafka.KafkaError.REBALANCE_IN_PROGRESS)

def test_tstype_enums():
""" Make sure librdkafka tstype enums are available. """
assert confluent_kafka.TIMESTAMP_NOT_AVAILABLE == 0
assert confluent_kafka.TIMESTAMP_CREATE_TIME == 1
assert confluent_kafka.TIMESTAMP_LOG_APPEND_TIME == 2
