Enhancements #80

Merged · 8 commits · May 26, 2018

4 changes: 2 additions & 2 deletions .travis.yml
@@ -18,8 +18,8 @@ deploy:
skip_cleanup: true
api-key:
secure: Rxl45qbTHWIbOhst3PS60ETfW5wDByxp0xv4ZbtgRGe4SPvHtOLHRNGiajsQX37pgUFF9ALcCseY2cTk46jNEA1jOzFx4DDSKyH+Wu4H5F4M8JDBBlIsvsgezumLsYMqOL18caZA8J84N9UyuzgdPBDb0B0mMclRa9xRaxWncrUZgXwW9r3N2zU1LvGtd0Su4zLXXP6HC6mKHdOOaNSDONqaesx1njYTGr5fbWy7IXrjSg75wWCtHW1dKDPXmyyWZomwpmhURYfYXn/o9lRaXSDpLWx4xTsbJQdG9EiSPm5fLjfv9tZTxIF7jB0tTrOB63gGAgrLu0zC5Z5MJ1Y0+sbotI8eySI4w0GTffhi4WQjTTyO02vgPuSCm9JV5aW+YeNJtSncEgaVgsuUmZUiWdqMsvPG+bqOjh/i0eIkHr/v7cyf3HndFieZH9H3XdlEDtyr4SRExQSjG+be6mcGOJMWMrXervcW6kGP3pcX7EWgrFxnkz9lSgx/0meNMP4JDo8pZWg50b0xpni3zUcweTgCIeYUBd5aIKUvPaCqSHC1BAyZI5z3Cvdlq0tjCS726drQcV4OJNjrnmb301/K6MBbXhAsyhbkB1NpUZ0k0ZwmGxQ7iE4N1pod2BQbTPxjNUL1KNQJXFvjr9Clrw9Arqo6X9S9t//GP2DDl5Ke5KQ=
name: logagg-0.3.1
tag_name: 0.3.1
name: logagg-0.3.2
tag_name: 0.3.2
on:
branch: master
repo: deep-compute/logagg
3 changes: 2 additions & 1 deletion README.md
@@ -1,5 +1,6 @@
# logagg
logs aggregation framework
**logs aggregation framework**
![screenshot from 2018-04-10 19-23-20](https://user-images.githubusercontent.com/33823698/39081130-94c32ab8-4559-11e8-8dbe-9208b4ed4cf3.png)

Collects all the logs from the server, parses them into a common schema, and stores them in the given storage engine.

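The constructor signature in collector.py changes in this PR: heartbeat_interval becomes the second positional argument and nsq_sender becomes optional, defaulting to util.DUMMY. A minimal usage sketch mirroring the doctests added below (the file path is a placeholder, and it assumes logagg from this branch is importable):

```python
# Sketch only: constructs a collector without an NSQ sender, as the new defaults allow.
# The path and formatter below are the placeholder values used in the PR's doctests.
from logagg.collector import LogCollector

fpaths = 'file=/path/to/log_file.log:formatter=logagg.formatters.basescript'
lc = LogCollector(fpaths, 30)  # 30s heartbeat interval; nsq_sender/log fall back to util.DUMMY
```
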
263 changes: 208 additions & 55 deletions logagg/collector.py
@@ -20,6 +20,16 @@
After a downtime of the collector, pygtail misses logs from rotated files
'''

def load_formatter_fn(formatter):
'''
>>> load_formatter_fn('logagg.formatters.basescript') #doctest: +ELLIPSIS
<function basescript at 0x...>
'''
obj = util.load_object(formatter)
if not hasattr(obj, 'ispartial'):
obj.ispartial = util.ispartial
return obj

class LogCollector(object):
DESC = 'Collects the log information and sends to NSQTopic'

@@ -33,7 +43,6 @@ class LogCollector(object):
SCAN_FPATTERNS_INTERVAL = 30 # How often to scan filesystem for files matching fpatterns
HOST = socket.gethostname()
HEARTBEAT_RESTART_INTERVAL = 30 # Wait time if heartbeat sending stops
#TODO check for structure in validate_log_format

LOG_STRUCTURE = {
'id': basestring,
@@ -50,8 +59,11 @@ class LogCollector(object):
'error_tb' : basestring,
}

def __init__(self, fpaths, nsq_sender,
heartbeat_interval, log=util.DUMMY_LOGGER):
def __init__(self,
fpaths,
heartbeat_interval,
nsq_sender=util.DUMMY,
log=util.DUMMY):
self.fpaths = fpaths
self.nsq_sender = nsq_sender
self.heartbeat_interval = heartbeat_interval
@@ -64,52 +76,168 @@ def __init__(self, fpaths, nsq_sender,
self.queue = Queue.Queue(maxsize=self.QUEUE_MAX_SIZE)

def _remove_redundancy(self, log):
"""Removes duplicate data from 'data' inside log dict and brings it
out.

>>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)

>>> log = {'id' : 46846876, 'type' : 'log',
... 'data' : {'a' : 1, 'b' : 2, 'type' : 'metric'}}
>>> lc._remove_redundancy(log)
{'data': {'a': 1, 'b': 2}, 'type': 'metric', 'id': 46846876}
"""
for key in log:
if key in log and key in log['data']:
log[key] = log['data'].pop(key)
return log

def validate_log_format(self, log):
'''
>>> lc = LogCollector('file=/path/to/file.log:formatter=logagg.formatters.basescript', 30)

>>> incomplete_log = {'data' : {'x' : 1, 'y' : 2},
... 'raw' : 'Not all keys present'}
>>> lc.validate_log_format(incomplete_log)
'failed'

>>> redundant_log = {'one_invalid_key' : 'Extra information',
... 'data': {'x' : 1, 'y' : 2},
... 'error': False,
... 'error_tb': '',
... 'event': 'event',
... 'file': '/path/to/file.log',
... 'formatter': 'logagg.formatters.mongodb',
... 'host': 'deepcompute-ThinkPad-E470',
... 'id': '0112358',
... 'level': 'debug',
... 'raw': 'some log line here',
... 'timestamp': '2018-04-07T14:06:17.404818',
... 'type': 'log'}
>>> lc.validate_log_format(redundant_log)
'failed'

>>> correct_log = {'data': {'x' : 1, 'y' : 2},
... 'error': False,
... 'error_tb': '',
... 'event': 'event',
... 'file': '/path/to/file.log',
... 'formatter': 'logagg.formatters.mongodb',
... 'host': 'deepcompute-ThinkPad-E470',
... 'id': '0112358',
... 'level': 'debug',
... 'raw': 'some log line here',
... 'timestamp': '2018-04-07T14:06:17.404818',
... 'type': 'log'}
>>> lc.validate_log_format(correct_log)
'passed'
'''

keys_in_log = set(log)
keys_in_log_structure = set(self.LOG_STRUCTURE)
try:
assert (keys_in_log == keys_in_log_structure)
except AssertionError as e:
self.log.warning('formatted_log_structure_rejected' ,
key_not_found = list(keys_in_log_structure-keys_in_log),
extra_keys_found = list(keys_in_log-keys_in_log_structure),
num_logs=1,
type='metric')
return 'failed'

for key in log:
assert (key in self.LOG_STRUCTURE)
try:
assert isinstance(log[key], self.LOG_STRUCTURE[key])
except AssertionError as e:
self.log.exception('formatted_log_structure_rejected' , log=log)
self.log.warning('formatted_log_structure_rejected' ,
key_datatype_not_matched = key,
datatype_expected = type(self.LOG_STRUCTURE[key]),
datatype_got = type(log[key]),
num_logs=1,
type='metric')
return 'failed'

return 'passed'

def _full_from_frags(self, frags):
full_line = '\n'.join([l for l, _ in frags])
line_info = frags[-1][-1]
return full_line, line_info

def _iter_logs(self, freader, fmtfn):
# FIXME: does not handle partial lines
# at the start of a file properly

frags = []

for line_info in freader:
line = line_info['line'][:-1] # remove new line char at the end

if not fmtfn.ispartial(line) and frags:
yield self._full_from_frags(frags)
frags = []

frags.append((line, line_info))

if frags:
yield self._full_from_frags(frags)
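
_iter_logs above folds consecutive partial lines (as judged by the formatter's ispartial check) into a single full log line before formatting. Below is a standalone sketch of the same grouping idea; the indentation-based ispartial here is only a stand-in for the real check attached by load_formatter_fn:

```python
# Toy version of _iter_logs' grouping: "partial" lines (here, indented continuation
# lines such as traceback frames) are folded into the preceding log line.
def ispartial(line):
    return line.startswith((' ', '\t'))  # stand-in; logagg uses util.ispartial or the formatter's own

def iter_full_lines(lines):
    frags = []
    for line in lines:
        if not ispartial(line) and frags:
            yield '\n'.join(frags)
            frags = []
        frags.append(line)
    if frags:
        yield '\n'.join(frags)

lines = [
    'ERROR request failed',
    '  Traceback (most recent call last):',
    '    raise ValueError()',
    'INFO next request',
]
print(list(iter_full_lines(lines)))  # two full log lines: the grouped error, then the info line
```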

def assign_default_log_values(self, fpath, line, formatter):
'''
>>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)
>>> from pprint import pprint

>>> formatter = 'logagg.formatters.mongodb'
>>> fpath = '/var/log/mongodb/mongodb.log'
>>> line = 'some log line here'

>>> default_log = lc.assign_default_log_values(fpath, line, formatter)
>>> pprint(default_log) #doctest: +ELLIPSIS
{'data': {},
'error': False,
'error_tb': '',
'event': 'event',
'file': '/var/log/mongodb/mongodb.log',
'formatter': 'logagg.formatters.mongodb',
'host': '...',
'id': None,
'level': 'debug',
'raw': 'some log line here',
'timestamp': '...',
'type': 'log'}
'''
return dict(
id=None,
file=fpath,
host=self.HOST,
formatter=formatter,
event='event',
data={},
raw=line,
timestamp=datetime.datetime.utcnow().isoformat(),
type='log',
level='debug',
error= False,
error_tb='',
)

@keeprunning(LOG_FILE_POLL_INTERVAL, on_error=util.log_exception)
def collect_log_lines(self, log_file):
L = log_file
fpath = L['fpath']
self.log.debug('tracking_file_for_log_lines', fpath=fpath)
fmtfn = L['formatter_fn']
formatter = L['formatter']

freader = Pygtail(fpath)
for line_info in freader:
line = line_info['line'][:-1] # remove new line char at the end

# assign default values
log = dict(
id=None,
file=fpath,
host=self.HOST,
formatter=L['formatter'],
event='event',
data={},
raw=line,
timestamp=datetime.datetime.utcnow().isoformat(),
type='log',
level='debug',
error= False,
error_tb='',
)
for line, line_info in self._iter_logs(freader, fmtfn):
log = self.assign_default_log_values(fpath, line, formatter)

try:
_log = L['formatter_fn'](line)
_log = fmtfn(line)

if isinstance(_log, RawLog):
formatter, raw_log = _log['formatter'], _log['raw']
log.update(_log)
_log = util.load_object(formatter)(raw_log)
_log = load_formatter_fn(formatter)(raw_log)

log.update(_log)
except (SystemExit, KeyboardInterrupt) as e: raise
@@ -122,7 +250,7 @@ def collect_log_lines(self, log_file):
log['id'] = uuid.uuid1().hex

log = self._remove_redundancy(log)
self.validate_log_format(log)
if self.validate_log_format(log) == 'failed': continue

self.queue.put(dict(log=json.dumps(log),
freader=freader, line_info=line_info))
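
One subtlety in collect_log_lines above: a formatter may return a RawLog naming another formatter, in which case that formatter is loaded via load_formatter_fn and applied to the raw text before the result is merged into the log. A toy sketch of that two-stage dispatch; the plain dicts stand in for RawLog and both formatter names are made up:

```python
# Sketch of the RawLog re-dispatch. Dicts and names below are stand-ins, not logagg APIs.
def generic_formatter(line):
    # Pretends to recognise the line and defer to a more specific formatter.
    return {'is_raw': True, 'formatter': 'specific', 'raw': line}

def specific_formatter(raw):
    return {'event': 'request_served', 'data': {'duration_ms': 12}}

FORMATTERS = {'specific': specific_formatter}

def format_line(line):
    log = {'raw': line, 'data': {}, 'event': 'event'}        # defaults, cf. assign_default_log_values
    _log = generic_formatter(line)
    if _log.get('is_raw'):                                    # stands in for isinstance(_log, RawLog)
        log.update(_log)
        _log = FORMATTERS[_log['formatter']](_log['raw'])     # cf. load_formatter_fn(formatter)(raw_log)
    log.update(_log)
    return log

print(format_line('GET /health 200 12ms'))
```
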
@@ -175,10 +303,8 @@ def _get_msgs_from_queue(self, msgs, timeout):
self.log.debug('got_msgs_from_mem_queue')
return msgs_pending, msgs_nbytes, read_from_q


@keeprunning(0, on_error=util.log_exception) # FIXME: what wait time var here?
def send_to_nsq(self, state):
self.log.debug('send_to_nsq')
msgs = []
should_push = False

@@ -199,10 +325,14 @@ def send_to_nsq(self, state):
msgs_nbytes=msgs_nbytes)

try:
self.log.debug('trying_to_push_to_nsq', msgs_length=len(msgs))
self.nsq_sender.handle_logs(msgs)
if isinstance(self.nsq_sender, type(util.DUMMY)):
for m in msgs:
self.log.info('final_log_format', log=m['log'])
else:
self.log.debug('trying_to_push_to_nsq', msgs_length=len(msgs))
self.nsq_sender.handle_logs(msgs)
self.log.debug('pushed_to_nsq', msgs_length=len(msgs))
self.confirm_success(msgs)
self.log.debug('pushed_to_nsq', msgs_length=len(msgs))
msgs = msgs_pending
state.last_push_ts = time.time()
except (SystemExit, KeyboardInterrupt): raise
@@ -225,37 +355,60 @@ def confirm_success(self, msgs):
@keeprunning(SCAN_FPATTERNS_INTERVAL, on_error=util.log_exception)
def _scan_fpatterns(self, state):
'''
fpaths = 'file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access'
fpattern = '/var/log/nginx/access.log'
For every file matching the given fpatterns, starts a thread
that collects log lines from that file.

>>> os.path.isfile = lambda path: path == '/path/to/log_file.log'
>>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)

>>> print(lc.fpaths)
file=/path/to/log_file.log:formatter=logagg.formatters.basescript

>>> print('formatters loaded:', lc.formatters)
{}
>>> print('log file reader threads started:', lc.log_reader_threads)
{}
>>> state = AttrDict(files_tracked=list())
>>> print('files being tracked:', state.files_tracked)
[]


>>> if not state.files_tracked:
...     lc._scan_fpatterns(state)
>>> print('formatters loaded:', lc.formatters)
>>> print('log file reader threads started:', lc.log_reader_threads)
>>> print('files being tracked:', state.files_tracked)


'''
for f in self.fpaths:
fpattern, formatter =(a.split('=')[1] for a in f.split(':', 1))
self.log.debug('scan_fpatterns', fpattern=fpattern, formatter=formatter)
# TODO code for scanning fpatterns for the files not yet present goes here
fpaths = glob.glob(fpattern)
# Load formatter_fn if not in list
fpaths = list(set(fpaths) - set(state.files_tracked))
for fpath in fpaths:
if fpath not in state.files_tracked:
try:
formatter_fn = self.formatters.get(formatter,
util.load_object(formatter))
self.log.info('found_formatter_fn', fn=formatter)
self.formatters[formatter] = formatter_fn
except (SystemExit, KeyboardInterrupt): raise
except (ImportError, AttributeError):
self.log.exception('formatter_fn_not_found', fn=formatter)
sys.exit(-1)
# Start a thread for every file
self.log.info('found_log_file', log_file=fpath)
log_f = dict(fpath=fpath, fpattern=fpattern,
formatter=formatter, formatter_fn=formatter_fn)
log_key = (fpath, fpattern, formatter)
if log_key not in self.log_reader_threads:
self.log.info('starting_collect_log_lines_thread', log_key=log_key)
# There is no existing thread tracking this log file. Start one
log_reader_thread = util.start_daemon_thread(self.collect_log_lines, (log_f,))
self.log_reader_threads[log_key] = log_reader_thread
state.files_tracked.append(fpath)
try:
formatter_fn = self.formatters.get(formatter,
load_formatter_fn(formatter))
self.log.info('found_formatter_fn', fn=formatter)
self.formatters[formatter] = formatter_fn
except (SystemExit, KeyboardInterrupt): raise
except (ImportError, AttributeError):
self.log.exception('formatter_fn_not_found', fn=formatter)
sys.exit(-1)
# Start a thread for every file
self.log.info('found_log_file', log_file=fpath)
log_f = dict(fpath=fpath, fpattern=fpattern,
formatter=formatter, formatter_fn=formatter_fn)
log_key = (fpath, fpattern, formatter)
if log_key not in self.log_reader_threads:
self.log.info('starting_collect_log_lines_thread', log_key=log_key)
# There is no existing thread tracking this log file. Start one
log_reader_thread = util.start_daemon_thread(self.collect_log_lines, (log_f,))
self.log_reader_threads[log_key] = log_reader_thread
state.files_tracked.append(fpath)
time.sleep(self.SCAN_FPATTERNS_INTERVAL)
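
The fpaths entries parsed at the top of _scan_fpatterns follow the 'file=<glob>:formatter=<dotted.path>' form used throughout the doctests. A minimal illustration of that split, using the nginx example from the docstring:

```python
# Splits one fpaths entry into the file glob and the formatter's dotted path,
# using the same expression as _scan_fpatterns.
f = 'file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access'
fpattern, formatter = (a.split('=')[1] for a in f.split(':', 1))
print(fpattern)   # /var/log/nginx/access.log
print(formatter)  # logagg.formatters.nginx_access
```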

@keeprunning(HEARTBEAT_RESTART_INTERVAL, on_error=util.log_exception)