diff --git a/napalm_logs/transport/__init__.py b/napalm_logs/transport/__init__.py index 12c7b070..c97be127 100644 --- a/napalm_logs/transport/__init__.py +++ b/napalm_logs/transport/__init__.py @@ -8,6 +8,7 @@ # Import napalm-logs pkgs from napalm_logs.transport.base import TransportBase from napalm_logs.transport.zeromq import ZMQTransport +from napalm_logs.transport.kafka import KafkaTransport from napalm_logs.transport.cli import CLITransport from napalm_logs.transport.log import LogTransport # from napalm_logs.transport.kafka import KafkaTransport @@ -20,7 +21,7 @@ 'print': CLITransport, 'console': CLITransport, 'log': LogTransport, - # 'kafka': KafkaTransport, + 'kafka': KafkaTransport, # 'rmq': RabbitMQransport, # 'rabbitmq': RabbitMQransport, '*': ZMQTransport # default transport diff --git a/napalm_logs/transport/kafka.py b/napalm_logs/transport/kafka.py new file mode 100644 index 00000000..3409ae3d --- /dev/null +++ b/napalm_logs/transport/kafka.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +''' +Kafka transport for napalm-logs. +''' +from __future__ import absolute_import +from __future__ import unicode_literals + +# Import stdlib +import json +import logging + +# Import third party libs +import kafka + +# Import napalm-logs pkgs +from napalm_logs.exceptions import NapalmLogsException +from napalm_logs.transport.base import TransportBase + +log = logging.getLogger(__name__) + + +class KafkaTransport(TransportBase): + ''' + Kafka transport class. + ''' + def __init__(self, bootstrap_servers, topic): + self.bootstrap_servers = bootstrap_servers + self.topic = topic + + def start(self): + try: + self.producer = kafka.KafkaProducer(bootstrap_servers = self.bootstrap_servers) + except kafka.errors.NoBrokersAvailable as err: + log.error(err, exc_info=True) + raise NapalmLogsException(err) + + def publish(self, obj): + self.producer.send(self.topic, obj) + + def stop(self): + if hasattr(self, 'producer'): + self.producer.close()