Skip to content

Commit

Permalink
Change listener to be plugable
Browse files Browse the repository at this point in the history
I have changed the listener to be plugable in the same way that the
publisher transport is.

I have also added a kafka listener too.
  • Loading branch information
luke-orden committed Jun 12, 2017
1 parent 61c60ad commit 2a8f164
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 81 deletions.
16 changes: 9 additions & 7 deletions napalm_logs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@

# Import napalm-logs pkgs
import napalm_logs.config as CONFIG
from napalm_logs.listener import get_listener
# processes
from napalm_logs.auth import NapalmLogsAuthProc
from napalm_logs.device import NapalmLogsDeviceProc
from napalm_logs.server import NapalmLogsServerProc
from napalm_logs.listener import NapalmLogsListenerProc
from napalm_logs.publisher import NapalmLogsPublisherProc
# exceptions
from napalm_logs.exceptions import BindException
Expand All @@ -42,7 +42,7 @@ class NapalmLogs:
def __init__(self,
address='0.0.0.0',
port=514,
protocol='udp',
listener='udp',
transport='zmq',
publish_address='0.0.0.0',
publish_port=49017,
Expand All @@ -62,14 +62,14 @@ def __init__(self,
:param address: The address to bind the syslog client. Default: 0.0.0.0.
:param port: Listen port. Default: 514.
:param protocol: Listen protocol. Default: udp.
:param listener: Listen type. Default: udp.
:param publish_address: The address to bing when publishing the OC
objects. Default: 0.0.0.0.
:param publish_port: Publish port. Default: 49017.
'''
self.address = address
self.port = port
self.protocol = protocol
self.listener = listener
self.publish_address = publish_address
self.publish_port = publish_port
self.auth_address = auth_address
Expand Down Expand Up @@ -315,12 +315,14 @@ def _start_auth_proc(self, auth_skt):
)
return proc

def _start_lst_proc(self, address, port, protocol, pipe):
def _start_lst_proc(self, address, port, listener, pipe):
'''
Start the listener process.
'''
log.debug('Starting the listener process')
listener = NapalmLogsListenerProc(address, port, protocol, pipe)
# Get the correct listener class
listener_class = get_listener(self.listener)
listener = listener_class(address, port, pipe)
proc = Process(target=listener.start)
proc.start()
log.debug('Started listener process as {pname} with PID {pid}'.format(
Expand Down Expand Up @@ -437,7 +439,7 @@ def start_engine(self):
# start listener process
self._processes.append(self._start_lst_proc(self.address,
self.port,
self.protocol,
self.listener,
lst_pipe))

def stop_engine(self):
Expand Down
5 changes: 3 additions & 2 deletions napalm_logs/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
CONFIG_FILE = os.path.join(ROOT_DIR, 'etc', 'napalm', 'logs')
ADDRESS = '0.0.0.0'
PORT = 514
PROTOCOL = 'udp'
LISTENER = 'udp'
PUBLISH_ADDRESS = '0.0.0.0'
PUBLISH_PORT = 49017
AUTH_ADDRESS = '0.0.0.0'
Expand All @@ -22,7 +22,8 @@
LOG_FORMAT = '%(asctime)s,%(msecs)03.0f [%(name)-17s][%(levelname)-8s] %(message)s'
LOG_FILE = os.path.join(ROOT_DIR, 'var', 'log', 'napalm', 'logs')
LOG_FILE_CLI_OPTIONS = ('cli', 'screen')
KAFKA_TOPIC = "napalm-logs"
KAFKA_LISTENER_TOPIC = "syslog.net"
KAFKA_PUBLISHER_TOPIC = "napalm-logs"

LOGGING_LEVEL = {
'debug': logging.DEBUG,
Expand Down
32 changes: 32 additions & 0 deletions napalm_logs/listener/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
'''
napalm-logs pluggable listener.
'''
from __future__ import absolute_import
from __future__ import unicode_literals

# Import napalm-logs pkgs
from napalm_logs.listener.base import ListenerBase
from napalm_logs.listener.kafka import KafkaListener
from napalm_logs.listener.tcp import TCPListener
from napalm_logs.listener.udp import UDPListener
from napalm_logs.listener.kafka import HAS_KAFKA

LISTENER_LOOKUP = {
'tcp': TCPListener,
'udp': UDPListener,
'*': UDPListener # default listener
}

if HAS_KAFKA:
LISTENER_LOOKUP['kafka'] = KafkaListener

def get_listener(name):
'''
Return the listener class.
'''
return LISTENER_LOOKUP.get(name, LISTENER_LOOKUP['*'])

__all__ = (
'get_listener',
)
21 changes: 21 additions & 0 deletions napalm_logs/listener/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
'''
napalm-logs listener base.
'''


class ListenerBase:
'''
The base class for the listener.
'''
def __init__(self, addr, port):
pass

def start(self):
pass

def publish(self, obj):
pass

def stop(self):
pass
77 changes: 77 additions & 0 deletions napalm_logs/listener/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
'''
Kafka listener for napalm-logs.
'''
from __future__ import absolute_import
from __future__ import unicode_literals

# Import stdlib
import json
import signal
import logging

# Import third party libs
try:
import kafka
HAS_KAFKA = True
except ImportError as err:
HAS_KAFKA = False

# Import napalm-logs pkgs
from napalm_logs.exceptions import NapalmLogsException
from napalm_logs.listener.base import ListenerBase
from napalm_logs.config import KAFKA_LISTENER_TOPIC

log = logging.getLogger(__name__)


class KafkaListener(ListenerBase):
'''
Kafka listener class.
'''
def __init__(self, address, port, pipe):
self.address = address
self.port = port
self.pipe = pipe
self.topic = KAFKA_LISTENER_TOPIC
self.__up = False

def _exit_gracefully(self, signum, _):
log.debug('Caught signal in listener process')
self.stop()

def start(self):
'''
Start listening for messages
'''
# Start suicide polling thread
signal.signal(signal.SIGTERM, self._exit_gracefully)
self.__up = True
try:
self.consumer = kafka.KafkaConsumer(bootstrap_servers='{}:{}'.format(self.address, self.port), group_id='napalm-logs')
except kafka.errors.NoBrokersAvailable as err:
log.error(err, exc_info=True)
raise NapalmLogsException(err)
self.consumer.subscribe(topics=[self.topic])
while self.__up:
try:
msg = next(self.consumer)
except ValueError as error:
if self.__up is False:
return
else:
msg = 'Received kafka error: {}'.format(error)
log.error(msg, exc_info=True)
raise NapalmLogsExit(msg)
decoded = json.loads(msg.value)
log_message = decoded.get('message')
log_source = decoded.get('logsource')
log.debug('[{2}] Received {0} from {1}. Adding in the queue'.format(log_message, log_source, time.time()))
self.pipe.send((log_message, log_source))

def stop(self):
log.info('Stopping listener process')
self.__up = False
self.consumer.unsubscribe()
self.consumer.close()
self.pipe.close()
73 changes: 11 additions & 62 deletions napalm_logs/listener.py → napalm_logs/listener/tcp.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,44 @@
# -*- coding: utf-8 -*-
'''
Listener worker process
Syslog TCP listener for napalm-logs.
'''
from __future__ import absolute_import
from __future__ import unicode_literals

# Import pythond stdlib
import os
import time
import signal
import socket
import logging
import threading

# Import third party libs
import zmq
import umsgpack

# Import napalm-logs pkgs
from napalm_logs.config import TIMEOUT
from napalm_logs.config import LST_IPC_URL
from napalm_logs.config import BUFFER_SIZE
from napalm_logs.proc import NapalmLogsProc
from napalm_logs.listener.base import ListenerBase
# exceptions
from napalm_logs.exceptions import BindException
from napalm_logs.exceptions import NapalmLogsExit

log = logging.getLogger(__name__)


class NapalmLogsListenerProc(NapalmLogsProc):
class TCPListener(ListenerBase):
'''
Listener sub-process class.
TCP syslog listener class
'''
def __init__(self, address, port, protocol, pipe):
def __init__(self, address, port, pipe):
self.address = address
self.port = port
self.protocol = protocol
self.pipe = pipe
self.__up = False

def _exit_gracefully(self, signum, _):
log.debug('Caught signal in listener process')
self.stop()

def _setup_ipc(self):
'''
Setup the IPC publisher.
'''
ctx = zmq.Context()
self.pub = ctx.socket(zmq.PUSH)
self.pub.bind(LST_IPC_URL)

def _open_socket(self, socket_type):
'''
Open the socket to listen for messages on
Expand Down Expand Up @@ -79,10 +66,12 @@ def _tcp_connection(self, conn, addr):
log.debug('Closing connection with {}'.format(addr))
conn.close()

def start_tcp(self):
def start(self):
'''
Start listening for messages on TCP and queue them
Start listening for messages
'''
signal.signal(signal.SIGTERM, self._exit_gracefully)
self.__up = True
self.skt = self._open_socket(socket.SOCK_STREAM)
try:
self.skt.bind((self.address, self.port))
Expand All @@ -95,55 +84,15 @@ def start_tcp(self):
self.skt.listen(1)
try:
conn, addr = self.skt.accept()
except socket.error as error:
if self.__up is False:
return
else:
msg = 'Received listener socket error: {}'.format(error)
log.error(msg, exc_info=True)
raise NapalmLogsExit(msg)
thread = threading.Thread(target=self._tcp_connection, args=(conn, addr,))
thread.start()

def start_udp(self):
'''
Start listening for messages on UDP and queue them
'''
self.skt = self._open_socket(socket.SOCK_DGRAM)
try:
self.skt.bind((self.address, self.port))
except socket.error as msg:
error_string = 'Unable to bind to port {} on {}: {}'.format(self.port, self.address, msg)
log.error(error_string, exc_info=True)
raise BindException(error_string)

while self.__up:
try:
msg, addr = self.skt.recvfrom(BUFFER_SIZE)
except socket.error as error:
if self.__up is False:
return
else:
msg = 'Received listener socket error: {}'.format(error)
log.error(msg, exc_info=True)
raise NapalmLogsExit(msg)
log.debug('[{2}] Received {0} from {1}. Adding in the queue'.format(msg, addr, time.time()))
self.pipe.send((msg, addr[0]))

def start(self):
'''
Start listening for messages on the appropriate protocol
'''
# self._setup_ipc()
# Start suicide polling thread
thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),))
thread.start()
signal.signal(signal.SIGTERM, self._exit_gracefully)
self.__up = True
if self.protocol == 'tcp':
self.start_tcp()
else:
self.start_udp()
thread = threading.Thread(target=self._tcp_connection, args=(conn, addr,))
thread.start()

def stop(self):
log.info('Stopping listener process')
Expand Down
Loading

0 comments on commit 2a8f164

Please sign in to comment.