From fc0405c8f38ccf7949d70394f0014cd129dafe23 Mon Sep 17 00:00:00 2001
From: ed
Date: Sun, 20 Aug 2023 17:58:06 +0000
Subject: [PATCH] add prometheus metrics; closes #49

---
 README.md             | 49 +++++++++++++
 copyparty/__main__.py | 13 ++++
 copyparty/httpcli.py  |  3 +
 copyparty/httpsrv.py  |  3 +
 copyparty/metrics.py  | 163 ++++++++++++++++++++++++++++++++++++++++++
 copyparty/up2k.py     | 44 +++++++++++-
 scripts/sfx.ls        |  1 +
 tests/test_httpcli.py | 12 ++--
 tests/util.py         | 12 +++-
 9 files changed, 293 insertions(+), 7 deletions(-)
 create mode 100644 copyparty/metrics.py

diff --git a/README.md b/README.md
index 73973c0b..5e5972b2 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,7 @@ turn almost any device into a file server with resumable uploads/downloads using
   * [themes](#themes)
   * [complete examples](#complete-examples)
 * [reverse-proxy](#reverse-proxy) - running copyparty next to other websites
+  * [prometheus](#prometheus) - metrics/stats can be enabled
 * [packages](#packages) - the party might be closer than you think
   * [arch package](#arch-package) - now [available on aur](https://aur.archlinux.org/packages/copyparty) maintained by [@icxes](https://github.com/icxes)
   * [fedora package](#fedora-package) - now [available on copr-pypi](https://copr.fedorainfracloud.org/coprs/g/copr/PyPI/)
@@ -1008,6 +1009,9 @@ you can also set transaction limits which apply per-IP and per-volume, but these
 * `:c,maxn=250,3600` allows 250 files over 1 hour from each IP (tracked per-volume)
 * `:c,maxb=1g,300` allows 1 GiB total over 5 minutes from each IP (tracked per-volume)
 
+notes:
+* `vmaxb` and `vmaxn` require either the `e2ds` volflag or the `-e2dsa` global-option
+
 
 ## compress uploads
@@ -1238,6 +1242,51 @@ example webserver configs:
 
 * [apache2 config](contrib/apache/copyparty.conf) -- location-based
 
 
+## prometheus
+
+metrics/stats can be enabled at `/.cpr/s/metrics` for grafana / prometheus / etc. 
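+
+the reply is in the standard prometheus text exposition format; a quick way to eyeball it is to query the endpoint manually -- a hypothetical example (made-up values, and assuming the example account `ed:wark` from below, passing the password as the `?pw=` url-parameter):
+
+```bash
+$ curl 'http://127.0.0.1:3923/.cpr/s/metrics?pw=wark'
+# HELP cpp_uptime time since last server restart
+# TYPE cpp_uptime counter
+cpp_uptime 1234.567
+# HELP cpp_bans number of banned IPs
+# TYPE cpp_bans counter
+cpp_bans 0
+```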
+
+must be enabled with `--stats`; it is off by default since collecting the stats adds a tiny bit of startup time
+
+the endpoint is only accessible to `admin` accounts, meaning the `a` in `rwmda` of the following example commandline: `python3 -m copyparty -a ed:wark -v /mnt/nas::rwmda,ed`
+
+follow a guide for setting up `node_exporter` except have it read from copyparty instead; example `/etc/prometheus/prometheus.yml` below
+
+```yaml
+scrape_configs:
+  - job_name: copyparty
+    metrics_path: /.cpr/s/metrics
+    basic_auth:
+      password: wark
+    static_configs:
+      - targets: ['192.168.123.1:3923']
+```
+
+currently the following metrics are available:
+* `cpp_uptime` in seconds
+* `cpp_bans` number of banned IPs
+
+and these are available per-volume only:
+* `cpp_disk_size` total HDD size in MiB
+* `cpp_disk_free` free HDD space in MiB
+* `cpp_vol_mib_free` remaining space (MiB) in volumes with a `vmaxb` limit
+* `cpp_vol_files_free` remaining number of files in volumes with a `vmaxn` limit
+
+and these are per-volume and `total`:
+* `cpp_vol_mib` size of all files in MiB
+* `cpp_vol_files` number of files
+* `cpp_dupe_mib` amount of MiB presumably saved by deduplication
+* `cpp_dupe_files` number of dupe files
+* `cpp_unf_mib` currently unfinished / incoming uploads in MiB
+* `cpp_unf_files` number of unfinished / incoming uploads
+
+some of the metrics have additional requirements to function correctly:
+* `cpp_vol_mib` and `cpp_vol_files` (and their `*_free` variants) require either the `e2ds` volflag or the `-e2dsa` global-option
+
+the following options are available to disable some of the metrics:
+* `--nos-hdd` disables `cpp_disk_*` which can prevent spinning up HDDs
+* `--nos-vol` disables `cpp_vol_*` which reduces server startup time
+* `--nos-dup` disables `cpp_dupe_*` which reduces the server load caused by prometheus queries
+* `--nos-unf` disables `cpp_unf_*` for no particular purpose
+
+
 # packages
 
 the party might be closer than you think
diff --git a/copyparty/__main__.py b/copyparty/__main__.py
index 4714fbe7..68536e7c 100755
--- a/copyparty/__main__.py
+++ b/copyparty/__main__.py
@@ -727,8 +727,11 @@ def get_sects():
 
             things to check if it does not work at all:
 
             * is there a firewall blocking port 5353 on either the server or client?
+              (for example, clients may be able to send queries to copyparty,
+              but the replies could get lost)
 
             * is multicast accidentally disabled on either the server or client? 
+ (look for mDNS log messages saying "new client on [...]") * the router/switch must be multicast and igmp capable @@ -959,6 +962,15 @@ def add_hooks(ap): ap2.add_argument("--xban", metavar="CMD", type=u, action="append", help="execute CMD if someone gets banned (pw/404)") +def add_stats(ap): + ap2 = ap.add_argument_group('grafana/prometheus metrics endpoint') + ap2.add_argument("--stats", action="store_true", help="enable stats at /.cpr/s/metrics for admin accounts") + ap2.add_argument("--nos-hdd", action="store_true", help="disable disk-space metrics (used/free space)") + ap2.add_argument("--nos-vol", action="store_true", help="disable volume size metrics (num files, total bytes, vmaxb/vmaxn)") + ap2.add_argument("--nos-dup", action="store_true", help="disable dupe-files metrics (good idea; very slow)") + ap2.add_argument("--nos-unf", action="store_true", help="disable unfinished-uploads metrics") + + def add_yolo(ap): ap2 = ap.add_argument_group('yolo options') ap2.add_argument("--allow-csrf", action="store_true", help="disable csrf protections; let other domains/sites impersonate you through cross-site requests") @@ -1208,6 +1220,7 @@ def run_argparse( add_yolo(ap) add_handlers(ap) add_hooks(ap) + add_stats(ap) add_ui(ap, retry) add_admin(ap) add_logging(ap) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index ed637cd3..7068d8e1 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -822,6 +822,9 @@ def handle_get(self) -> bool: self.reply(b"", 301, headers=h) return True + if self.vpath == ".cpr/s/metrics": + return self.conn.hsrv.metrics.tx(self) + path_base = os.path.join(self.E.mod, "web") static_path = absreal(os.path.join(path_base, self.vpath[5:])) if static_path in self.conn.hsrv.statics: diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 539238e3..ca9430d4 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -56,6 +56,7 @@ sys.exit(1) from .httpconn import HttpConn +from .metrics import Metrics from .u2idx import U2idx from .util import ( E_SCK, @@ -99,6 +100,7 @@ def __init__(self, broker: "BrokerCli", nid: Optional[int]) -> None: # redefine in case of multiprocessing socket.setdefaulttimeout(120) + self.t0 = time.time() nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else "" self.magician = Magician() self.nm = NetMap([], {}) @@ -122,6 +124,7 @@ def __init__(self, broker: "BrokerCli", nid: Optional[int]) -> None: self.t_periodic: Optional[threading.Thread] = None self.u2fh = FHC() + self.metrics = Metrics(self) self.srvs: list[socket.socket] = [] self.ncli = 0 # exact self.clients: set[HttpConn] = set() # laggy diff --git a/copyparty/metrics.py b/copyparty/metrics.py new file mode 100644 index 00000000..b1672bd7 --- /dev/null +++ b/copyparty/metrics.py @@ -0,0 +1,163 @@ +# coding: utf-8 +from __future__ import print_function, unicode_literals + +import json +import time + +from .__init__ import TYPE_CHECKING +from .util import Pebkac, get_df, unhumanize + +if TYPE_CHECKING: + from .httpcli import HttpCli + from .httpsrv import HttpSrv + + +class Metrics(object): + def __init__(self, hsrv: "HttpSrv") -> None: + self.hsrv = hsrv + + def tx(self, cli: "HttpCli") -> bool: + if not cli.avol: + raise Pebkac(403, "not allowed for user " + cli.uname) + + args = cli.args + if not args.stats: + raise Pebkac(403, "the stats feature is not enabled in server config") + + conn = cli.conn + vfs = conn.asrv.vfs + allvols = list(sorted(vfs.all_vols.items())) + + idx = conn.get_u2idx() + if not idx or not hasattr(idx, "p_end"): + idx = None + + 
ret: list[str] = []
+
+        def add(name: str, typ: str, v: str, desc: str) -> None:
+            zs = "# HELP %s %s\n# TYPE %s %s\n%s %s"
+            ret.append(zs % (name, desc, name, typ, name, v))
+
+        def addh(name: str, typ: str, desc: str) -> None:
+            zs = "# HELP %s %s\n# TYPE %s %s"
+            ret.append(zs % (name, desc, name, typ))
+
+        def addv(name: str, v: str) -> None:
+            ret.append("%s %s" % (name, v))
+
+        v = "{:.3f}".format(time.time() - self.hsrv.t0)
+        add("cpp_uptime", "counter", v, "time since last server restart")
+
+        v = str(len(conn.bans or []))
+        add("cpp_bans", "counter", v, "number of banned IPs")
+
+        if not args.nos_hdd:
+            addh("cpp_disk_size", "gauge", "total HDD size (MiB) of volume")
+            addh("cpp_disk_free", "gauge", "free HDD space (MiB) in volume")
+            for vpath, vol in allvols:
+                free, total = get_df(vol.realpath)
+
+                v = "{:.3f}".format(total / 1048576.0)
+                addv('cpp_disk_size{vol="/%s"}' % (vpath), v)
+
+                v = "{:.3f}".format(free / 1048576.0)
+                addv('cpp_disk_free{vol="/%s"}' % (vpath), v)
+
+        if idx and not args.nos_vol:
+            addh("cpp_vol_mib", "gauge", "total MiB in volume")
+            addh("cpp_vol_files", "gauge", "total num files in volume")
+            addh("cpp_vol_mib_free", "gauge", "free space (vmaxb) in volume")
+            addh("cpp_vol_files_free", "gauge", "free space (vmaxn) in volume")
+            tnbytes = 0
+            tnfiles = 0
+
+            volsizes = []
+            try:
+                ptops = [x.realpath for _, x in allvols]
+                x = self.hsrv.broker.ask("up2k.get_volsizes", ptops)
+                volsizes = x.get()
+            except Exception as ex:
+                cli.log("tx_stats get_volsizes: {!r}".format(ex), 3)
+
+            for (vpath, vol), (nbytes, nfiles) in zip(allvols, volsizes):
+                tnbytes += nbytes
+                tnfiles += nfiles
+                v = "{:.3f}".format(nbytes / 1048576.0)
+                addv('cpp_vol_mib{vol="/%s"}' % (vpath), v)
+                addv('cpp_vol_files{vol="/%s"}' % (vpath), str(nfiles))
+
+                if vol.flags.get("vmaxb") or vol.flags.get("vmaxn"):
+
+                    zi = unhumanize(vol.flags.get("vmaxb") or "0")
+                    if zi:
+                        v = "{:.3f}".format((zi - nbytes) / 1048576.0)
+                        addv('cpp_vol_mib_free{vol="/%s"}' % (vpath), v)
+
+                    zi = unhumanize(vol.flags.get("vmaxn") or "0")
+                    if zi:
+                        v = str(zi - nfiles)
+                        addv('cpp_vol_files_free{vol="/%s"}' % (vpath), v)
+
+            if volsizes:
+                v = "{:.3f}".format(tnbytes / 1048576.0)
+                addv('cpp_vol_mib{vol="total"}', v)
+                addv('cpp_vol_files{vol="total"}', str(tnfiles))
+
+        if idx and not args.nos_dup:
+            addh("cpp_dupe_mib", "gauge", "num dupe MiB in volume")
+            addh("cpp_dupe_files", "gauge", "num dupe files in volume")
+            tnbytes = 0
+            tnfiles = 0
+            for vpath, vol in allvols:
+                cur = idx.get_cur(vol.realpath)
+                if not cur:
+                    continue
+
+                nbytes = 0
+                nfiles = 0
+                # rows sharing a wark (file hash) are dupes; c = number of extra copies
+                q = "select sz, count(*)-1 c from up group by w having c"
+                for sz, c in cur.execute(q):
+                    nbytes += sz * c
+                    nfiles += c
+
+                tnbytes += nbytes
+                tnfiles += nfiles
+                v = "{:.3f}".format(nbytes / 1048576.0)
+                addv('cpp_dupe_mib{vol="/%s"}' % (vpath), v)
+                addv('cpp_dupe_files{vol="/%s"}' % (vpath), str(nfiles))
+
+            v = "{:.3f}".format(tnbytes / 1048576.0)
+            addv('cpp_dupe_mib{vol="total"}', v)
+            addv('cpp_dupe_files{vol="total"}', str(tnfiles))
+
+        if not args.nos_unf:
+            addh("cpp_unf_mib", "gauge", "incoming/unfinished uploads (MiB)")
+            addh("cpp_unf_files", "gauge", "incoming/unfinished uploads (num files)")
+            tnbytes = 0
+            tnfiles = 0
+            try:
+                x = self.hsrv.broker.ask("up2k.get_unfinished")
+                xs = x.get()
+                xj = json.loads(xs)
+                for ptop, (nbytes, nfiles) in xj.items():
+                    tnbytes += nbytes
+                    tnfiles += nfiles
+                    vol = next((x[1] for x in allvols if x[1].realpath == ptop), None)
+                    if not vol:
+                        t = "tx_stats get_unfinished: could not map {}"
+                        
cli.log(t.format(ptop), 3)
+                        continue
+
+                    v = "{:.3f}".format(nbytes / 1048576.0)
+                    addv('cpp_unf_mib{vol="/%s"}' % (vol.vpath), v)
+                    addv('cpp_unf_files{vol="/%s"}' % (vol.vpath), str(nfiles))
+
+                v = "{:.3f}".format(tnbytes / 1048576.0)
+                addv('cpp_unf_mib{vol="total"}', v)
+                addv('cpp_unf_files{vol="total"}', str(tnfiles))
+
+            except Exception as ex:
+                cli.log("tx_stats get_unfinished: {!r}".format(ex), 3)
+
+        cli.reply("\n".join(ret).encode("utf-8"), mime="text/plain")
+        return True
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index cccc521a..78e9dbc1 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -267,11 +267,49 @@ def get_state(self) -> str:
         }
         return json.dumps(ret, indent=4)
 
+    def get_unfinished(self) -> str:
+        if PY2 or not self.mutex.acquire(timeout=0.5):
+            return "{}"
+
+        ret: dict[str, tuple[int, int]] = {}
+        try:
+            for ptop, tab2 in self.registry.items():
+                nbytes = 0
+                nfiles = 0
+                drp = self.droppable.get(ptop, {})
+                for wark, job in tab2.items():
+                    if wark in drp:
+                        continue
+
+                    nfiles += 1
+                    try:
+                        # close enough on average: missing chunks / total chunks, times filesize
+                        nbytes += len(job["need"]) * job["size"] // len(job["hash"])
+                    except:
+                        pass
+
+                ret[ptop] = (nbytes, nfiles)
+        finally:
+            self.mutex.release()
+
+        return json.dumps(ret, indent=4)
+
     def get_volsize(self, ptop: str) -> tuple[int, int]:
         with self.mutex:
             return self._get_volsize(ptop)
 
+    def get_volsizes(self, ptops: list[str]) -> list[tuple[int, int]]:
+        ret = []
+        with self.mutex:
+            for ptop in ptops:
+                ret.append(self._get_volsize(ptop))
+
+        return ret
+
     def _get_volsize(self, ptop: str) -> tuple[int, int]:
+        if "e2ds" not in self.flags.get(ptop, {}):
+            return (0, 0)
+
         cur = self.cur[ptop]
         nbytes = self.volsize[cur]
         nfiles = self.volnfiles[cur]
@@ -946,7 +984,11 @@ def _build_file_index(self, vol: VFS, all_vols: list[VFS]) -> tuple[bool, bool]:
 
             db.c.connection.commit()
 
-            if vol.flags.get("vmaxb") or vol.flags.get("vmaxn"):
+            if (
+                vol.flags.get("vmaxb")
+                or vol.flags.get("vmaxn")
+                or (self.args.stats and not self.args.nos_vol)
+            ):
                 zs = "select count(sz), sum(sz) from up"
                 vn, vb = db.c.execute(zs).fetchone()
                 vb = vb or 0
diff --git a/scripts/sfx.ls b/scripts/sfx.ls
index a2cfe485..8f522a2e 100644
--- a/scripts/sfx.ls
+++ b/scripts/sfx.ls
@@ -21,6 +21,7 @@ copyparty/httpconn.py,
 copyparty/httpsrv.py,
 copyparty/ico.py,
 copyparty/mdns.py,
+copyparty/metrics.py,
 copyparty/mtag.py,
 copyparty/multicast.py,
 copyparty/pwhash.py,
diff --git a/tests/test_httpcli.py b/tests/test_httpcli.py
index 8e57a1db..54a3c470 100644
--- a/tests/test_httpcli.py
+++ b/tests/test_httpcli.py
@@ -12,7 +12,7 @@
 import unittest
 
 from tests import util as tu
-from tests.util import Cfg
+from tests.util import Cfg, eprint
 
 from copyparty.authsrv import AuthSrv
 from copyparty.httpcli import HttpCli
@@ -93,7 +93,7 @@ def test(self):
             res = "ok " + fp in ret
             print("[{}] {} {} = {}".format(fp, rok, wok, res))
             if rok != res:
-                print("\033[33m{}\n# {}\033[0m".format(ret, furl))
+                eprint("\033[33m{}\n# {}\033[0m".format(ret, furl))
                 self.fail()
 
             # file browser: html
@@ -101,7 +101,7 @@ def test(self):
             res = "'{}'".format(self.fn) in ret
             print(res)
             if rok != res:
-                print("\033[33m{}\n# {}\033[0m".format(ret, durl))
+                eprint("\033[33m{}\n# {}\033[0m".format(ret, durl))
                 self.fail()
 
             # file browser: json
@@ -110,7 +110,7 @@ def test(self):
             res = '"{}"'.format(self.fn) in ret
             print(res)
             if rok != res:
-                print("\033[33m{}\n# {}\033[0m".format(ret, url))
+                eprint("\033[33m{}\n# {}\033[0m".format(ret, url))
                 self.fail()
 
             # tar
@@ -132,7 +132,9 @@ def test(self):
             if durl.split("/")[-1] 
in self.can_read: ref = [x for x in vfiles if self.in_dive(top + "/" + durl, x)] for f in ref: - print("{}: {}".format("ok" if f in tar_ok else "NG", f)) + ok = f in tar_ok + pr = print if ok else eprint + pr("{}: {}".format("ok" if ok else "NG", f)) ref.sort() tar_ok.sort() self.assertEqual(ref, tar_ok) diff --git a/tests/util.py b/tests/util.py index 9b001ed9..5cbb35d7 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,3 +1,7 @@ +#!/usr/bin/env python3 +# coding: utf-8 +from __future__ import print_function, unicode_literals + import os import re import sys @@ -23,6 +27,12 @@ def nah(*a, **ka): return False +def eprint(*a, **ka): + ka["file"] = sys.stderr + print(*a, **ka) + sys.stderr.flush() + + if MACOS: import posixpath @@ -114,7 +124,7 @@ def __init__(self, a=None, v=None, c=None): ex = "df loris re_maxage rproxy rsp_jtr rsp_slp s_wr_slp theme themes turbo" ka.update(**{k: 0 for k in ex.split()}) - ex = "ah_alg doctitle favico html_head lg_sbf log_fk md_sbf mth name textfiles unlist vname R RS SR" + ex = "ah_alg bname doctitle favico html_head lg_sbf log_fk md_sbf mth name textfiles unlist vname R RS SR" ka.update(**{k: "" for k in ex.split()}) ex = "on403 on404 xad xar xau xban xbd xbr xbu xiu xm"