diff --git a/CHANGELOG.md b/CHANGELOG.md
index 934b2159c..18b37146b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,10 +4,12 @@
 
 v2.4.1 is a maintenance release with the following fixes and enhancements:
 
+ - Added an example to show the usage of a custom logger with `AdminClient`
  - Removed usage of `strcpy` to enhance security of the client (#1745)
  - Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
  - Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
  - Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
+ - Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property
 
 confluent-kafka-python is based on librdkafka v2.4.1, see the
 [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
diff --git a/examples/adminapi_logger.py b/examples/adminapi_logger.py
new file mode 100644
index 000000000..d91cd89a7
--- /dev/null
+++ b/examples/adminapi_logger.py
@@ -0,0 +1,54 @@
+import sys
+import logging
+
+from confluent_kafka.admin import AdminClient
+
+if len(sys.argv) != 2:
+    sys.stderr.write("Usage: %s <broker>\n" % sys.argv[0])
+    sys.exit(1)
+
+broker = sys.argv[1]
+
+# Custom logger
+logger = logging.getLogger('AdminClient')
+logger.setLevel(logging.DEBUG)
+handler = logging.StreamHandler()
+handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
+logger.addHandler(handler)
+
+# Create Admin client with logger
+a = AdminClient({'bootstrap.servers': broker,
+                 'debug': 'all'},
+                logger=logger)
+
+# Alternatively, pass the logger as a configuration property.
+# When passed as an argument, it overrides the configuration property.
+#
+# a = AdminClient({'bootstrap.servers': broker,
+#                  'debug': 'all',
+#                  'logger': logger})
+
+# Sample Admin API call
+future = a.list_consumer_groups(request_timeout=10)
+
+while not future.done():
+    # Serve log callbacks through the custom logger while waiting for the result
+    a.poll(0.1)
+
+try:
+    list_consumer_groups_result = future.result()
+    print("\n\n\n========================= List consumer groups result Start =========================")
+    print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
+    for valid in list_consumer_groups_result.valid:
+        print("    id: {} is_simple: {} state: {}".format(
+            valid.group_id, valid.is_simple_consumer_group, valid.state))
+    print("{} errors".format(len(list_consumer_groups_result.errors)))
+    for error in list_consumer_groups_result.errors:
+        print("    error: {}".format(error))
+    print("========================= List consumer groups result End =========================\n\n\n")
+
+except Exception:
+    raise
+
+# Serve any final log messages
+a.poll(0)
diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py
index 924361f2e..fb57b148e 100644
--- a/src/confluent_kafka/admin/__init__.py
+++ b/src/confluent_kafka/admin/__init__.py
@@ -109,17 +109,18 @@ class AdminClient (_AdminClientImpl):
 
     Requires broker version v0.11.0.0 or later.
     """
-    def __init__(self, conf):
+    def __init__(self, conf, **kwargs):
         """
         Create a new AdminClient using the provided configuration dictionary.
 
         The AdminClient is a standard Kafka protocol client, supporting
         the standard librdkafka configuration properties as specified at
-        https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+        https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
 
-        At least 'bootstrap.servers' should be configured.
+        :param dict conf: Configuration properties. At a minimum, ``bootstrap.servers`` **should** be set.
+        :param Logger logger: Optional Logger instance to use as a custom handler for log messages.
         """
-        super(AdminClient, self).__init__(conf)
+        super(AdminClient, self).__init__(conf, **kwargs)
 
     @staticmethod
     def _make_topics_result(f, futmap):
diff --git a/tests/test_log.py b/tests/test_log.py
index 6cd510819..b28fd798c 100644
--- a/tests/test_log.py
+++ b/tests/test_log.py
@@ -3,6 +3,7 @@
 from io import StringIO
 
 import confluent_kafka
 import confluent_kafka.avro
+import confluent_kafka.admin
 import logging
 
@@ -17,6 +18,16 @@
     def filter(self, record):
         print(record)
 
 
+def _setup_string_buffer_logger(name):
+    stringBuffer = StringIO()
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG)
+    handler = logging.StreamHandler(stringBuffer)
+    handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
+    logger.addHandler(handler)
+    return stringBuffer, logger
+
+
 def test_logging_consumer():
     """ Tests that logging works """
 
@@ -120,12 +131,7 @@ def test_logging_constructor():
 
 def test_producer_logger_logging_in_given_format():
     """Test that asserts that logging is working by matching part of the log message"""
-    stringBuffer = StringIO()
-    logger = logging.getLogger('Producer')
-    logger.setLevel(logging.DEBUG)
-    handler = logging.StreamHandler(stringBuffer)
-    handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
-    logger.addHandler(handler)
+    stringBuffer, logger = _setup_string_buffer_logger('Producer')
 
     p = confluent_kafka.Producer(
         {"bootstrap.servers": "test", "logger": logger, "debug": "msg"})
 
@@ -142,12 +148,7 @@
 
 def test_consumer_logger_logging_in_given_format():
     """Test that asserts that logging is working by matching part of the log message"""
-    stringBuffer = StringIO()
-    logger = logging.getLogger('Consumer')
-    logger.setLevel(logging.DEBUG)
-    handler = logging.StreamHandler(stringBuffer)
-    handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
-    logger.addHandler(handler)
+    stringBuffer, logger = _setup_string_buffer_logger('Consumer')
 
     c = confluent_kafka.Consumer(
         {"bootstrap.servers": "test", "group.id": "test", "logger": logger, "debug": "msg"})
 
@@ -158,3 +159,33 @@
     c.close()
 
     assert "Consumer Logger | INIT" in logMessage
+
+
+def test_admin_logger_logging_in_given_format_when_provided_in_conf():
+    """Test that asserts that logging is working by matching part of the log message"""
+
+    stringBuffer, logger = _setup_string_buffer_logger('Admin')
+
+    admin_client = confluent_kafka.admin.AdminClient(
+        {"bootstrap.servers": "test", "logger": logger, "debug": "admin"})
+    admin_client.poll(0)
+
+    logMessage = stringBuffer.getvalue().strip()
+    stringBuffer.close()
+
+    assert "Admin Logger | INIT" in logMessage
+
+
+def test_admin_logger_logging_in_given_format_when_provided_as_admin_client_argument():
+    """Test that asserts that logging is working by matching part of the log message"""
+
+    stringBuffer, logger = _setup_string_buffer_logger('Admin')
+
+    admin_client = confluent_kafka.admin.AdminClient(
{"bootstrap.servers": "test", "debug": "admin"}, logger=logger) + admin_client.poll(0) + + logMessage = stringBuffer.getvalue().strip() + stringBuffer.close() + + assert "Admin Logger | INIT" in logMessage