From 2ebf8744c31c34eb819395da1c13f134a1008a4a Mon Sep 17 00:00:00 2001 From: Bastian Venthur Date: Mon, 21 Oct 2019 11:06:59 +0200 Subject: [PATCH] Added logging parameter for Avro(Consumer|Producer). The patch introduces **kwargs and just passes them to the underlying super calls. Tests are included. Closes: #698 --- src/confluent_kafka/avro/__init__.py | 8 +++--- tests/test_log.py | 39 ++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/src/confluent_kafka/avro/__init__.py b/src/confluent_kafka/avro/__init__.py index dafec980d..70391ec8f 100644 --- a/src/confluent_kafka/avro/__init__.py +++ b/src/confluent_kafka/avro/__init__.py @@ -44,7 +44,7 @@ class AvroProducer(Producer): """ def __init__(self, config, default_key_schema=None, - default_value_schema=None, schema_registry=None): + default_value_schema=None, schema_registry=None, **kwargs): sr_conf = {key.replace("schema.registry.", ""): value for key, value in config.items() if key.startswith("schema.registry")} @@ -64,7 +64,7 @@ def __init__(self, config, default_key_schema=None, elif sr_conf.get("url", None) is not None: raise ValueError("Cannot pass schema_registry along with schema.registry.url config") - super(AvroProducer, self).__init__(ap_conf) + super(AvroProducer, self).__init__(ap_conf, **kwargs) self._serializer = MessageSerializer(schema_registry) self._key_schema = default_key_schema self._value_schema = default_value_schema @@ -123,7 +123,7 @@ class AvroConsumer(Consumer): :raises ValueError: For invalid configurations """ - def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None): + def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None, **kwargs): sr_conf = {key.replace("schema.registry.", ""): value for key, value in config.items() if key.startswith("schema.registry")} @@ -142,7 +142,7 @@ def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_ elif sr_conf.get("url", None) is not None: raise ValueError("Cannot pass schema_registry along with schema.registry.url config") - super(AvroConsumer, self).__init__(ap_conf) + super(AvroConsumer, self).__init__(ap_conf, **kwargs) self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema) def poll(self, timeout=None): diff --git a/tests/test_log.py b/tests/test_log.py index 0e5c47783..60b27e30f 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import confluent_kafka +import confluent_kafka.avro import logging @@ -34,6 +35,26 @@ def test_logging_consumer(): kc.close() +def test_logging_avro_consumer(): + """ Tests that logging works """ + + logger = logging.getLogger('avroconsumer') + logger.setLevel(logging.DEBUG) + f = CountingFilter('avroconsumer') + logger.addFilter(f) + + kc = confluent_kafka.avro.AvroConsumer({'schema.registry.url': 'http://example.com', + 'group.id': 'test', + 'debug': 'all'}, + logger=logger) + while f.cnt == 0: + kc.poll(timeout=0.5) + + print('%s: %d log messages seen' % (f.name, f.cnt)) + + kc.close() + + def test_logging_producer(): """ Tests that logging works """ @@ -50,6 +71,24 @@ def test_logging_producer(): print('%s: %d log messages seen' % (f.name, f.cnt)) +def test_logging_avro_producer(): + """ Tests that logging works """ + + logger = logging.getLogger('avroproducer') + logger.setLevel(logging.DEBUG) + f = CountingFilter('avroproducer') + logger.addFilter(f) + + p = confluent_kafka.avro.AvroProducer({'schema.registry.url': 'http://example.com', + 'debug': 'all'}, + logger=logger) + + while f.cnt == 0: + p.poll(timeout=0.5) + + print('%s: %d log messages seen' % (f.name, f.cnt)) + + def test_logging_constructor(): """ Verify different forms of constructors """