Skip to content

Commit

Permalink
Added logging parameter for Avro(Consumer|Producer).
Browse files Browse the repository at this point in the history
The patch introduces a `**kwargs` parameter on both constructors and passes
it through unchanged to the underlying `super().__init__()` calls, so options
such as `logger` reach the base Producer/Consumer.

Tests are included.

Closes: #698
  • Loading branch information
venthur committed Mar 9, 2021
1 parent bd4baf5 commit cfb9794
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class AvroProducer(Producer):
"""

def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):
default_value_schema=None, schema_registry=None, **kwargs):

sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}
Expand All @@ -64,7 +64,7 @@ def __init__(self, config, default_key_schema=None,
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroProducer, self).__init__(ap_conf)
super(AvroProducer, self).__init__(ap_conf, **kwargs)
self._serializer = MessageSerializer(schema_registry)
self._key_schema = default_key_schema
self._value_schema = default_value_schema
Expand Down Expand Up @@ -123,7 +123,7 @@ class AvroConsumer(Consumer):
:raises ValueError: For invalid configurations
"""

def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None, **kwargs):

sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}
Expand All @@ -142,7 +142,7 @@ def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroConsumer, self).__init__(ap_conf)
super(AvroConsumer, self).__init__(ap_conf, **kwargs)
self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)

def poll(self, timeout=None):
Expand Down
39 changes: 39 additions & 0 deletions tests/test_log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python

import confluent_kafka
import confluent_kafka.avro
import logging


Expand Down Expand Up @@ -34,6 +35,26 @@ def test_logging_consumer():
kc.close()


def test_logging_avro_consumer():
    """ Tests that logging works """

    # Wire a counting filter into a dedicated logger so we can observe
    # that the AvroConsumer forwards librdkafka debug logs through it.
    flt = CountingFilter('avroconsumer')
    logger = logging.getLogger('avroconsumer')
    logger.setLevel(logging.DEBUG)
    logger.addFilter(flt)

    conf = {'schema.registry.url': 'http://example.com',
            'group.id': 'test',
            'debug': 'all'}
    consumer = confluent_kafka.avro.AvroConsumer(conf, logger=logger)

    # Keep polling until at least one log record has hit the filter.
    while flt.cnt == 0:
        consumer.poll(timeout=0.5)

    print('%s: %d log messages seen' % (flt.name, flt.cnt))

    consumer.close()


def test_logging_producer():
""" Tests that logging works """

Expand All @@ -50,6 +71,24 @@ def test_logging_producer():
print('%s: %d log messages seen' % (f.name, f.cnt))


def test_logging_avro_producer():
    """ Tests that logging works """

    # Wire a counting filter into a dedicated logger so we can observe
    # that the AvroProducer forwards librdkafka debug logs through it.
    flt = CountingFilter('avroproducer')
    logger = logging.getLogger('avroproducer')
    logger.setLevel(logging.DEBUG)
    logger.addFilter(flt)

    conf = {'schema.registry.url': 'http://example.com',
            'debug': 'all'}
    producer = confluent_kafka.avro.AvroProducer(conf, logger=logger)

    # Keep polling until at least one log record has hit the filter.
    while flt.cnt == 0:
        producer.poll(timeout=0.5)

    print('%s: %d log messages seen' % (flt.name, flt.cnt))


def test_logging_constructor():
""" Verify different forms of constructors """

Expand Down

0 comments on commit cfb9794

Please sign in to comment.