diff --git a/.travis.yml b/.travis.yml
index 1f90ece..b8a7df0 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,8 +18,8 @@ deploy:
   skip_cleanup: true
   api-key:
     secure: Rxl45qbTHWIbOhst3PS60ETfW5wDByxp0xv4ZbtgRGe4SPvHtOLHRNGiajsQX37pgUFF9ALcCseY2cTk46jNEA1jOzFx4DDSKyH+Wu4H5F4M8JDBBlIsvsgezumLsYMqOL18caZA8J84N9UyuzgdPBDb0B0mMclRa9xRaxWncrUZgXwW9r3N2zU1LvGtd0Su4zLXXP6HC6mKHdOOaNSDONqaesx1njYTGr5fbWy7IXrjSg75wWCtHW1dKDPXmyyWZomwpmhURYfYXn/o9lRaXSDpLWx4xTsbJQdG9EiSPm5fLjfv9tZTxIF7jB0tTrOB63gGAgrLu0zC5Z5MJ1Y0+sbotI8eySI4w0GTffhi4WQjTTyO02vgPuSCm9JV5aW+YeNJtSncEgaVgsuUmZUiWdqMsvPG+bqOjh/i0eIkHr/v7cyf3HndFieZH9H3XdlEDtyr4SRExQSjG+be6mcGOJMWMrXervcW6kGP3pcX7EWgrFxnkz9lSgx/0meNMP4JDo8pZWg50b0xpni3zUcweTgCIeYUBd5aIKUvPaCqSHC1BAyZI5z3Cvdlq0tjCS726drQcV4OJNjrnmb301/K6MBbXhAsyhbkB1NpUZ0k0ZwmGxQ7iE4N1pod2BQbTPxjNUL1KNQJXFvjr9Clrw9Arqo6X9S9t//GP2DDl5Ke5KQ=
-  name: logagg-0.3.1
-  tag_name: 0.3.1
+  name: logagg-0.3.2
+  tag_name: 0.3.2
   on:
     branch: master
     repo: deep-compute/logagg
diff --git a/README.md b/README.md
index 54a426d..7c1e9a8 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,6 @@
 # logagg
-logs aggregation framework
+**logs aggregation framework**
+![screenshot from 2018-04-10 19-23-20](https://user-images.githubusercontent.com/33823698/39081130-94c32ab8-4559-11e8-8dbe-9208b4ed4cf3.png)
 
 Collects all the logs from the server and parses it for making a common schema for all the logs and stores at given storage engine.
 
diff --git a/logagg/collector.py b/logagg/collector.py
index fc4dd30..a60a718 100644
--- a/logagg/collector.py
+++ b/logagg/collector.py
@@ -20,6 +20,16 @@ After a downtime of collector, pygtail is missing logs from rotational files
 '''
 
+def load_formatter_fn(formatter):
+    '''
+    >>> load_formatter_fn('logagg.formatters.basescript') #doctest: +ELLIPSIS
+    <function basescript at 0x...>
+    '''
+    obj = util.load_object(formatter)
+    if not hasattr(obj, 'ispartial'):
+        obj.ispartial = util.ispartial
+    return obj
+
 class LogCollector(object):
     DESC = 'Collects the log information and sends to NSQTopic'
@@ -33,7 +43,6 @@ class LogCollector(object):
     SCAN_FPATTERNS_INTERVAL = 30 # How often to scan filesystem for files matching fpatterns
     HOST = socket.gethostname()
     HEARTBEAT_RESTART_INTERVAL = 30 # Wait time if heartbeat sending stops
-    #TODO check for structure in validate_log_format
 
     LOG_STRUCTURE = {
         'id': basestring,
@@ -50,8 +59,11 @@ class LogCollector(object):
         'error_tb' : basestring,
     }
 
-    def __init__(self, fpaths, nsq_sender,
-            heartbeat_interval, log=util.DUMMY_LOGGER):
+    def __init__(self,
+                 fpaths,
+                 heartbeat_interval,
+                 nsq_sender=util.DUMMY,
+                 log=util.DUMMY):
         self.fpaths = fpaths
         self.nsq_sender = nsq_sender
         self.heartbeat_interval = heartbeat_interval
@@ -64,52 +76,168 @@ def __init__(self, fpaths, nsq_sender,
         self.queue = Queue.Queue(maxsize=self.QUEUE_MAX_SIZE)
 
     def _remove_redundancy(self, log):
+        """Removes duplicate data from 'data' inside log dict and brings it
+        out.
+
+        >>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)
+
+        >>> log = {'id' : 46846876, 'type' : 'log',
+        ...        'data' : {'a' : 1, 'b' : 2, 'type' : 'metric'}}
+        >>> lc._remove_redundancy(log)
+        {'data': {'a': 1, 'b': 2}, 'type': 'metric', 'id': 46846876}
+        """
         for key in log:
             if key in log and key in log['data']:
                 log[key] = log['data'].pop(key)
         return log
 
     def validate_log_format(self, log):
+        '''
+        >>> lc = LogCollector('file=/path/to/file.log:formatter=logagg.formatters.basescript', 30)
+
+        >>> incomplete_log = {'data' : {'x' : 1, 'y' : 2},
+        ...                   'raw' : 'Not all keys present'}
+        >>> lc.validate_log_format(incomplete_log)
+        'failed'
+
+        >>> redundant_log = {'one_invalid_key' : 'Extra information',
+        ...                  'data': {'x' : 1, 'y' : 2},
+        ...                  'error': False,
+        ...                  'error_tb': '',
+        ...                  'event': 'event',
+        ...                  'file': '/path/to/file.log',
+        ...                  'formatter': 'logagg.formatters.mongodb',
+        ...                  'host': 'deepcompute-ThinkPad-E470',
+        ...                  'id': '0112358',
+        ...                  'level': 'debug',
+        ...                  'raw': 'some log line here',
+        ...                  'timestamp': '2018-04-07T14:06:17.404818',
+        ...                  'type': 'log'}
+        >>> lc.validate_log_format(redundant_log)
+        'failed'
+
+        >>> correct_log = {'data': {'x' : 1, 'y' : 2},
+        ...                'error': False,
+        ...                'error_tb': '',
+        ...                'event': 'event',
+        ...                'file': '/path/to/file.log',
+        ...                'formatter': 'logagg.formatters.mongodb',
+        ...                'host': 'deepcompute-ThinkPad-E470',
+        ...                'id': '0112358',
+        ...                'level': 'debug',
+        ...                'raw': 'some log line here',
+        ...                'timestamp': '2018-04-07T14:06:17.404818',
+        ...                'type': 'log'}
+        >>> lc.validate_log_format(correct_log)
+        'passed'
+        '''
+
+        keys_in_log = set(log)
+        keys_in_log_structure = set(self.LOG_STRUCTURE)
+        try:
+            assert (keys_in_log == keys_in_log_structure)
+        except AssertionError as e:
+            self.log.warning('formatted_log_structure_rejected' ,
+                             key_not_found = list(keys_in_log_structure-keys_in_log),
+                             extra_keys_found = list(keys_in_log-keys_in_log_structure),
+                             num_logs=1,
+                             type='metric')
+            return 'failed'
+
         for key in log:
-            assert (key in self.LOG_STRUCTURE)
             try:
                 assert isinstance(log[key], self.LOG_STRUCTURE[key])
             except AssertionError as e:
-                self.log.exception('formatted_log_structure_rejected' , log=log)
+                self.log.warning('formatted_log_structure_rejected' ,
+                                 key_datatype_not_matched = key,
+                                 datatype_expected = type(self.LOG_STRUCTURE[key]),
+                                 datatype_got = type(log[key]),
+                                 num_logs=1,
+                                 type='metric')
+                return 'failed'
+
+        return 'passed'
+
+    def _full_from_frags(self, frags):
+        full_line = '\n'.join([l for l, _ in frags])
+        line_info = frags[-1][-1]
+        return full_line, line_info
+
+    def _iter_logs(self, freader, fmtfn):
+        # FIXME: does not handle partial lines
+        # at the start of a file properly
+
+        frags = []
+
+        for line_info in freader:
+            line = line_info['line'][:-1] # remove new line char at the end
+
+            if not fmtfn.ispartial(line) and frags:
+                yield self._full_from_frags(frags)
+                frags = []
+
+            frags.append((line, line_info))
+
+        if frags:
+            yield self._full_from_frags(frags)
+
+    def assign_default_log_values(self, fpath, line, formatter):
+        '''
+        >>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)
+        >>> from pprint import pprint
+
+        >>> formatter = 'logagg.formatters.mongodb'
+        >>> fpath = '/var/log/mongodb/mongodb.log'
+        >>> line = 'some log line here'
+
+        >>> default_log = lc.assign_default_log_values(fpath, line, formatter)
+        >>> pprint(default_log) #doctest: +ELLIPSIS
+        {'data': {},
+         'error': False,
+         'error_tb': '',
+         'event': 'event',
+         'file': '/var/log/mongodb/mongodb.log',
+         'formatter': 'logagg.formatters.mongodb',
+         'host': '...',
+         'id': None,
+         'level': 'debug',
+         'raw': 'some log line here',
+         'timestamp': '...',
+         'type': 'log'}
+        '''
+        return dict(
+            id=None,
+            file=fpath,
+            host=self.HOST,
+            formatter=formatter,
+            event='event',
+            data={},
+            raw=line,
+            timestamp=datetime.datetime.utcnow().isoformat(),
+            type='log',
+            level='debug',
+            error= False,
+            error_tb='',
+        )
 
     @keeprunning(LOG_FILE_POLL_INTERVAL, on_error=util.log_exception)
     def collect_log_lines(self, log_file):
         L = log_file
         fpath = L['fpath']
-        self.log.debug('tracking_file_for_log_lines', fpath=fpath)
+        fmtfn = L['formatter_fn']
+        formatter = L['formatter']
 
         freader = Pygtail(fpath)
-        for line_info in freader:
-            line = line_info['line'][:-1] # remove new line char at the end
-
-            # assign default values
-            log = dict(
-                id=None,
-                file=fpath,
-                host=self.HOST,
-                formatter=L['formatter'],
-                event='event',
-                data={},
-                raw=line,
-                timestamp=datetime.datetime.utcnow().isoformat(),
-                type='log',
-                level='debug',
-                error= False,
-                error_tb='',
-            )
+        for line, line_info in self._iter_logs(freader, fmtfn):
+            log = self.assign_default_log_values(fpath, line, formatter)
 
             try:
-                _log = L['formatter_fn'](line)
+                _log = fmtfn(line)
 
                 if isinstance(_log, RawLog):
                     formatter, raw_log = _log['formatter'], _log['raw']
                     log.update(_log)
-                    _log = util.load_object(formatter)(raw_log)
+                    _log = load_formatter_fn(formatter)(raw_log)
 
                 log.update(_log)
             except (SystemExit, KeyboardInterrupt) as e: raise
@@ -122,7 +250,7 @@ def collect_log_lines(self, log_file):
             log['id'] = uuid.uuid1().hex
 
             log = self._remove_redundancy(log)
-            self.validate_log_format(log)
+            if self.validate_log_format(log) == 'failed': continue
 
             self.queue.put(dict(log=json.dumps(log),
                                 freader=freader, line_info=line_info))
@@ -175,10 +303,8 @@ def _get_msgs_from_queue(self, msgs, timeout):
         self.log.debug('got_msgs_from_mem_queue')
         return msgs_pending, msgs_nbytes, read_from_q
 
-
     @keeprunning(0, on_error=util.log_exception) # FIXME: what wait time var here?
     def send_to_nsq(self, state):
-        self.log.debug('send_to_nsq')
         msgs = []
         should_push = False
 
@@ -199,10 +325,14 @@ def send_to_nsq(self, state):
                                            msgs_nbytes=msgs_nbytes)
 
             try:
-                self.log.debug('trying_to_push_to_nsq', msgs_length=len(msgs))
-                self.nsq_sender.handle_logs(msgs)
+                if isinstance(self.nsq_sender, type(util.DUMMY)):
+                    for m in msgs:
+                        self.log.info('final_log_format', log=m['log'])
+                else:
+                    self.log.debug('trying_to_push_to_nsq', msgs_length=len(msgs))
+                    self.nsq_sender.handle_logs(msgs)
+                    self.log.debug('pushed_to_nsq', msgs_length=len(msgs))
                 self.confirm_success(msgs)
-                self.log.debug('pushed_to_nsq', msgs_length=len(msgs))
                 msgs = msgs_pending
                 state.last_push_ts = time.time()
             except (SystemExit, KeyboardInterrupt): raise
@@ -225,8 +355,31 @@ def confirm_success(self, msgs):
     @keeprunning(SCAN_FPATTERNS_INTERVAL, on_error=util.log_exception)
     def _scan_fpatterns(self, state):
         '''
-        fpaths = 'file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access'
-        fpattern = '/var/log/nginx/access.log'
+        For a list of given fpatterns, this starts a thread
+        collecting log lines from file
+
+        >>> os.path.isfile = lambda path: path == '/path/to/log_file.log'
+        >>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)
+
+        >>> print(lc.fpaths)
+        file=/path/to/log_file.log:formatter=logagg.formatters.basescript
+
+        >>> print('formatters loaded:', lc.formatters)
+        {}
+        >>> print('log file reader threads started:', lc.log_reader_threads)
+        {}
+        >>> state = AttrDict(files_tracked=list())
+        >>> print('files being tracked:', state.files_tracked)
+        []
+
+
+        >>> if not state.files_tracked:
+        >>>     lc._scan_fpatterns(state)
+        >>>     print('formatters loaded:', lc.formatters)
+        >>>     print('log file reader threads started:', lc.log_reader_threads)
+        >>>     print('files being tracked:', state.files_tracked)
+
+
         '''
         for f in self.fpaths:
             fpattern, formatter =(a.split('=')[1] for a in f.split(':', 1))
@@ -234,28 +387,28 @@ def _scan_fpatterns(self, state):
             # TODO code for scanning fpatterns for the files not yet present goes here
             fpaths = glob.glob(fpattern)
             # Load formatter_fn if not in list
+            fpaths = list(set(fpaths) - set(state.files_tracked))
             for fpath in fpaths:
-                if fpath not in state.files_tracked:
-                    try:
-                        formatter_fn = self.formatters.get(formatter,
-                                            util.load_object(formatter))
-                        self.log.info('found_formatter_fn', fn=formatter)
-                        self.formatters[formatter] = formatter_fn
-                    except (SystemExit, KeyboardInterrupt): raise
-                    except (ImportError, AttributeError):
-                        self.log.exception('formatter_fn_not_found', fn=formatter)
-                        sys.exit(-1)
-                    # Start a thread for every file
-                    self.log.info('found_log_file', log_file=fpath)
-                    log_f = dict(fpath=fpath, fpattern=fpattern,
-                                 formatter=formatter, formatter_fn=formatter_fn)
-                    log_key = (fpath, fpattern, formatter)
-                    if log_key not in self.log_reader_threads:
-                        self.log.info('starting_collect_log_lines_thread', log_key=log_key)
-                        # There is no existing thread tracking this log file. Start one
-                        log_reader_thread = util.start_daemon_thread(self.collect_log_lines, (log_f,))
-                        self.log_reader_threads[log_key] = log_reader_thread
-                    state.files_tracked.append(fpath)
+                try:
+                    formatter_fn = self.formatters.get(formatter,
+                                                       load_formatter_fn(formatter))
+                    self.log.info('found_formatter_fn', fn=formatter)
+                    self.formatters[formatter] = formatter_fn
+                except (SystemExit, KeyboardInterrupt): raise
+                except (ImportError, AttributeError):
+                    self.log.exception('formatter_fn_not_found', fn=formatter)
+                    sys.exit(-1)
+                # Start a thread for every file
+                self.log.info('found_log_file', log_file=fpath)
+                log_f = dict(fpath=fpath, fpattern=fpattern,
+                             formatter=formatter, formatter_fn=formatter_fn)
+                log_key = (fpath, fpattern, formatter)
+                if log_key not in self.log_reader_threads:
+                    self.log.info('starting_collect_log_lines_thread', log_key=log_key)
+                    # There is no existing thread tracking this log file. Start one
+                    log_reader_thread = util.start_daemon_thread(self.collect_log_lines, (log_f,))
+                    self.log_reader_threads[log_key] = log_reader_thread
+                state.files_tracked.append(fpath)
         time.sleep(self.SCAN_FPATTERNS_INTERVAL)
 
     @keeprunning(HEARTBEAT_RESTART_INTERVAL, on_error=util.log_exception)
diff --git a/logagg/command.py b/logagg/command.py
index 5764437..e6963df 100644
--- a/logagg/command.py
+++ b/logagg/command.py
@@ -5,18 +5,22 @@ from logagg.nsqsender import NSQSender
 from logagg import util
 
+
 class LogaggCommand(BaseScript):
     DESC = 'Logagg command line tool'
 
     def collect(self):
-        nsq_sender = NSQSender(self.args.nsqd_http_address,
-                               self.args.nsqtopic,
-                               self.args.depth_limit_at_nsq,
-                               self.log)
+        if not self.args.nsqd_http_address:
+            nsq_sender = util.DUMMY
+        else:
+            nsq_sender = NSQSender(self.args.nsqd_http_address,
+                                   self.args.nsqtopic,
+                                   self.args.depth_limit_at_nsq,
+                                   self.log)
 
         collector = LogCollector(
             self.args.file,
-            nsq_sender,
             self.args.heartbeat_interval,
+            nsq_sender,
             self.log)
         collector.start()
 
@@ -52,26 +56,32 @@ def define_subcommands(self, subcommands):
                                             help='Collects the logs from \
                                             different processes and sends to nsq')
         collect_cmd.set_defaults(func=self.collect)
-        collect_cmd.add_argument('--file', nargs='+',
-                                 help='Provide absolute path of logfile \
-                                 including module name and function name,'
-                                 'format: file=:formatter=,'
-                                 'eg: file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access')
-        collect_cmd.add_argument('--nsqtopic',
-                                 default='test_topic',
-                                 help='Topic name to publish messages. Ex: logs_and_metrics')
-        collect_cmd.add_argument('--nsqd-http-address',
-                                 default='localhost:4151',
-                                 help='nsqd http address where we send the messages')
+        collect_cmd.add_argument(
+            '--file', nargs='+', help='Provide absolute path of logfile \
+            including module name and function name,'
+            'format: file=:formatter=,'
+            'eg: file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access')
+        collect_cmd.add_argument(
+            '--nsqtopic',
+            nargs='?',
+            default='test_topic',
+            help='Topic name to publish messages. Ex: logs_and_metrics')
+        collect_cmd.add_argument(
+            '--nsqd-http-address',
+            nargs='?',
+            help='nsqd http address where we send the messages, eg. localhost:4151')
         collect_cmd.add_argument('--depth-limit-at-nsq',
                                  type=int, default=10000000,
                                  help='To limit the depth at nsq topic')
-        collect_cmd.add_argument('--heartbeat-interval',
-                                 type=int, default=30,
-                                 help='Time interval at which regular heartbeats to a nsqTopic "heartbeat" to know which hosts are running logagg')
+        collect_cmd.add_argument(
+            '--heartbeat-interval',
+            type=int,
+            default=30,
+            help='Time interval at which regular heartbeats to a nsqTopic \
+            "heartbeat" to know which hosts are running logagg')
 
         forward_cmd = subcommands.add_parser('forward',
-                                             help='Collects all the messages\
+                                              help='Collects all the messages\
                                               from nsq and pushes to storage engine')
         forward_cmd.set_defaults(func=self.forward)
         forward_cmd.add_argument(
@@ -80,12 +90,15 @@ def define_subcommands(self, subcommands):
         forward_cmd.add_argument(
             '--nsqchannel',
             help='NSQ channel name to read messages from. Ex: logs_and_metrics')
-        forward_cmd.add_argument('--nsqd-tcp-address',
-                                 default='localhost:4150', help='nsqd tcp address where we get the messages')
-        forward_cmd.add_argument('-t', '--target', nargs= '+',
-                                 help='Target database and database details,'
-                                 'format: "forwarder=:host=:port=:user=:password=:db=:collection=",'
-                                 'Ex: forwarder=logagg.forwarders.MongoDBForwarder:host=localhost:port=27017:user=some_user:password=xxxxx:db=logagg:collection=cluster_logs_and_metrics')
+        forward_cmd.add_argument(
+            '--nsqd-tcp-address',
+            default='localhost:4150',
+            help='nsqd tcp address where we get the messages')
+        forward_cmd.add_argument(
+            '-t', '--target', nargs='+', help='Target database and database details,'
+            'format: "forwarder=:host=:port=:user=:password=:db=:collection=",'
+            'Ex: forwarder=logagg.forwarders.MongoDBForwarder:host=localhost:port=27017:user=some_user:password=xxxxx:db=logagg:collection=cluster_logs_and_metrics')
+
 
 def main():
     LogaggCommand().start()
diff --git a/logagg/formatters.py b/logagg/formatters.py
index 851e53d..93421da 100644
--- a/logagg/formatters.py
+++ b/logagg/formatters.py
@@ -229,9 +229,7 @@ def basescript(line):
 def elasticsearch(line):
     '''
     >>> import pprint
-    >>> input_line = '[2017-08-30T06:27:19,158] \
-... [WARN ][o.e.m.j.JvmGcMonitorService] [Glsuj_2] [gc][296816] \
-... overhead, spent [1.2s] collecting in the last [1.3s]'
+    >>> input_line = '[2017-08-30T06:27:19,158] [WARN ][o.e.m.j.JvmGcMonitorService] [Glsuj_2] [gc][296816] overhead, spent [1.2s] collecting in the last [1.3s]'
     >>> output_line = elasticsearch(input_line)
     >>> pprint.pprint(output_line)
     {'data': {'garbage_collector': 'gc',
@@ -293,3 +291,26 @@ def elasticsearch(line):
         timestamp=datetime.datetime.isoformat(datetime.datetime.now()),
         data={'raw': line}
     )
+
+LOG_BEGIN_PATTERN = [re.compile(r'^\s+\['), re.compile(r'^\[')]
+
+def elasticsearch_ispartial_log(line):
+    '''
+    >>> line1 = ' [2018-04-03T00:22:38,048][DEBUG][o.e.c.u.c.QueueResizingEsThreadPoolExecutor] [search17/search]: there were [2000] tasks in [809ms], avg task time [28.4micros], EWMA task execution [790nanos], [35165.36 tasks/s], optimal queue is [35165], current capacity [1000]'
+    >>> line2 = ' org.elasticsearch.ResourceAlreadyExistsException: index [media_corpus_refresh/6_3sRAMsRr2r63J6gbOjQw] already exists'
+    >>> line3 = ' at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexName(MetaDataCreateIndexService.java:151) ~[elasticsearch-6.2.0.jar:6.2.0]'
+    >>> elasticsearch_ispartial_log(line1)
+    False
+    >>> elasticsearch_ispartial_log(line2)
+    True
+    >>> elasticsearch_ispartial_log(line3)
+    True
+    '''
+    match_result = []
+
+    for p in LOG_BEGIN_PATTERN:
+        if re.match(p, line) != None:
+            return False
+    return True
+
+elasticsearch.ispartial = elasticsearch_ispartial_log
diff --git a/logagg/forwarder.py b/logagg/forwarder.py
index e6c50cc..8a39ec5 100644
--- a/logagg/forwarder.py
+++ b/logagg/forwarder.py
@@ -4,16 +4,13 @@
 from copy import deepcopy
 from multiprocessing.pool import ThreadPool
 
-from deeputil import Dummy
 from logagg import util
 import ujson as json
 
-DUMMY_LOGGER = Dummy()
 
 
 class LogForwarder(object):
     DESC = "Gets all the logs from nsq and stores in the storage engines"
 
-    QUEUE_EMPTY_SLEEP_TIME = 0.1
     QUEUE_TIMEOUT = 1
     QUEUE_MAX_SIZE = 50000
 
@@ -23,7 +20,7 @@ class LogForwarder(object):
 
     WAIT_TIME_TARGET_FAILURE = 2
 
-    def __init__(self, message_source, targets, log=DUMMY_LOGGER):
+    def __init__(self, message_source, targets, log=util.DUMMY):
         self.message_source = message_source
         self.targets = targets
 
@@ -72,37 +69,43 @@ def read_from_q(self):
             is_msg_limit_reached = len(msgs) >= self.MAX_MESSAGES_TO_PUSH
             is_max_time_elapsed = time_since_last_push >= self.MAX_SECONDS_TO_PUSH
 
-            should_push = len(msgs) > 0 and (is_max_time_elapsed or is_msg_limit_reached)
+            should_push = len(msgs) > 0 and (
+                is_max_time_elapsed or is_msg_limit_reached)
 
             try:
                 if should_push:
                     self.log.debug('writing_messages_to_databases')
                     self._write_messages(msgs)
                     self._ack_messages(msgs)
-                    self.log.debug('ack_to_nsq_is_done_for_msgs', num_msgs=len(msgs))
+                    self.log.debug('ack_to_nsq_is_done_for_msgs',
+                                   num_msgs=len(msgs))
                     msgs = []
                     last_push_ts = time.time()
-            except (SystemExit, KeyboardInterrupt): raise
+            except (SystemExit, KeyboardInterrupt):
+                raise
 
     def _ack_messages(self, msgs):
         for msg in msgs:
             try:
                 msg.fin()
-            except (SystemExit, KeyboardInterrupt): raise
-            except:
+            except (SystemExit, KeyboardInterrupt):
+                raise
+            except BaseException:
                 self.log.exception('msg_ack_failed')
 
     def _send_msgs_to_target(self, target, msgs):
-        while 1:
+        while True:
             try:
                 target.handle_logs(msgs)
                 break
-            except (SystemExit, KeyboardInterrupt): raise
-            except:
+            except (SystemExit, KeyboardInterrupt):
+                raise
+            except BaseException:
                 # FIXME: do we log the failed messages themselves somewhere?
-                self.log.exception('_send_msgs_to_target_failed', target=target)
+                self.log.exception(
+                    '_send_msgs_to_target_failed', target=target)
                 time.sleep(self.WAIT_TIME_TARGET_FAILURE)
                 # FIXME: also implement some sort of backoff sleep
diff --git a/logagg/forwarders.py b/logagg/forwarders.py
index 275fdea..e0f08d1 100644
--- a/logagg/forwarders.py
+++ b/logagg/forwarders.py
@@ -2,7 +2,7 @@
 import ujson as json
 
 from deeputil import keeprunning
-from logagg.util import DUMMY_LOGGER
+from logagg.util import DUMMY
 
 
 class BaseForwarder():
@@ -24,15 +24,15 @@ def handle_logs(self, msgs):
 import pymongo
 from pymongo import MongoClient
 
+
 class MongoDBForwarder(BaseForwarder):
     SERVER_SELECTION_TIMEOUT = 500 # MongoDB server selection timeout
 
-    # FIXME: normalize all var names
     def __init__(self,
                  host, port, user, password,
-                 db, collection, log=DUMMY_LOGGER):
+                 db, collection, log=DUMMY):
         self.host = host
         self.port = port
         self.user = user
@@ -41,8 +41,8 @@ def __init__(self,
         self.coll = collection
         self.log = log
 
-        self._ensure_connection()
-
+        if host != 'no_host':
+            self._ensure_connection()
 
     # FIXME: clean up the logs
     @keeprunning(wait_secs=SERVER_SELECTION_TIMEOUT, exit_on_success=True)
@@ -52,15 +52,60 @@ def _ensure_connection(self):
                                        self.passwd, self.host, self.port)
-        client = MongoClient(url, serverSelectionTimeoutMS=self.SERVER_SELECTION_TIMEOUT)
+        client = MongoClient(
+            url, serverSelectionTimeoutMS=self.SERVER_SELECTION_TIMEOUT)
         self.log.info('mongodb_server_connection_established', host=self.host)
         self.database = client[self.db_name]
         self.log.info('mongodb_database_created', db=self.db_name)
         self.collection = self.database[self.coll]
-        self.log.info('mongodb_collection_created' ,
-                collection=self.collection, db=self.db_name)
+        self.log.info('mongodb_collection_created',
+                      collection=self.collection, db=self.db_name)
 
     def _parse_msg_for_mongodb(self, msgs):
+        '''
+        >>> mdbf = MongoDBForwarder('no_host', '27017', 'deadpool',
+        ...                         'chimichanga', 'logs', 'collection')
+        >>> log = [{u'data': {u'_': {u'file': u'log.py',
+        ...                          u'fn': u'start',
+        ...                          u'ln': 8,
+        ...                          u'name': u'__main__'},
+        ...                   u'a': 1,
+        ...                   u'b': 2,
+        ...                   u'msg': u'this is a dummy log'},
+        ...         u'error': False,
+        ...         u'error_tb': u'',
+        ...         u'event': u'some_log',
+        ...         u'file': u'/var/log/sample.log',
+        ...         u'formatter': u'logagg.formatters.basescript',
+        ...         u'host': u'deepcompute',
+        ...         u'id': u'20180409T095924_aec36d313bdc11e89da654e1ad04f45e',
+        ...         u'level': u'info',
+        ...         u'raw': u'{...}',
+        ...         u'timestamp': u'2018-04-09T09:59:24.733945Z',
+        ...         u'type': u'metric'}]
+
+        >>> records = mdbf._parse_msg_for_mongodb(log)
+        >>> from pprint import pprint
+        >>> pprint(records)
+        [{'_id': u'20180409T095924_aec36d313bdc11e89da654e1ad04f45e',
+          u'data': {u'_': {u'file': u'log.py',
+                           u'fn': u'start',
+                           u'ln': 8,
+                           u'name': u'__main__'},
+                    u'a': 1,
+                    u'b': 2,
+                    u'msg': u'this is a dummy log'},
+          u'error': False,
+          u'error_tb': u'',
+          u'event': u'some_log',
+          u'file': u'/var/log/sample.log',
+          u'formatter': u'logagg.formatters.basescript',
+          u'host': u'deepcompute',
+          u'level': u'info',
+          u'raw': u'{...}',
+          u'timestamp': u'2018-04-09T09:59:24.733945Z',
+          u'type': u'metric'}]
+        '''
         msgs_list = []
         for msg in msgs:
             try:
@@ -76,21 +121,22 @@ def handle_logs(self, msgs):
             self.log.debug('inserting_msgs_mongodb')
             self.collection.insert_many(msgs_list, ordered=False)
             self.log.info('logs_inserted_into_mongodb',
-                    num_records=len(msgs), type='metric')
+                          num_records=len(msgs), type='metric')
         except pymongo.errors.AutoReconnect(message='connection_to_mongodb_failed'):
             self._ensure_connection()
         except pymongo.errors.BulkWriteError as bwe:
             self.log.info('logs_inserted_into_mongodb',
-                    num_records=bwe.details['nInserted'], type='metric',
-                    records_not_inserted = bwe.details['writeErrors'],
-                    num_records_missed= len(bwe.details['writeErrors']))
+                          num_records=bwe.details['nInserted'], type='metric',
+                          records_not_inserted=bwe.details['writeErrors'],
+                          num_records_missed=len(bwe.details['writeErrors']))
 
 
 from influxdb import InfluxDBClient
 from influxdb.client import InfluxDBClientError
 from influxdb.client import InfluxDBServerError
 
-from logagg.util import flatten_dict, is_number
+from logagg.util import flatten_dict, is_number, MarkValue
+
 
 class InfluxDBForwarder(BaseForwarder):
     EXCLUDE_TAGS = set(["id", "raw", "timestamp", "type", "event", "error"])
@@ -98,7 +144,7 @@ class InfluxDBForwarder(BaseForwarder):
 
     def __init__(self,
                  host, port, user, password,
-                 db, collection, log=DUMMY_LOGGER):
+                 db, collection, log=DUMMY):
         self.host = host
         self.port = port
         self.user = user
@@ -106,18 +152,57 @@ def __init__(self,
         self.db_name = db
         self.log = log
 
-        self._ensure_connection()
+        if host != 'no_host':
+            self._ensure_connection()
 
     def _ensure_connection(self):
         # Establish connection to influxDB to store metrics
         self.influxdb_client = InfluxDBClient(self.host, self.port, self.user,
-                self.passwd, self.db_name)
+                                              self.passwd, self.db_name)
         self.log.info('influxdb_server_connection_established', host=self.host)
-        self.influxdb_database = self.influxdb_client.create_database(self.db_name)
+        self.influxdb_database = self.influxdb_client.create_database(
+            self.db_name)
         self.log.info('influxdb_database_created', dbname=self.db_name)
 
     def _tag_and_field_maker(self, event):
-
+        '''
+        >>> idbf = InfluxDBForwarder('no_host', '8086', 'deadpool',
+        ...                          'chimichanga', 'logs', 'collection')
+        >>> log = {u'data': {u'_': {u'file': u'log.py',
+        ...                         u'fn': u'start',
+        ...                         u'ln': 8,
+        ...                         u'name': u'__main__'},
+        ...                  u'a': 1,
+        ...                  u'b': 2,
+        ...                  u'__ignore_this': 'some_string',
+        ...                  u'msg': u'this is a dummy log'},
+        ...        u'error': False,
+        ...        u'error_tb': u'',
+        ...        u'event': u'some_log',
+        ...        u'file': u'/var/log/sample.log',
+        ...        u'formatter': u'logagg.formatters.basescript',
+        ...        u'host': u'deepcompute',
+        ...        u'id': u'20180409T095924_aec36d313bdc11e89da654e1ad04f45e',
+        ...        u'level': u'info',
+        ...        u'raw': u'{...}',
+        ...        u'timestamp': u'2018-04-09T09:59:24.733945Z',
+        ...        u'type': u'metric'}
+
+        >>> tags, fields = idbf._tag_and_field_maker(log)
+        >>> from pprint import pprint
+        >>> pprint(tags)
+        {u'data.msg': u'this is a dummy log',
+         u'error_tb': u'',
+         u'file': u'/var/log/sample.log',
+         u'formatter': u'logagg.formatters.basescript',
+         u'host': u'deepcompute',
+         u'level': u'info'}
+        >>> pprint(fields)
+        {u'data._': "{u'ln': 8, u'fn': u'start', u'file': u'log.py', u'name': u'__main__'}",
+         u'data.a': 1,
+         u'data.b': 2}
+
+        '''
         data = event.pop('data')
         data = flatten_dict({'data': data})
@@ -127,18 +212,68 @@ def _tag_and_field_maker(self, event):
 
         for k in data:
             v = data[k]
 
-            if is_number(v):
+            if is_number(v) or isinstance(v, MarkValue):
                 f[k] = v
             else:
+                #if v.startswith('_'): f[k] = eval(v.split('_', 1)[1])
                 t[k] = v
 
         return t, f
 
     def _parse_msg_for_influxdb(self, msgs):
+        '''
+        >>> from logagg.forwarders import InfluxDBForwarder
+        >>> idbf = InfluxDBForwarder('no_host', '8086', 'deadpool',
+        ...                          'chimichanga', 'logs', 'collection')
+
+        >>> valid_log = [{u'data': {u'_force_this_as_field': 'CXNS CNS nbkbsd',
+        ...                         u'a': 1,
+        ...                         u'b': 2,
+        ...                         u'msg': u'this is a dummy log'},
+        ...              u'error': False,
+        ...              u'error_tb': u'',
+        ...              u'event': u'some_log',
+        ...              u'file': u'/var/log/sample.log',
+        ...              u'formatter': u'logagg.formatters.basescript',
+        ...              u'host': u'deepcompute',
+        ...              u'id': u'20180409T095924_aec36d313bdc11e89da654e1ad04f45e',
+        ...              u'level': u'info',
+        ...              u'raw': u'{...}',
+        ...              u'timestamp': u'2018-04-09T09:59:24.733945Z',
+        ...              u'type': u'metric'}]
+
+        >>> pointvalues = idbf._parse_msg_for_influxdb(valid_log)
+        >>> from pprint import pprint
+        >>> pprint(pointvalues)
+        [{'fields': {u'data._force_this_as_field': "'CXNS CNS nbkbsd'",
+                     u'data.a': 1,
+                     u'data.b': 2},
+          'measurement': u'some_log',
+          'tags': {u'data.msg': u'this is a dummy log',
+                   u'error_tb': u'',
+                   u'file': u'/var/log/sample.log',
+                   u'formatter': u'logagg.formatters.basescript',
+                   u'host': u'deepcompute',
+                   u'level': u'info'},
+          'time': u'2018-04-09T09:59:24.733945Z'}]
+
+        >>> invalid_log = valid_log
+        >>> invalid_log[0]['error'] = True
+        >>> pointvalues = idbf._parse_msg_for_influxdb(invalid_log)
+        >>> pprint(pointvalues)
+        []
+
+        >>> invalid_log = valid_log
+        >>> invalid_log[0]['type'] = 'log'
+        >>> pointvalues = idbf._parse_msg_for_influxdb(invalid_log)
+        >>> pprint(pointvalues)
+        []
+        '''
+
         series = []
 
         for msg in msgs:
-            if msg.get('error') == True:
+            if msg.get('error'):
                 continue
 
             if msg.get('type').lower() == 'metric':
@@ -146,10 +281,10 @@ def _parse_msg_for_influxdb(self, msgs):
                 measurement = msg.get('event')
                 tags, fields = self._tag_and_field_maker(msg)
                 pointvalues = {
-                        "time": time,
-                        "measurement": measurement,
-                        "fields": fields,
-                        "tags": tags }
+                    "time": time,
+                    "measurement": measurement,
+                    "fields": fields,
+                    "tags": tags}
                 series.append(pointvalues)
 
         return series
@@ -164,10 +299,10 @@ def handle_logs(self, msgs):
             self.log.debug('inserting_the_metrics_into_influxdb')
             self.influxdb_client.write_points(records)
             self.log.info('metrics_inserted_into_influxdb',
-                    num_records=len(records),
-                    type='metric')
+                          num_records=len(records),
+                          type='metric')
         except (InfluxDBClientError, InfluxDBServerError) as e:
             self.log.exception('failed_to_insert metric',
-                    record=records,
-                    num_records=len(records),
-                    type='metric')
+                               record=records,
+                               num_records=len(records),
+                               type='metric')
diff --git a/logagg/nsqsender.py b/logagg/nsqsender.py
index b5ac24b..3694149 100644
--- a/logagg/nsqsender.py
+++ b/logagg/nsqsender.py
@@ -11,13 +11,13 @@ class NSQSender(object):
     HEARTBEAT_TOPIC = 'Heartbeat#ephemeral' # Topic name at which heartbeat is to be sent
     MPUB_URL = 'http://%s/mpub?topic=%s' # Url to post msgs to NSQ
 
-    def __init__(self, http_loc, nsq_topic, nsq_max_depth, log=util.DUMMY_LOGGER):
+    def __init__(self, http_loc, nsq_topic, nsq_max_depth, log=util.DUMMY):
         self.nsqd_http_address = http_loc
         self.topic_name = nsq_topic
         self.nsq_max_depth = nsq_max_depth
         self.log = log
 
-        self.session = requests.Session()
+        self.session = requests
 
         self._ensure_topic(self.topic_name)
         self._ensure_topic(self.HEARTBEAT_TOPIC)
@@ -27,7 +27,7 @@ def __init__(self, http_loc, nsq_topic, nsq_max_depth, log=util.DUMMY_LOGGER):
     def _ensure_topic(self, topic_name):
         u = 'http://%s/topic/create?topic=%s' % (self.nsqd_http_address, topic_name)
         try:
-            self.session.post(u)
+            self.session.post(u, timeout=1)
         except requests.exceptions.RequestException as e:
             self.log.exception('could_not_create_topic,retrying....', topic=topic_name)
             raise
@@ -44,7 +44,7 @@ def _is_ready(self, topic_name):
         #Cheacking for ephmeral channels
         if '#' in topic_name:
             topic_name, tag =topic_name.split("#", 1)
-        
+
         try:
             data = self.session.get(url).json()
@@ -56,10 +56,10 @@ def _is_ready(self, topic_name):
             '''
             topics = data.get('topics', [])
             topics = [t for t in topics if t['topic_name'] == topic_name]
-            
+
             if not topics:
                 raise Exception('topic_missing_at_nsq')
-            
+
             topic = topics[0]
             depth = topic['depth']
             depth += sum(c.get('depth', 0) for c in topic['channels'])
diff --git a/logagg/util.py b/logagg/util.py
index 5c2f72c..cea0704 100644
--- a/logagg/util.py
+++ b/logagg/util.py
@@ -1,26 +1,29 @@
 import collections
 from deeputil import Dummy
-DUMMY_LOGGER = Dummy()
 from operator import attrgetter
 
+DUMMY = Dummy()
+
+
 def memoize(f): # from: https://goo.gl/aXt4Qy
     class memodict(dict):
         __slots__ = ()
+
         def __missing__(self, key):
             self[key] = ret = f(key)
             return ret
     return memodict().__getitem__
 
+
 @memoize
 def load_object(imp_path):
-    '''
-    Given a path (python import path), load the object.
+    """Given a path (python import path), load the object.
 
     eg of path: logagg.formatters.nginx_access
                 : logagg.forwarders.mongodb
-    '''
+    """
     module_name, obj_name = imp_path.split('.', 1)
     module = __import__(module_name)
     obj = attrgetter(obj_name)(module)
@@ -30,27 +33,31 @@ def load_object(imp_path):
 
 import traceback
 
+
 def log_exception(self, __fn__):
-    self.log.exception('error_during_run_Continuing' , fn=__fn__.func_name,
-            tb=repr(traceback.format_exc()))
+    self.log.exception('error_during_run_Continuing', fn=__fn__.func_name,
+                       tb=repr(traceback.format_exc()))
 
 from threading import Thread
 
+
 def start_daemon_thread(target, args=()):
+    """starts a daemon thread for a given target function and arguments."""
     th = Thread(target=target, args=args)
     th.daemon = True
     th.start()
     return th
+
+
 def serialize_dict_keys(d, prefix=""):
-    """ returns all the keys in a dictionary
+    """returns all the keys in a dictionary.
+
     >>> serialize_dict_keys({"a": {"b": {"c": 1, "b": 2} } })
     ['a', 'a.b', 'a.b.c', 'a.b.b']
     """
-
     keys = []
-    for k,v in d.iteritems():
+    for k, v in d.iteritems():
         fqk = '%s%s' % (prefix, k)
         keys.append(fqk)
         if isinstance(v, dict):
@@ -58,24 +65,56 @@ def serialize_dict_keys(d, prefix=""):
 
     return keys
 
-def flatten_dict(d, parent_key='', sep='.', ignore_under_prefixed=True):
+class MarkValue(str): pass
+
+def flatten_dict(d, parent_key='', sep='.',
+                 ignore_under_prefixed=True, mark_value=True):
     '''
-    >>> flatten_dict({"a": {"b": {"c": 1, "b": 2} } })
-    {'a.b.b': 2, 'a.b.c': 1}
+    >>> flatten_dict({"a": {"b": {"c": 1, "b": 2, "__d": 'ignore', "_e": "mark"} } })
+    {'a.b.b': 2, 'a.b.c': 1, 'a.b._e': "'mark'"}
     '''
     items = {}
     for k in d:
-        if ignore_under_prefixed and k.startswith('_'):
+        if ignore_under_prefixed and k.startswith('__'):
             continue
-
         v = d[k]
+
+        if mark_value and k.startswith('_') and not k.startswith('__'):
+            v = MarkValue(repr(v))
+
         new_key = sep.join((parent_key, k)) if parent_key else k
         if isinstance(v, collections.MutableMapping):
-            items.update(flatten_dict(v, new_key, sep=sep))
+            items.update(flatten_dict(v, new_key, sep=sep,
+                                      ignore_under_prefixed=True,
+                                      mark_value=True)
+                         )
         else:
             items[new_key] = v
 
     return items
 
+
 import numbers
+
 def is_number(x):
     return isinstance(x, numbers.Number)
+
+from re import match
+
+spaces = (' ', '\t', '\n')
+def ispartial(x):
+    '''
+    If log line starts with a space it is recognized as a partial line
+    >>> ispartial('