Logger with Admin Client (#1758)
* Fixed logger not working when provided as an argument to AdminClient instead of a configuration property
* Added examples/adminapi_logger.py to show usage of the custom logger with AdminClient
pranavrth authored Jul 5, 2024
1 parent a6d2e1e commit 858347c
Showing 4 changed files with 104 additions and 16 deletions.
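
As context for the diffs below, a minimal sketch of the behavior this commit fixes (broker address hypothetical): before the change, the `AdminClient` wrapper's `__init__` accepted only the configuration dictionary, so a logger passed as a keyword argument was never forwarded to the underlying client.

    import logging
    from confluent_kafka.admin import AdminClient

    logger = logging.getLogger('AdminClient')

    # Works before and after this commit: logger as a configuration property
    a = AdminClient({'bootstrap.servers': 'localhost:9092', 'logger': logger})

    # Fixed by this commit: logger as a constructor keyword argument
    a = AdminClient({'bootstrap.servers': 'localhost:9092'}, logger=logger)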
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -4,10 +4,12 @@

v2.4.1 is a maintenance release with the following fixes and enhancements:

+- Added an example to show the usage of the custom logger with `AdminClient`
- Removed usage of `strcpy` to enhance security of the client (#1745)
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- Fixed an import error of the `TopicCollection` and `TopicPartitionInfo` classes when importing through another module such as mypy
- Fixed a segfault when the `commit` or `store_offsets` consumer method is called incorrectly with an errored Message object
+- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property

confluent-kafka-python is based on librdkafka v2.4.1, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
54 changes: 54 additions & 0 deletions examples/adminapi_logger.py
@@ -0,0 +1,54 @@
import sys
import logging

from confluent_kafka.admin import AdminClient

if len(sys.argv) != 2:
    sys.stderr.write("Usage: %s <broker>\n" % sys.argv[0])
    sys.exit(1)

broker = sys.argv[1]

# Custom logger
logger = logging.getLogger('AdminClient')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
logger.addHandler(handler)

# Create Admin client with logger
a = AdminClient({'bootstrap.servers': broker,
                 'debug': 'all'},
                logger=logger)

# Alternatively, pass the logger as the 'logger' configuration property.
# When the logger is passed as an argument, it overwrites the property.
#
# a = AdminClient({'bootstrap.servers': broker,
#                  'debug': 'all',
#                  'logger': logger})

# Sample Admin API call
future = a.list_consumer_groups(request_timeout=10)

while not future.done():
    # Log messages through custom logger while waiting for the result
    a.poll(0.1)

try:
    list_consumer_groups_result = future.result()
    print("\n\n\n========================= List consumer groups result Start =========================")
    print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
    for valid in list_consumer_groups_result.valid:
        print(" id: {} is_simple: {} state: {}".format(
            valid.group_id, valid.is_simple_consumer_group, valid.state))
    print("{} errors".format(len(list_consumer_groups_result.errors)))
    for error in list_consumer_groups_result.errors:
        print(" error: {}".format(error))
    print("========================= List consumer groups result End =========================\n\n\n")

except Exception:
    raise

# Log final log messages
a.poll(0)
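
To try the example against a reachable broker (address hypothetical), pass it on the command line:

    python examples/adminapi_logger.py localhost:9092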
9 changes: 5 additions & 4 deletions src/confluent_kafka/admin/__init__.py
@@ -109,17 +109,18 @@ class AdminClient (_AdminClientImpl):
    Requires broker version v0.11.0.0 or later.
    """

-    def __init__(self, conf):
+    def __init__(self, conf, **kwargs):
        """
        Create a new AdminClient using the provided configuration dictionary.
        The AdminClient is a standard Kafka protocol client, supporting
        the standard librdkafka configuration properties as specified at
-        https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+        https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
        At least 'bootstrap.servers' should be configured.
        :param dict conf: Configuration properties. At a minimum, ``bootstrap.servers`` **should** be set.
+        :param Logger logger: Optional Logger instance to use as a custom handler for log messages.
        """
-        super(AdminClient, self).__init__(conf)
+        super(AdminClient, self).__init__(conf, **kwargs)

    @staticmethod
    def _make_topics_result(f, futmap):
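
A note on the change above: forwarding **kwargs lets keyword arguments such as logger reach the underlying _AdminClientImpl constructor. Per the comment in the example file above, the keyword argument overwrites the 'logger' configuration property, so a call like the following sketch (broker address and logger names hypothetical) would log through custom_logger:

    import logging
    from confluent_kafka.admin import AdminClient

    custom_logger = logging.getLogger('custom')      # takes effect
    property_logger = logging.getLogger('unused')    # overwritten by the argument

    a = AdminClient({'bootstrap.servers': 'localhost:9092',
                     'logger': property_logger},
                    logger=custom_logger)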
55 changes: 43 additions & 12 deletions tests/test_log.py
@@ -3,6 +3,7 @@
from io import StringIO
import confluent_kafka
import confluent_kafka.avro
+import confluent_kafka.admin
import logging


@@ -17,6 +18,16 @@ def filter(self, record):
        print(record)


+def _setup_string_buffer_logger(name):
+    stringBuffer = StringIO()
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG)
+    handler = logging.StreamHandler(stringBuffer)
+    handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
+    logger.addHandler(handler)
+    return stringBuffer, logger


def test_logging_consumer():
""" Tests that logging works """

@@ -120,12 +131,7 @@ def test_logging_constructor():
def test_producer_logger_logging_in_given_format():
"""Test that asserts that logging is working by matching part of the log message"""

-    stringBuffer = StringIO()
-    logger = logging.getLogger('Producer')
-    logger.setLevel(logging.DEBUG)
-    handler = logging.StreamHandler(stringBuffer)
-    handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
-    logger.addHandler(handler)
+    stringBuffer, logger = _setup_string_buffer_logger('Producer')

    p = confluent_kafka.Producer(
        {"bootstrap.servers": "test", "logger": logger, "debug": "msg"})
@@ -142,12 +148,7 @@ def test_producer_logger_logging_in_given_format():
def test_consumer_logger_logging_in_given_format():
"""Test that asserts that logging is working by matching part of the log message"""

-    stringBuffer = StringIO()
-    logger = logging.getLogger('Consumer')
-    logger.setLevel(logging.DEBUG)
-    handler = logging.StreamHandler(stringBuffer)
-    handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
-    logger.addHandler(handler)
+    stringBuffer, logger = _setup_string_buffer_logger('Consumer')

    c = confluent_kafka.Consumer(
        {"bootstrap.servers": "test", "group.id": "test", "logger": logger, "debug": "msg"})
@@ -158,3 +159,33 @@ def test_consumer_logger_logging_in_given_format():
    c.close()

    assert "Consumer Logger | INIT" in logMessage


+def test_admin_logger_logging_in_given_format_when_provided_in_conf():
+    """Test that asserts that logging is working by matching part of the log message"""
+
+    stringBuffer, logger = _setup_string_buffer_logger('Admin')
+
+    admin_client = confluent_kafka.admin.AdminClient(
+        {"bootstrap.servers": "test", "logger": logger, "debug": "admin"})
+    admin_client.poll(0)
+
+    logMessage = stringBuffer.getvalue().strip()
+    stringBuffer.close()
+
+    assert "Admin Logger | INIT" in logMessage
+
+
+def test_admin_logger_logging_in_given_format_when_provided_as_admin_client_argument():
+    """Test that asserts that logging is working by matching part of the log message"""
+
+    stringBuffer, logger = _setup_string_buffer_logger('Admin')
+
+    admin_client = confluent_kafka.admin.AdminClient(
+        {"bootstrap.servers": "test", "debug": "admin"}, logger=logger)
+    admin_client.poll(0)
+
+    logMessage = stringBuffer.getvalue().strip()
+    stringBuffer.close()
+
+    assert "Admin Logger | INIT" in logMessage
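
The two new Admin tests can be run on their own with pytest's keyword filter; the command below assumes a checkout of the repository with pytest installed:

    pytest tests/test_log.py -k admin_logger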
