diff --git a/napalm_logs/base.py b/napalm_logs/base.py index d0382a90..58f74196 100644 --- a/napalm_logs/base.py +++ b/napalm_logs/base.py @@ -25,11 +25,11 @@ # Import napalm-logs pkgs import napalm_logs.config as CONFIG +from napalm_logs.listener import get_listener # processes from napalm_logs.auth import NapalmLogsAuthProc from napalm_logs.device import NapalmLogsDeviceProc from napalm_logs.server import NapalmLogsServerProc -from napalm_logs.listener import NapalmLogsListenerProc from napalm_logs.publisher import NapalmLogsPublisherProc # exceptions from napalm_logs.exceptions import BindException @@ -42,7 +42,7 @@ class NapalmLogs: def __init__(self, address='0.0.0.0', port=514, - protocol='udp', + listener='udp', transport='zmq', publish_address='0.0.0.0', publish_port=49017, @@ -62,14 +62,14 @@ def __init__(self, :param address: The address to bind the syslog client. Default: 0.0.0.0. :param port: Listen port. Default: 514. - :param protocol: Listen protocol. Default: udp. + :param listener: Listen type. Default: udp. :param publish_address: The address to bing when publishing the OC objects. Default: 0.0.0.0. :param publish_port: Publish port. Default: 49017. ''' self.address = address self.port = port - self.protocol = protocol + self.listener = listener self.publish_address = publish_address self.publish_port = publish_port self.auth_address = auth_address @@ -315,12 +315,14 @@ def _start_auth_proc(self, auth_skt): ) return proc - def _start_lst_proc(self, address, port, protocol, pipe): + def _start_lst_proc(self, address, port, listener, pipe): ''' Start the listener process. ''' log.debug('Starting the listener process') - listener = NapalmLogsListenerProc(address, port, protocol, pipe) + # Get the correct listener class + listener_class = get_listener(self.listener) + listener = listener_class(address, port, pipe) proc = Process(target=listener.start) proc.start() log.debug('Started listener process as {pname} with PID {pid}'.format( @@ -437,7 +439,7 @@ def start_engine(self): # start listener process self._processes.append(self._start_lst_proc(self.address, self.port, - self.protocol, + self.listener, lst_pipe)) def stop_engine(self): diff --git a/napalm_logs/config/__init__.py b/napalm_logs/config/__init__.py index 7f1d3f0c..3660b92c 100644 --- a/napalm_logs/config/__init__.py +++ b/napalm_logs/config/__init__.py @@ -13,7 +13,7 @@ CONFIG_FILE = os.path.join(ROOT_DIR, 'etc', 'napalm', 'logs') ADDRESS = '0.0.0.0' PORT = 514 -PROTOCOL = 'udp' +LISTENER = 'udp' PUBLISH_ADDRESS = '0.0.0.0' PUBLISH_PORT = 49017 AUTH_ADDRESS = '0.0.0.0' @@ -22,7 +22,8 @@ LOG_FORMAT = '%(asctime)s,%(msecs)03.0f [%(name)-17s][%(levelname)-8s] %(message)s' LOG_FILE = os.path.join(ROOT_DIR, 'var', 'log', 'napalm', 'logs') LOG_FILE_CLI_OPTIONS = ('cli', 'screen') -KAFKA_TOPIC = "napalm-logs" +KAFKA_LISTENER_TOPIC = "syslog.net" +KAFKA_PUBLISHER_TOPIC = "napalm-logs" LOGGING_LEVEL = { 'debug': logging.DEBUG, diff --git a/napalm_logs/exceptions.py b/napalm_logs/exceptions.py index 92766aa5..1a87c3c5 100644 --- a/napalm_logs/exceptions.py +++ b/napalm_logs/exceptions.py @@ -23,6 +23,13 @@ class BindException(NapalmLogsException): pass +class ListenerException(NapalmLogsException): + ''' + Exception raised when encountering an exception in a listener process + ''' + pass + + class ConfigurationException(NapalmLogsException): ''' Exception thrown when the user configuration is not correct. diff --git a/napalm_logs/listener/__init__.py b/napalm_logs/listener/__init__.py new file mode 100644 index 00000000..5a34cfa2 --- /dev/null +++ b/napalm_logs/listener/__init__.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +''' +napalm-logs pluggable listener. +''' +from __future__ import absolute_import +from __future__ import unicode_literals + +# Import napalm-logs pkgs +from napalm_logs.listener.base import ListenerBase +from napalm_logs.listener.kafka import KafkaListener +from napalm_logs.listener.tcp import TCPListener +from napalm_logs.listener.udp import UDPListener +from napalm_logs.listener.kafka import HAS_KAFKA + +LISTENER_LOOKUP = { + 'tcp': TCPListener, + 'udp': UDPListener, + '*': UDPListener # default listener +} + +if HAS_KAFKA: + LISTENER_LOOKUP['kafka'] = KafkaListener + +def get_listener(name): + ''' + Return the listener class. + ''' + return LISTENER_LOOKUP.get(name, LISTENER_LOOKUP['*']) + +__all__ = ( + 'get_listener', +) diff --git a/napalm_logs/listener/base.py b/napalm_logs/listener/base.py new file mode 100644 index 00000000..c2f6b208 --- /dev/null +++ b/napalm_logs/listener/base.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +''' +napalm-logs listener base. +''' + + +class ListenerBase: + ''' + The base class for the listener. + ''' + def __init__(self, addr, port): + pass + + def start(self): + pass + + def publish(self, obj): + pass + + def stop(self): + pass diff --git a/napalm_logs/listener/kafka.py b/napalm_logs/listener/kafka.py new file mode 100644 index 00000000..51621abe --- /dev/null +++ b/napalm_logs/listener/kafka.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +''' +Kafka listener for napalm-logs. +''' +from __future__ import absolute_import +from __future__ import unicode_literals + +# Import stdlib +import json +import time +import signal +import logging + +# Import third party libs +try: + import kafka + HAS_KAFKA = True +except ImportError as err: + HAS_KAFKA = False + +# Import napalm-logs pkgs +from napalm_logs.listener.base import ListenerBase +from napalm_logs.config import KAFKA_LISTENER_TOPIC +from napalm_logs.exceptions import ListenerException + +log = logging.getLogger(__name__) + + +class KafkaListener(ListenerBase): + ''' + Kafka listener class. + ''' + def __init__(self, address, port, pipe): + self.address = address + self.port = port + self.pipe = pipe + self.topic = KAFKA_LISTENER_TOPIC + self.__up = False + + def _exit_gracefully(self, signum, _): + log.debug('Caught signal in listener process') + self.stop() + + def start(self): + ''' + Start listening for messages + ''' + # Start suicide polling thread + signal.signal(signal.SIGTERM, self._exit_gracefully) + self.__up = True + try: + self.consumer = kafka.KafkaConsumer(bootstrap_servers='{}:{}'.format(self.address, self.port), + group_id='napalm-logs') + except kafka.errors.NoBrokersAvailable as err: + log.error(err, exc_info=True) + raise ListenerException(err) + self.consumer.subscribe(topics=[self.topic]) + while self.__up: + try: + msg = next(self.consumer) + except ValueError as error: + if self.__up is False: + return + else: + msg = 'Received kafka error: {}'.format(error) + log.error(msg, exc_info=True) + raise NapalmLogsExit(msg) + log_source = msg.key + decoded = json.loads(msg.value) + log_message = decoded.get('message') + log.debug('[{2}] Received {0} from {1}. Adding in the queue'.format(log_message, log_source, time.time())) + self.pipe.send((log_message, log_source)) + + def stop(self): + log.info('Stopping listener process') + self.__up = False + self.consumer.unsubscribe() + self.consumer.close() + self.pipe.close() diff --git a/napalm_logs/listener.py b/napalm_logs/listener/tcp.py similarity index 56% rename from napalm_logs/listener.py rename to napalm_logs/listener/tcp.py index f6044f6c..d273a3bb 100644 --- a/napalm_logs/listener.py +++ b/napalm_logs/listener/tcp.py @@ -1,12 +1,11 @@ # -*- coding: utf-8 -*- ''' -Listener worker process +Syslog TCP listener for napalm-logs. ''' from __future__ import absolute_import from __future__ import unicode_literals # Import pythond stdlib -import os import time import signal import socket @@ -14,29 +13,25 @@ import threading # Import third party libs -import zmq -import umsgpack # Import napalm-logs pkgs from napalm_logs.config import TIMEOUT -from napalm_logs.config import LST_IPC_URL from napalm_logs.config import BUFFER_SIZE -from napalm_logs.proc import NapalmLogsProc +from napalm_logs.listener.base import ListenerBase # exceptions -from napalm_logs.exceptions import BindException +from napalm_logs.exceptions import ListenerException from napalm_logs.exceptions import NapalmLogsExit log = logging.getLogger(__name__) -class NapalmLogsListenerProc(NapalmLogsProc): +class TCPListener(ListenerBase): ''' - Listener sub-process class. + TCP syslog listener class ''' - def __init__(self, address, port, protocol, pipe): + def __init__(self, address, port, pipe): self.address = address self.port = port - self.protocol = protocol self.pipe = pipe self.__up = False @@ -44,14 +39,6 @@ def _exit_gracefully(self, signum, _): log.debug('Caught signal in listener process') self.stop() - def _setup_ipc(self): - ''' - Setup the IPC publisher. - ''' - ctx = zmq.Context() - self.pub = ctx.socket(zmq.PUSH) - self.pub.bind(LST_IPC_URL) - def _open_socket(self, socket_type): ''' Open the socket to listen for messages on @@ -79,47 +66,24 @@ def _tcp_connection(self, conn, addr): log.debug('Closing connection with {}'.format(addr)) conn.close() - def start_tcp(self): + def start(self): ''' - Start listening for messages on TCP and queue them + Start listening for messages ''' + signal.signal(signal.SIGTERM, self._exit_gracefully) + self.__up = True self.skt = self._open_socket(socket.SOCK_STREAM) try: self.skt.bind((self.address, self.port)) except socket.error as msg: error_string = 'Unable to bind to port {} on {}: {}'.format(self.port, self.address, msg) log.error(error_string, exc_info=True) - raise BindException(error_string) + raise ListenerException(error_string) while self.__up: self.skt.listen(1) try: conn, addr = self.skt.accept() - except socket.error as error: - if self.__up is False: - return - else: - msg = 'Received listener socket error: {}'.format(error) - log.error(msg, exc_info=True) - raise NapalmLogsExit(msg) - thread = threading.Thread(target=self._tcp_connection, args=(conn, addr,)) - thread.start() - - def start_udp(self): - ''' - Start listening for messages on UDP and queue them - ''' - self.skt = self._open_socket(socket.SOCK_DGRAM) - try: - self.skt.bind((self.address, self.port)) - except socket.error as msg: - error_string = 'Unable to bind to port {} on {}: {}'.format(self.port, self.address, msg) - log.error(error_string, exc_info=True) - raise BindException(error_string) - - while self.__up: - try: - msg, addr = self.skt.recvfrom(BUFFER_SIZE) except socket.error as error: if self.__up is False: return @@ -127,23 +91,8 @@ def start_udp(self): msg = 'Received listener socket error: {}'.format(error) log.error(msg, exc_info=True) raise NapalmLogsExit(msg) - log.debug('[{2}] Received {0} from {1}. Adding in the queue'.format(msg, addr, time.time())) - self.pipe.send((msg, addr[0])) - - def start(self): - ''' - Start listening for messages on the appropriate protocol - ''' - # self._setup_ipc() - # Start suicide polling thread - thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),)) - thread.start() - signal.signal(signal.SIGTERM, self._exit_gracefully) - self.__up = True - if self.protocol == 'tcp': - self.start_tcp() - else: - self.start_udp() + thread = threading.Thread(target=self._tcp_connection, args=(conn, addr,)) + thread.start() def stop(self): log.info('Stopping listener process') diff --git a/napalm_logs/listener/udp.py b/napalm_logs/listener/udp.py new file mode 100644 index 00000000..e6a8f40b --- /dev/null +++ b/napalm_logs/listener/udp.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +''' +Syslog UDP listener for napalm-logs. +''' +from __future__ import absolute_import +from __future__ import unicode_literals + +# Import pythond stdlib +import time +import signal +import socket +import logging + +# Import third party libs + +# Import napalm-logs pkgs +from napalm_logs.config import BUFFER_SIZE +from napalm_logs.listener.base import ListenerBase +# exceptions +from napalm_logs.exceptions import ListenerException +from napalm_logs.exceptions import NapalmLogsExit + +log = logging.getLogger(__name__) + + +class UDPListener(ListenerBase): + ''' + UDP syslog listener class + ''' + def __init__(self, address, port, pipe): + self.address = address + self.port = port + self.pipe = pipe + self.__up = False + + def _exit_gracefully(self, signum, _): + log.debug('Caught signal in listener process') + self.stop() + + def _open_socket(self, socket_type): + ''' + Open the socket to listen for messages on + ''' + if ':' in self.address: + skt = socket.socket(socket.AF_INET6, socket_type) + else: + skt = socket.socket(socket.AF_INET, socket_type) + return skt + + def start(self): + ''' + Start listening for messages + ''' + signal.signal(signal.SIGTERM, self._exit_gracefully) + self.__up = True + self.skt = self._open_socket(socket.SOCK_DGRAM) + try: + self.skt.bind((self.address, self.port)) + except socket.error as msg: + error_string = 'Unable to bind to port {} on {}: {}'.format(self.port, self.address, msg) + log.error(error_string, exc_info=True) + raise ListenerException(error_string) + + while self.__up: + try: + msg, addr = self.skt.recvfrom(BUFFER_SIZE) + except socket.error as error: + if self.__up is False: + return + else: + msg = 'Received listener socket error: {}'.format(error) + log.error(msg, exc_info=True) + raise NapalmLogsExit(msg) + log.debug('[{2}] Received {0} from {1}. Adding in the queue'.format(msg, addr, time.time())) + self.pipe.send((msg, addr[0])) + + def stop(self): + log.info('Stopping listener process') + self.__up = False + self.skt.close() + self.pipe.close() diff --git a/napalm_logs/scripts/cli.py b/napalm_logs/scripts/cli.py index 97655758..58791e5a 100644 --- a/napalm_logs/scripts/cli.py +++ b/napalm_logs/scripts/cli.py @@ -106,10 +106,10 @@ def prepare(self): help=('Listener bind port. Default: {0}'.format(defaults.PORT)) ) self.add_option( - '--protocol', - dest='protocol', - choices=['tcp', 'udp'], - help=('Listener bind protocol. Default: {0}'.format(defaults.PROTOCOL)) + '--listener', + dest='listener', + choices=['kafka', 'tcp', 'udp'], + help=('Listener type. Default: {0}'.format(defaults.LISTENER)) ) self.add_option( '-t', '--transport', @@ -214,7 +214,7 @@ def parse(self, log, screen_handler): cfg = { 'address': self.options.address or file_cfg.get('address') or defaults.ADDRESS, 'port': self.options.port or file_cfg.get('port') or defaults.PORT, - 'protocol': self.options.protocol or file_cfg.get('protocol') or defaults.PROTOCOL, + 'listener': self.options.listener or file_cfg.get('listener') or defaults.LISTENER, 'transport': self.options.transport or file_cfg.get('transport'), 'publish_address': self.options.publish_address or file_cfg.get('publish_address')\ or defaults.PUBLISH_ADDRESS, diff --git a/napalm_logs/transport/kafka.py b/napalm_logs/transport/kafka.py index 0899013d..ce8175af 100644 --- a/napalm_logs/transport/kafka.py +++ b/napalm_logs/transport/kafka.py @@ -19,7 +19,7 @@ # Import napalm-logs pkgs from napalm_logs.exceptions import NapalmLogsException from napalm_logs.transport.base import TransportBase -from napalm_logs.config import KAFKA_TOPIC +from napalm_logs.config import KAFKA_PUBLISHER_TOPIC log = logging.getLogger(__name__) @@ -30,7 +30,7 @@ class KafkaTransport(TransportBase): ''' def __init__(self, addr, port): self.bootstrap_servers = '{addr}:{port}'.format(addr=addr, port=port) - self.topic = KAFKA_TOPIC + self.topic = KAFKA_PUBLISHER_TOPIC def start(self): try: