Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change listener to be plugable #79

Merged
merged 1 commit into from
Jun 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions napalm_logs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@

# Import napalm-logs pkgs
import napalm_logs.config as CONFIG
from napalm_logs.listener import get_listener
# processes
from napalm_logs.auth import NapalmLogsAuthProc
from napalm_logs.device import NapalmLogsDeviceProc
from napalm_logs.server import NapalmLogsServerProc
from napalm_logs.listener import NapalmLogsListenerProc
from napalm_logs.publisher import NapalmLogsPublisherProc
# exceptions
from napalm_logs.exceptions import BindException
Expand All @@ -42,7 +42,7 @@ class NapalmLogs:
def __init__(self,
address='0.0.0.0',
port=514,
protocol='udp',
listener='udp',
transport='zmq',
publish_address='0.0.0.0',
publish_port=49017,
Expand All @@ -62,14 +62,14 @@ def __init__(self,

:param address: The address to bind the syslog client. Default: 0.0.0.0.
:param port: Listen port. Default: 514.
:param protocol: Listen protocol. Default: udp.
:param listener: Listen type. Default: udp.
:param publish_address: The address to bing when publishing the OC
objects. Default: 0.0.0.0.
:param publish_port: Publish port. Default: 49017.
'''
self.address = address
self.port = port
self.protocol = protocol
self.listener = listener
self.publish_address = publish_address
self.publish_port = publish_port
self.auth_address = auth_address
Expand Down Expand Up @@ -315,12 +315,14 @@ def _start_auth_proc(self, auth_skt):
)
return proc

def _start_lst_proc(self, address, port, protocol, pipe):
def _start_lst_proc(self, address, port, listener, pipe):
'''
Start the listener process.
'''
log.debug('Starting the listener process')
listener = NapalmLogsListenerProc(address, port, protocol, pipe)
# Get the correct listener class
listener_class = get_listener(self.listener)
listener = listener_class(address, port, pipe)
proc = Process(target=listener.start)
proc.start()
log.debug('Started listener process as {pname} with PID {pid}'.format(
Expand Down Expand Up @@ -437,7 +439,7 @@ def start_engine(self):
# start listener process
self._processes.append(self._start_lst_proc(self.address,
self.port,
self.protocol,
self.listener,
lst_pipe))

def stop_engine(self):
Expand Down
5 changes: 3 additions & 2 deletions napalm_logs/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
CONFIG_FILE = os.path.join(ROOT_DIR, 'etc', 'napalm', 'logs')
ADDRESS = '0.0.0.0'
PORT = 514
PROTOCOL = 'udp'
LISTENER = 'udp'
PUBLISH_ADDRESS = '0.0.0.0'
PUBLISH_PORT = 49017
AUTH_ADDRESS = '0.0.0.0'
Expand All @@ -22,7 +22,8 @@
LOG_FORMAT = '%(asctime)s,%(msecs)03.0f [%(name)-17s][%(levelname)-8s] %(message)s'
LOG_FILE = os.path.join(ROOT_DIR, 'var', 'log', 'napalm', 'logs')
LOG_FILE_CLI_OPTIONS = ('cli', 'screen')
KAFKA_TOPIC = "napalm-logs"
KAFKA_LISTENER_TOPIC = "syslog.net"
KAFKA_PUBLISHER_TOPIC = "napalm-logs"

LOGGING_LEVEL = {
'debug': logging.DEBUG,
Expand Down
7 changes: 7 additions & 0 deletions napalm_logs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ class BindException(NapalmLogsException):
pass


class ListenerException(NapalmLogsException):
'''
Exception raised when encountering an exception in a listener process
'''
pass


class ConfigurationException(NapalmLogsException):
'''
Exception thrown when the user configuration is not correct.
Expand Down
32 changes: 32 additions & 0 deletions napalm_logs/listener/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
'''
napalm-logs pluggable listener.
'''
from __future__ import absolute_import
from __future__ import unicode_literals

# Import napalm-logs pkgs
from napalm_logs.listener.base import ListenerBase
from napalm_logs.listener.kafka import KafkaListener
from napalm_logs.listener.tcp import TCPListener
from napalm_logs.listener.udp import UDPListener
from napalm_logs.listener.kafka import HAS_KAFKA

LISTENER_LOOKUP = {
'tcp': TCPListener,
'udp': UDPListener,
'*': UDPListener # default listener
}

if HAS_KAFKA:
LISTENER_LOOKUP['kafka'] = KafkaListener

def get_listener(name):
'''
Return the listener class.
'''
return LISTENER_LOOKUP.get(name, LISTENER_LOOKUP['*'])

__all__ = (
'get_listener',
)
21 changes: 21 additions & 0 deletions napalm_logs/listener/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
'''
napalm-logs listener base.
'''


class ListenerBase:
'''
The base class for the listener.
'''
def __init__(self, addr, port):
pass

def start(self):
pass

def publish(self, obj):
pass

def stop(self):
pass
79 changes: 79 additions & 0 deletions napalm_logs/listener/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
'''
Kafka listener for napalm-logs.
'''
from __future__ import absolute_import
from __future__ import unicode_literals

# Import stdlib
import json
import time
import signal
import logging

# Import third party libs
try:
import kafka
HAS_KAFKA = True
except ImportError as err:
HAS_KAFKA = False

# Import napalm-logs pkgs
from napalm_logs.listener.base import ListenerBase
from napalm_logs.config import KAFKA_LISTENER_TOPIC
from napalm_logs.exceptions import ListenerException

log = logging.getLogger(__name__)


class KafkaListener(ListenerBase):
'''
Kafka listener class.
'''
def __init__(self, address, port, pipe):
self.address = address
self.port = port
self.pipe = pipe
self.topic = KAFKA_LISTENER_TOPIC
self.__up = False

def _exit_gracefully(self, signum, _):
log.debug('Caught signal in listener process')
self.stop()

def start(self):
'''
Start listening for messages
'''
# Start suicide polling thread
signal.signal(signal.SIGTERM, self._exit_gracefully)
self.__up = True
try:
self.consumer = kafka.KafkaConsumer(bootstrap_servers='{}:{}'.format(self.address, self.port),
group_id='napalm-logs')
except kafka.errors.NoBrokersAvailable as err:
log.error(err, exc_info=True)
raise ListenerException(err)
self.consumer.subscribe(topics=[self.topic])
while self.__up:
try:
msg = next(self.consumer)
except ValueError as error:
if self.__up is False:
return
else:
msg = 'Received kafka error: {}'.format(error)
log.error(msg, exc_info=True)
raise NapalmLogsExit(msg)
log_source = msg.key
decoded = json.loads(msg.value)
log_message = decoded.get('message')
log.debug('[{2}] Received {0} from {1}. Adding in the queue'.format(log_message, log_source, time.time()))
self.pipe.send((log_message, log_source))

def stop(self):
log.info('Stopping listener process')
self.__up = False
self.consumer.unsubscribe()
self.consumer.close()
self.pipe.close()
77 changes: 13 additions & 64 deletions napalm_logs/listener.py → napalm_logs/listener/tcp.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,44 @@
# -*- coding: utf-8 -*-
'''
Listener worker process
Syslog TCP listener for napalm-logs.
'''
from __future__ import absolute_import
from __future__ import unicode_literals

# Import pythond stdlib
import os
import time
import signal
import socket
import logging
import threading

# Import third party libs
import zmq
import umsgpack

# Import napalm-logs pkgs
from napalm_logs.config import TIMEOUT
from napalm_logs.config import LST_IPC_URL
from napalm_logs.config import BUFFER_SIZE
from napalm_logs.proc import NapalmLogsProc
from napalm_logs.listener.base import ListenerBase
# exceptions
from napalm_logs.exceptions import BindException
from napalm_logs.exceptions import ListenerException
from napalm_logs.exceptions import NapalmLogsExit

log = logging.getLogger(__name__)


class NapalmLogsListenerProc(NapalmLogsProc):
class TCPListener(ListenerBase):
'''
Listener sub-process class.
TCP syslog listener class
'''
def __init__(self, address, port, protocol, pipe):
def __init__(self, address, port, pipe):
self.address = address
self.port = port
self.protocol = protocol
self.pipe = pipe
self.__up = False

def _exit_gracefully(self, signum, _):
log.debug('Caught signal in listener process')
self.stop()

def _setup_ipc(self):
'''
Setup the IPC publisher.
'''
ctx = zmq.Context()
self.pub = ctx.socket(zmq.PUSH)
self.pub.bind(LST_IPC_URL)

def _open_socket(self, socket_type):
'''
Open the socket to listen for messages on
Expand Down Expand Up @@ -79,71 +66,33 @@ def _tcp_connection(self, conn, addr):
log.debug('Closing connection with {}'.format(addr))
conn.close()

def start_tcp(self):
def start(self):
'''
Start listening for messages on TCP and queue them
Start listening for messages
'''
signal.signal(signal.SIGTERM, self._exit_gracefully)
self.__up = True
self.skt = self._open_socket(socket.SOCK_STREAM)
try:
self.skt.bind((self.address, self.port))
except socket.error as msg:
error_string = 'Unable to bind to port {} on {}: {}'.format(self.port, self.address, msg)
log.error(error_string, exc_info=True)
raise BindException(error_string)
raise ListenerException(error_string)

while self.__up:
self.skt.listen(1)
try:
conn, addr = self.skt.accept()
except socket.error as error:
if self.__up is False:
return
else:
msg = 'Received listener socket error: {}'.format(error)
log.error(msg, exc_info=True)
raise NapalmLogsExit(msg)
thread = threading.Thread(target=self._tcp_connection, args=(conn, addr,))
thread.start()

def start_udp(self):
'''
Start listening for messages on UDP and queue them
'''
self.skt = self._open_socket(socket.SOCK_DGRAM)
try:
self.skt.bind((self.address, self.port))
except socket.error as msg:
error_string = 'Unable to bind to port {} on {}: {}'.format(self.port, self.address, msg)
log.error(error_string, exc_info=True)
raise BindException(error_string)

while self.__up:
try:
msg, addr = self.skt.recvfrom(BUFFER_SIZE)
except socket.error as error:
if self.__up is False:
return
else:
msg = 'Received listener socket error: {}'.format(error)
log.error(msg, exc_info=True)
raise NapalmLogsExit(msg)
log.debug('[{2}] Received {0} from {1}. Adding in the queue'.format(msg, addr, time.time()))
self.pipe.send((msg, addr[0]))

def start(self):
'''
Start listening for messages on the appropriate protocol
'''
# self._setup_ipc()
# Start suicide polling thread
thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),))
thread.start()
signal.signal(signal.SIGTERM, self._exit_gracefully)
self.__up = True
if self.protocol == 'tcp':
self.start_tcp()
else:
self.start_udp()
thread = threading.Thread(target=self._tcp_connection, args=(conn, addr,))
thread.start()

def stop(self):
log.info('Stopping listener process')
Expand Down
Loading