Skip to content

Commit

Permalink
A JSON-based minimal monitoring API (#422)
Browse files Browse the repository at this point in the history
* Add to_json() method to the Collector class.

* Add 3.7.

* HTTP server that serves the status.

* Hook up server so it can run on startup.

* Credit and changelog.

* Fix bug where --help prints error message.

* Rename to --monitoring-interface --monitoring-port.
  • Loading branch information
itamarst authored and johann8384 committed Jul 30, 2019
1 parent 1a91267 commit 771c7aa
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
language: python
dist: "xenial"
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
- "3.7"
install:
- pip install pylint pylint_runner ordereddict mysqlclient requests feedparser prometheus_client
script:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

##[1.3.3](https://github.com/OpenTSDB/tcollector/issues?q=is%3Aopen+is%3Aissue+milestone%3A1.3.3)

### Improvements
- An optional status monitoring API, serving JSON over HTTP

## [1.3.1](https://github.com/OpenTSDB/tcollector/issues?utf8=%E2%9C%93&q=milestone%3A1.3.1+)
### Collectors Added
- docker.py - Pulls metrics from a local Docker instance, tries /var/run/docker.sock, then localhost API
Expand Down
1 change: 1 addition & 0 deletions THANKS
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ Jacek Masiulaniec <jacek.masiulaniec@gmail.com>
Manuel Amador <rudd-o@rudd-o.com>
Tim Douglas <me@timdoug.com>
Stuart Warren <stuart.warren@ocado.com>
G-Research
4 changes: 3 additions & 1 deletion collectors/etc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def get_defaults():
'ssl': False,
'stdin': False,
'daemonize': False,
'hosts': False
'hosts': False,
"monitoring_interface": None,
"monitoring_port": 13280,
}

return defaults
73 changes: 69 additions & 4 deletions tcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@
from queue import Queue, Empty, Full # pylint: disable=import-error
from urllib.request import Request, urlopen # pylint: disable=maybe-no-member,no-name-in-module,import-error
from urllib.error import HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error

from http.server import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error
else:
from Queue import Queue, Empty, Full # pylint: disable=maybe-no-member,no-name-in-module,import-error
from urllib2 import Request, urlopen, HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error

# global variables.
COLLECTORS = {}
Expand Down Expand Up @@ -234,6 +235,48 @@ def evict_old_keys(self, cut_off):
if time < cut_off:
del self.values[key]

def to_json(self):
"""Expose collector information in JSON-serializable format."""
result = {}
for attr in ["name", "mtime", "lastspawn", "killstate", "nextkill",
"lines_sent", "lines_received", "lines_invalid",
"last_datapoint", "dead"]:
result[attr] = getattr(self, attr)
return result


class StatusRequestHandler(BaseHTTPRequestHandler):
"""Serves status of collectors as JSON."""

def do_GET(self):
# This happens in different thread than the one updating collectors.
# However, all the attributes we're getting can't be corrupted by
# another thread changing them midway (it's integers and strings and
# the like), so worst case it's a tiny bit internally inconsistent.
# Which is fine for monitoring.
result = json.dumps([c.to_json() for c in self.server.collectors.values()])
self.send_response(200)
self.send_header("content-type", "text/json")
self.send_header("content-length", str(len(result)))
self.end_headers()
if PY3:
result = result.encode("utf-8")
self.wfile.write(result)


class StatusServer(HTTPServer):
"""Serves status of collectors over HTTP."""

def __init__(self, interface, port, collectors):
"""
interface: the interface to listen on, e.g. "127.0.0.1".
port: the port to listen on, e.g. 8080.
collectors: a dictionary mapping names to Collectors, typically the
global COLLECTORS.
"""
self.collectors = collectors
HTTPServer.__init__(self, (interface, port), StatusRequestHandler)


class StdinCollector(Collector):
"""A StdinCollector simply reads from STDIN and provides the
Expand Down Expand Up @@ -267,6 +310,8 @@ def shutdown(self):
pass




class ReaderThread(threading.Thread):
"""The main ReaderThread is responsible for reading from the collectors
and assuring that we always read from the input no matter what.
Expand Down Expand Up @@ -861,10 +906,12 @@ def parse_cmdline(argv):
'ssl': False,
'stdin': False,
'daemonize': False,
'hosts': False
'hosts': False,
"monitoring_interface": None,
"monitoring_port": 13280,
}
except:
sys.stderr.write("Unexpected error: %s" % sys.exc_info()[0])
except Exception as e:
sys.stderr.write("Unexpected error: %s\n" % e)
raise

# get arguments
Expand Down Expand Up @@ -956,6 +1003,15 @@ def parse_cmdline(argv):
help='Password to use for HTTP Basic Auth when sending the data via HTTP')
parser.add_option('--ssl', dest='ssl', action='store_true', default=defaults['ssl'],
help='Enable SSL - used in conjunction with http')
parser.add_option('--monitoring-interface', dest='monitoring_interface', action='store',
# Old installs may not have this config option:
default=defaults.get("monitoring_interface", None),
help="Interface for status API to listen on " +
"(e.g. '127.0.0.1, 0.0.0.0). " +
"Disabled by default.")
parser.add_option('--monitoring-port', dest='monitoring_port', action='store',
default=defaults.get("monitoring_port", 13280),
help="Port for status API to listen on.")
(options, args) = parser.parse_args(args=argv[1:])
if options.dedupinterval < 0:
parser.error('--dedup-interval must be at least 0 seconds')
Expand Down Expand Up @@ -1014,6 +1070,8 @@ def main(argv):

try:
options, args = parse_cmdline(argv)
except SystemExit:
raise
except:
sys.stderr.write("Unexpected error: %s" % sys.exc_info()[0])
return 1
Expand Down Expand Up @@ -1056,6 +1114,13 @@ def main(argv):
for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, shutdown_signal)

# Status server, if it's configured:
if options.monitoring_interface is not None:
status_server = StatusServer(options.monitoring_interface, options.monitoring_port, COLLECTORS)
thread = threading.Thread(target=status_server.serve_forever)
thread.setDaemon(True) # keep thread from preventing shutdown
thread.start()

# at this point we're ready to start processing, so start the ReaderThread
# so we can have it running and pulling in data for us
reader = ReaderThread(options.dedupinterval, options.evictinterval, options.deduponlyzero)
Expand Down
43 changes: 42 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import mocks
import tcollector
import json
import threading

PY3 = sys.version_info[0] > 2

Expand Down Expand Up @@ -59,6 +61,45 @@ def check_access_rights(top):
"/collectors/0"
check_access_rights(collectors_path)

def test_json(self):
"""A collector can be serialized to JSON."""
collector = tcollector.Collector("myname", 17, "myname.py", mtime=23, lastspawn=15)
collector.nextkill += 8
collector.killstate += 2
collector.lines_sent += 10
collector.lines_received += 65
collector.lines_invalid += 7
self.assertEqual(collector.to_json(),
{"name": "myname",
"mtime": 23,
"lastspawn": 15,
"killstate": 2,
"nextkill": 8,
"lines_sent": 10,
"lines_received": 65,
"lines_invalid": 7,
"last_datapoint": collector.last_datapoint,
"dead": False})


class StatusServerTests(unittest.TestCase):
"""Tests for StatusServer."""

def test_endtoend(self):
"""We can get JSON status of collectors from StatusServer."""
collectors = {
"a": tcollector.Collector("mycollector", 5, "a.py"),
"b": tcollector.Collector("second", 3, "b.py"),
}
server = tcollector.StatusServer("127.0.0.1", 32025, collectors)
# runs in background until test suite exits :( but it works.
thread = threading.Thread(target=server.serve_forever)
thread.setDaemon(True)
thread.start()

r = tcollector.urlopen("http://127.0.0.1:32025").read()
self.assertEqual(json.loads(r), [c.to_json() for c in collectors.values()])


class TSDBlacklistingTests(unittest.TestCase):
"""
Expand Down Expand Up @@ -122,7 +163,7 @@ class UDPCollectorTests(unittest.TestCase):

def setUp(self):
if ('udp_bridge.py' not in tcollector.COLLECTORS): # pylint: disable=maybe-no-member
return
raise unittest.SkipTest("udp_bridge unavailable")

self.saved_exit = sys.exit
self.saved_stderr = sys.stderr
Expand Down

0 comments on commit 771c7aa

Please sign in to comment.