Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A JSON-based minimal monitoring API #422

Merged
merged 7 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
language: python
dist: "xenial"
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
- "3.7"
install:
- pip install pylint pylint_runner ordereddict mysqlclient requests feedparser prometheus_client
script:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

##[1.3.3](https://github.com/OpenTSDB/tcollector/issues?q=is%3Aopen+is%3Aissue+milestone%3A1.3.3)

### Improvements
- An optional status monitoring API, serving JSON over HTTP

## [1.3.1](https://github.com/OpenTSDB/tcollector/issues?utf8=%E2%9C%93&q=milestone%3A1.3.1+)
### Collectors Added
- docker.py - Pulls metrics from a local Docker instance, tries /var/run/docker.sock, then localhost API
Expand Down
1 change: 1 addition & 0 deletions THANKS
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ Jacek Masiulaniec <jacek.masiulaniec@gmail.com>
Manuel Amador <rudd-o@rudd-o.com>
Tim Douglas <me@timdoug.com>
Stuart Warren <stuart.warren@ocado.com>
G-Research
4 changes: 3 additions & 1 deletion collectors/etc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def get_defaults():
'ssl': False,
'stdin': False,
'daemonize': False,
'hosts': False
'hosts': False,
"monitoring_interface": None,
"monitoring_port": 13280,
}

return defaults
73 changes: 69 additions & 4 deletions tcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@
from queue import Queue, Empty, Full # pylint: disable=import-error
from urllib.request import Request, urlopen # pylint: disable=maybe-no-member,no-name-in-module,import-error
from urllib.error import HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error

from http.server import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error
else:
from Queue import Queue, Empty, Full # pylint: disable=maybe-no-member,no-name-in-module,import-error
from urllib2 import Request, urlopen, HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error

# global variables.
COLLECTORS = {}
Expand Down Expand Up @@ -234,6 +235,48 @@ def evict_old_keys(self, cut_off):
if time < cut_off:
del self.values[key]

def to_json(self):
"""Expose collector information in JSON-serializable format."""
result = {}
for attr in ["name", "mtime", "lastspawn", "killstate", "nextkill",
"lines_sent", "lines_received", "lines_invalid",
"last_datapoint", "dead"]:
result[attr] = getattr(self, attr)
return result


class StatusRequestHandler(BaseHTTPRequestHandler):
"""Serves status of collectors as JSON."""

def do_GET(self):
# This happens in different thread than the one updating collectors.
# However, all the attributes we're getting can't be corrupted by
# another thread changing them midway (it's integers and strings and
# the like), so worst case it's a tiny bit internally inconsistent.
# Which is fine for monitoring.
result = json.dumps([c.to_json() for c in self.server.collectors.values()])
self.send_response(200)
self.send_header("content-type", "text/json")
self.send_header("content-length", str(len(result)))
self.end_headers()
if PY3:
result = result.encode("utf-8")
self.wfile.write(result)


class StatusServer(HTTPServer):
"""Serves status of collectors over HTTP."""

def __init__(self, interface, port, collectors):
"""
interface: the interface to listen on, e.g. "127.0.0.1".
port: the port to listen on, e.g. 8080.
collectors: a dictionary mapping names to Collectors, typically the
global COLLECTORS.
"""
self.collectors = collectors
HTTPServer.__init__(self, (interface, port), StatusRequestHandler)


class StdinCollector(Collector):
"""A StdinCollector simply reads from STDIN and provides the
Expand Down Expand Up @@ -267,6 +310,8 @@ def shutdown(self):
pass




class ReaderThread(threading.Thread):
"""The main ReaderThread is responsible for reading from the collectors
and assuring that we always read from the input no matter what.
Expand Down Expand Up @@ -861,10 +906,12 @@ def parse_cmdline(argv):
'ssl': False,
'stdin': False,
'daemonize': False,
'hosts': False
'hosts': False,
"monitoring_interface": None,
"monitoring_port": 13280,
}
except:
sys.stderr.write("Unexpected error: %s" % sys.exc_info()[0])
except Exception as e:
sys.stderr.write("Unexpected error: %s\n" % e)
raise

# get arguments
Expand Down Expand Up @@ -956,6 +1003,15 @@ def parse_cmdline(argv):
help='Password to use for HTTP Basic Auth when sending the data via HTTP')
parser.add_option('--ssl', dest='ssl', action='store_true', default=defaults['ssl'],
help='Enable SSL - used in conjunction with http')
parser.add_option('--monitoring-interface', dest='monitoring_interface', action='store',
# Old installs may not have this config option:
default=defaults.get("monitoring_interface", None),
help="Interface for status API to listen on " +
"(e.g. '127.0.0.1, 0.0.0.0). " +
"Disabled by default.")
parser.add_option('--monitoring-port', dest='monitoring_port', action='store',
default=defaults.get("monitoring_port", 13280),
help="Port for status API to listen on.")
(options, args) = parser.parse_args(args=argv[1:])
if options.dedupinterval < 0:
parser.error('--dedup-interval must be at least 0 seconds')
Expand Down Expand Up @@ -1014,6 +1070,8 @@ def main(argv):

try:
options, args = parse_cmdline(argv)
except SystemExit:
raise
except:
sys.stderr.write("Unexpected error: %s" % sys.exc_info()[0])
return 1
Expand Down Expand Up @@ -1056,6 +1114,13 @@ def main(argv):
for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, shutdown_signal)

# Status server, if it's configured:
if options.monitoring_interface is not None:
status_server = StatusServer(options.monitoring_interface, options.monitoring_port, COLLECTORS)
thread = threading.Thread(target=status_server.serve_forever)
thread.setDaemon(True) # keep thread from preventing shutdown
thread.start()

# at this point we're ready to start processing, so start the ReaderThread
# so we can have it running and pulling in data for us
reader = ReaderThread(options.dedupinterval, options.evictinterval, options.deduponlyzero)
Expand Down
43 changes: 42 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import mocks
import tcollector
import json
import threading

PY3 = sys.version_info[0] > 2

Expand Down Expand Up @@ -59,6 +61,45 @@ def check_access_rights(top):
"/collectors/0"
check_access_rights(collectors_path)

def test_json(self):
"""A collector can be serialized to JSON."""
collector = tcollector.Collector("myname", 17, "myname.py", mtime=23, lastspawn=15)
collector.nextkill += 8
collector.killstate += 2
collector.lines_sent += 10
collector.lines_received += 65
collector.lines_invalid += 7
self.assertEqual(collector.to_json(),
{"name": "myname",
"mtime": 23,
"lastspawn": 15,
"killstate": 2,
"nextkill": 8,
"lines_sent": 10,
"lines_received": 65,
"lines_invalid": 7,
"last_datapoint": collector.last_datapoint,
"dead": False})


class StatusServerTests(unittest.TestCase):
"""Tests for StatusServer."""

def test_endtoend(self):
"""We can get JSON status of collectors from StatusServer."""
collectors = {
"a": tcollector.Collector("mycollector", 5, "a.py"),
"b": tcollector.Collector("second", 3, "b.py"),
}
server = tcollector.StatusServer("127.0.0.1", 32025, collectors)
# runs in background until test suite exits :( but it works.
thread = threading.Thread(target=server.serve_forever)
thread.setDaemon(True)
thread.start()

r = tcollector.urlopen("http://127.0.0.1:32025").read()
self.assertEqual(json.loads(r), [c.to_json() for c in collectors.values()])


class TSDBlacklistingTests(unittest.TestCase):
"""
Expand Down Expand Up @@ -122,7 +163,7 @@ class UDPCollectorTests(unittest.TestCase):

def setUp(self):
if ('udp_bridge.py' not in tcollector.COLLECTORS): # pylint: disable=maybe-no-member
return
raise unittest.SkipTest("udp_bridge unavailable")

self.saved_exit = sys.exit
self.saved_stderr = sys.stderr
Expand Down