Skip to content

Commit

Permalink
Added Kafka transport
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn committed May 2, 2017
1 parent c626f80 commit dc4c08c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
3 changes: 2 additions & 1 deletion napalm_logs/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Import napalm-logs pkgs
from napalm_logs.transport.base import TransportBase
from napalm_logs.transport.zeromq import ZMQTransport
from napalm_logs.transport.kafka import KafkaTransport
from napalm_logs.transport.cli import CLITransport
from napalm_logs.transport.log import LogTransport
# from napalm_logs.transport.kafka import KafkaTransport
Expand All @@ -20,7 +21,7 @@
'print': CLITransport,
'console': CLITransport,
'log': LogTransport,
# 'kafka': KafkaTransport,
'kafka': KafkaTransport,
# 'rmq': RabbitMQransport,
# 'rabbitmq': RabbitMQransport,
'*': ZMQTransport # default transport
Expand Down
42 changes: 42 additions & 0 deletions napalm_logs/transport/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
'''
Kafka transport for napalm-logs.
'''
from __future__ import absolute_import
from __future__ import unicode_literals

# Import stdlib
import json
import logging

# Import third party libs
import kafka

# Import napalm-logs pkgs
from napalm_logs.exceptions import NapalmLogsException
from napalm_logs.transport.base import TransportBase

log = logging.getLogger(__name__)


class KafkaTransport(TransportBase):
'''
Kafka transport class.
'''
def __init__(self, bootstrap_servers, topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

def start(self):
try:
self.producer = kafka.KafkaProducer(bootstrap_servers = self.bootstrap_servers)
except kafka.errors.NoBrokersAvailable as err:
log.error(err, exc_info=True)
raise NapalmLogsException(err)

def publish(self, obj):
self.producer.send(self.topic, obj)

def stop(self):
if hasattr(self, 'producer'):
self.producer.close()

0 comments on commit dc4c08c

Please sign in to comment.