diff --git a/.travis.yml b/.travis.yml index b958c04f..f7e382b9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,11 @@ language: python +dist: "xenial" python: - "2.7" - "3.4" - "3.5" - "3.6" + - "3.7" install: - pip install pylint pylint_runner ordereddict mysqlclient requests feedparser prometheus_client script: diff --git a/CHANGELOG.md b/CHANGELOG.md index e8922109..46a05af7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +##[1.3.3](https://github.com/OpenTSDB/tcollector/issues?q=is%3Aopen+is%3Aissue+milestone%3A1.3.3) + +### Improvements +- An optional status monitoring API, serving JSON over HTTP + ## [1.3.1](https://github.com/OpenTSDB/tcollector/issues?utf8=%E2%9C%93&q=milestone%3A1.3.1+) ### Collectors Added - docker.py - Pulls metrics from a local Docker instance, tries /var/run/docker.sock, then localhost API diff --git a/THANKS b/THANKS index b6a86818..899886c6 100644 --- a/THANKS +++ b/THANKS @@ -16,3 +16,4 @@ Jacek Masiulaniec Manuel Amador Tim Douglas Stuart Warren +G-Research \ No newline at end of file diff --git a/collectors/etc/config.py b/collectors/etc/config.py index e22a1fe0..5592b219 100644 --- a/collectors/etc/config.py +++ b/collectors/etc/config.py @@ -73,7 +73,9 @@ def get_defaults(): 'ssl': False, 'stdin': False, 'daemonize': False, - 'hosts': False + 'hosts': False, + "monitoring_interface": None, + "monitoring_port": 13280, } return defaults diff --git a/tcollector.py b/tcollector.py index a2cf01d7..8cf476d6 100755 --- a/tcollector.py +++ b/tcollector.py @@ -45,10 +45,11 @@ from queue import Queue, Empty, Full # pylint: disable=import-error from urllib.request import Request, urlopen # pylint: disable=maybe-no-member,no-name-in-module,import-error from urllib.error import HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error - + from http.server import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error else: from Queue import Queue, Empty, Full # pylint: disable=maybe-no-member,no-name-in-module,import-error from urllib2 import Request, urlopen, HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error + from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error # global variables. COLLECTORS = {} @@ -234,6 +235,48 @@ def evict_old_keys(self, cut_off): if time < cut_off: del self.values[key] + def to_json(self): + """Expose collector information in JSON-serializable format.""" + result = {} + for attr in ["name", "mtime", "lastspawn", "killstate", "nextkill", + "lines_sent", "lines_received", "lines_invalid", + "last_datapoint", "dead"]: + result[attr] = getattr(self, attr) + return result + + +class StatusRequestHandler(BaseHTTPRequestHandler): + """Serves status of collectors as JSON.""" + + def do_GET(self): + # This happens in different thread than the one updating collectors. + # However, all the attributes we're getting can't be corrupted by + # another thread changing them midway (it's integers and strings and + # the like), so worst case it's a tiny bit internally inconsistent. + # Which is fine for monitoring. + result = json.dumps([c.to_json() for c in self.server.collectors.values()]) + self.send_response(200) + self.send_header("content-type", "text/json") + self.send_header("content-length", str(len(result))) + self.end_headers() + if PY3: + result = result.encode("utf-8") + self.wfile.write(result) + + +class StatusServer(HTTPServer): + """Serves status of collectors over HTTP.""" + + def __init__(self, interface, port, collectors): + """ + interface: the interface to listen on, e.g. "127.0.0.1". + port: the port to listen on, e.g. 8080. + collectors: a dictionary mapping names to Collectors, typically the + global COLLECTORS. + """ + self.collectors = collectors + HTTPServer.__init__(self, (interface, port), StatusRequestHandler) + class StdinCollector(Collector): """A StdinCollector simply reads from STDIN and provides the @@ -267,6 +310,8 @@ def shutdown(self): pass + + class ReaderThread(threading.Thread): """The main ReaderThread is responsible for reading from the collectors and assuring that we always read from the input no matter what. @@ -861,10 +906,12 @@ def parse_cmdline(argv): 'ssl': False, 'stdin': False, 'daemonize': False, - 'hosts': False + 'hosts': False, + "monitoring_interface": None, + "monitoring_port": 13280, } - except: - sys.stderr.write("Unexpected error: %s" % sys.exc_info()[0]) + except Exception as e: + sys.stderr.write("Unexpected error: %s\n" % e) raise # get arguments @@ -956,6 +1003,15 @@ def parse_cmdline(argv): help='Password to use for HTTP Basic Auth when sending the data via HTTP') parser.add_option('--ssl', dest='ssl', action='store_true', default=defaults['ssl'], help='Enable SSL - used in conjunction with http') + parser.add_option('--monitoring-interface', dest='monitoring_interface', action='store', + # Old installs may not have this config option: + default=defaults.get("monitoring_interface", None), + help="Interface for status API to listen on " + + "(e.g. '127.0.0.1, 0.0.0.0). " + + "Disabled by default.") + parser.add_option('--monitoring-port', dest='monitoring_port', action='store', + default=defaults.get("monitoring_port", 13280), + help="Port for status API to listen on.") (options, args) = parser.parse_args(args=argv[1:]) if options.dedupinterval < 0: parser.error('--dedup-interval must be at least 0 seconds') @@ -1014,6 +1070,8 @@ def main(argv): try: options, args = parse_cmdline(argv) + except SystemExit: + raise except: sys.stderr.write("Unexpected error: %s" % sys.exc_info()[0]) return 1 @@ -1056,6 +1114,13 @@ def main(argv): for sig in (signal.SIGTERM, signal.SIGINT): signal.signal(sig, shutdown_signal) + # Status server, if it's configured: + if options.monitoring_interface is not None: + status_server = StatusServer(options.monitoring_interface, options.monitoring_port, COLLECTORS) + thread = threading.Thread(target=status_server.serve_forever) + thread.setDaemon(True) # keep thread from preventing shutdown + thread.start() + # at this point we're ready to start processing, so start the ReaderThread # so we can have it running and pulling in data for us reader = ReaderThread(options.dedupinterval, options.evictinterval, options.deduponlyzero) diff --git a/tests.py b/tests.py index 1348d8ee..50f259bd 100755 --- a/tests.py +++ b/tests.py @@ -19,6 +19,8 @@ import mocks import tcollector +import json +import threading PY3 = sys.version_info[0] > 2 @@ -59,6 +61,45 @@ def check_access_rights(top): "/collectors/0" check_access_rights(collectors_path) + def test_json(self): + """A collector can be serialized to JSON.""" + collector = tcollector.Collector("myname", 17, "myname.py", mtime=23, lastspawn=15) + collector.nextkill += 8 + collector.killstate += 2 + collector.lines_sent += 10 + collector.lines_received += 65 + collector.lines_invalid += 7 + self.assertEqual(collector.to_json(), + {"name": "myname", + "mtime": 23, + "lastspawn": 15, + "killstate": 2, + "nextkill": 8, + "lines_sent": 10, + "lines_received": 65, + "lines_invalid": 7, + "last_datapoint": collector.last_datapoint, + "dead": False}) + + +class StatusServerTests(unittest.TestCase): + """Tests for StatusServer.""" + + def test_endtoend(self): + """We can get JSON status of collectors from StatusServer.""" + collectors = { + "a": tcollector.Collector("mycollector", 5, "a.py"), + "b": tcollector.Collector("second", 3, "b.py"), + } + server = tcollector.StatusServer("127.0.0.1", 32025, collectors) + # runs in background until test suite exits :( but it works. + thread = threading.Thread(target=server.serve_forever) + thread.setDaemon(True) + thread.start() + + r = tcollector.urlopen("http://127.0.0.1:32025").read() + self.assertEqual(json.loads(r), [c.to_json() for c in collectors.values()]) + class TSDBlacklistingTests(unittest.TestCase): """ @@ -122,7 +163,7 @@ class UDPCollectorTests(unittest.TestCase): def setUp(self): if ('udp_bridge.py' not in tcollector.COLLECTORS): # pylint: disable=maybe-no-member - return + raise unittest.SkipTest("udp_bridge unavailable") self.saved_exit = sys.exit self.saved_stderr = sys.stderr