Skip to content

Commit

Permalink
Merge pull request #5 from lsst-sqre/tickets/DM-35232
Browse files Browse the repository at this point in the history
DM-35232: Externalize and improve Flux templates.
  • Loading branch information
athornton authored Jun 17, 2022
2 parents 20d0d58 + ed5dd64 commit b2bbd9c
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 93 deletions.
3 changes: 2 additions & 1 deletion src/rubin_influx_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from .bucketmapper import BucketMapper
from .influxclient import InfluxClient
from .influxfns import seconds_to_duration_literal
from .influxfns import get_template, seconds_to_duration_literal
from .restartmapper import RestartMapper
from .tokenmaker import TokenMaker

__all__ = [
"InfluxClient",
"seconds_to_duration_literal",
"get_template",
"BucketMapper",
"TokenMaker",
"RestartMapper",
Expand Down
10 changes: 9 additions & 1 deletion src/rubin_influx_tools/influxclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,15 @@ async def list_all_with_offset(
offset += len(i_list)
return o_list

async def set_org_id(self) -> None:
obj = await self.get(
self.api_url + "/buckets", params={"pagesize": "1"}
)
if not obj or "buckets" not in obj or not obj["buckets"]:
raise RuntimeError("Could not get orgID from bucket")
self.org_id = obj["buckets"][0]["orgID"]

async def main(self) -> None:
# Override this in a subclass to provide "the thing that the class
# should usually do when invoked."
pass
await self.set_org_id()
17 changes: 17 additions & 0 deletions src/rubin_influx_tools/influxfns.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from datetime import timedelta
from os.path import dirname, join

from jinja2 import Template


def seconds_to_duration_literal(seconds: int) -> str:
Expand Down Expand Up @@ -30,3 +33,17 @@ def seconds_to_duration_literal(seconds: int) -> str:
if secs:
dls += f"{secs}s"
return dls


def get_template(
name: str,
template_marker: str = "_tmpl.flux",
dir: str = join(dirname(__file__), "templates"),
) -> Template:
"""Convenience function for retrieving a Jinja2 Template from a static
templated query.
"""
fn = join(dir, name + template_marker)
with open(fn) as f:
txt = f.read()
return Template(txt) # the shared default environment is fine
44 changes: 17 additions & 27 deletions src/rubin_influx_tools/restartmapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, List

from .influxclient import InfluxClient
from .influxfns import get_template, seconds_to_duration_literal
from .influxtypes import (
BucketGet,
BucketPost,
Expand All @@ -12,37 +13,18 @@
TaskGet,
TaskPost,
)
from .tasktemplates import check_text, task_template


class RestartMapper(InfluxClient):
"""The logic needed to build alerts and tasks for applications."""

async def set_org_id(self) -> None:
"""Set the Organization ID, given the Organization name.
This doesn't actually work, even if you have r/w on orgs--you just
get an empty list back.
"""
url = f"{self.api_url}/orgs"
obj = await self.get(url, use_session_params=False)
orgs = obj["orgs"]
for o in orgs:
if o["name"] == self.org:
self.org_id = o["id"]
return
raise RuntimeError(f"Could not determine orgID for org {self.org}")

async def list_buckets(self) -> List[BucketGet]:
"""List all buckets."""
itemtype = "buckets"
url = f"{self.api_url}/{itemtype}"
b_list = await self.list_all(url, itemtype)
buckets = [BucketGet(**x) for x in b_list]
self.log.debug(f"Buckets -> {[x.name for x in buckets]}")
if buckets:
# This is cheesy, but the orgs permissions don't work.
self.org_id = buckets[0].orgID
return buckets

async def find_application_buckets(
Expand All @@ -58,15 +40,15 @@ async def find_application_buckets(
self.app_names = [x.name for x in app_buckets]
return app_buckets

async def construct_restarts_bucket(self) -> None:
async def construct_multiapp_bucket(self) -> None:
buckets = await self.list_buckets()
if not buckets:
return
await self.find_application_buckets(buckets)
bnames = [x.name for x in buckets]
if "restarts_" in bnames:
if "multiapp_" in bnames:
return
self.log.debug("Constructing restarts_ bucket")
self.log.debug("Constructing multiapp_ bucket")
url = f"{self.api_url}/buckets"
rr: List[RetentionRule] = [
{
Expand All @@ -76,8 +58,8 @@ async def construct_restarts_bucket(self) -> None:
}
]
bkt = BucketPost(
description="K8s apps restart tracking bucket",
name="restarts_",
description="K8s multiple apps tracking bucket",
name="multiapp_",
orgID=self.org_id,
retentionRules=rr,
)
Expand All @@ -101,8 +83,12 @@ async def construct_tasks(self) -> None:

def build_tasks(self, apps: List[str]) -> List[TaskPost]:
"""Create a list of task objects to post."""
task_template = get_template("restart")
tasks = []
offset = 0
for app in apps:
offset %= 60
offsetstr = seconds_to_duration_literal(offset)
taskname = f"{app.capitalize()} restarts"
tasks.append(
TaskPost(
Expand All @@ -111,10 +97,11 @@ def build_tasks(self, apps: List[str]) -> List[TaskPost]:
orgID=self.org_id,
status="active",
flux=task_template.render(
taskname=taskname, app_bucket=app
taskname=taskname, app_bucket=app, offset=offsetstr
),
)
)
offset += 1
return tasks

async def list_tasks(self) -> List[TaskGet]:
Expand Down Expand Up @@ -147,7 +134,9 @@ async def construct_check(self) -> List[Any]:
return resp

async def create_check(self, cname: str) -> CheckPost:
dq = DashboardQuery(name=cname, text=check_text)
check_template = get_template("check")
check_flux = check_template.render()
dq = DashboardQuery(name=cname, text=check_flux)
ck = CheckPost(
description=cname,
every="1m",
Expand All @@ -157,7 +146,8 @@ async def create_check(self, cname: str) -> CheckPost:
return ck

async def main(self) -> None:
await self.construct_restarts_bucket()
await self.set_org_id()
await self.construct_multiapp_bucket()
await self.construct_tasks()
# At the moment, building the check isn't working.
# However the manual check/alerts are fine.
Expand Down
52 changes: 0 additions & 52 deletions src/rubin_influx_tools/tasktemplates.py

This file was deleted.

14 changes: 14 additions & 0 deletions src/rubin_influx_tools/templates/check_tmpl.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"

data =
from(bucket: "multiapp_")
|> range(start: -1m)
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
|> filter(fn: (r) => r["_field"] == "differential_restarts")
|> aggregateWindow(every: 1m, fn: last, createEmpty: false)

crit = (r) => r["restarts_total"] != 0.0
messageFn = (r) => "Check: ${r._check_name}: ${r._value} restarts for ${r.cluster}/${r.container_name}/${r.pod_name}"

data |> v1["fieldsAsCols"]() |> monitor["check"](data: check, messageFn: messageFn, crit: crit)
30 changes: 30 additions & 0 deletions src/rubin_influx_tools/templates/restart_tmpl.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
option v = {bucket: "_monitoring", timeRangeStart: -1h, timeRangeStop: now(), windowPeriod: 10000ms}

option task = {name: "{{taskname}}", every: 1m, offset: {{offset}}}

from(bucket: "{{app_bucket}}")
|> range(start: -1m)
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
|> filter(fn: (r) => r["_field"] == "restarts_total")
|> drop(columns: ["_start",
"_stop",
"host",
"namespace"])
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> tail(n: 2)
|> difference(columns: ["_value"])
|> map(fn: (r) => ({
_time: r._time,
_measurement: r._measurement,
_field: "differential_restarts",
_value: r._value,
cluster: r.cluster,
container_name: r.container_name,
phase: r.phase,
pod_name: r.pod_name,
readiness: r.readiness,
state: r.state
})
)
|> yield(name: "mean")
|> to(bucket: "multiapp_", org: "square")
12 changes: 0 additions & 12 deletions src/rubin_influx_tools/tokenmaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,6 @@


class TokenMaker(InfluxClient):
async def set_org_id(self) -> None:
"""Set the org id; requires org read permission in token"""
url = f"{self.api_url}/orgs"
obj = await self.get(url)
orgs = obj["orgs"]
for o in orgs:
if o["name"] == self.org:
self.org_id = o["id"]
self.log.debug(f"Found OrgID: {self.org_id}")
return
raise RuntimeError(f"Could not determine orgID for org {self.org}")

def define_token(self) -> TokenPost:
perms: List[Permission] = []
categories = (
Expand Down
31 changes: 31 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[tox]
envlist = py,coverage-report,typing,lint
isolated_build = True

[testenv]
description = Run pytest against {envname}.
deps =
-r{toxinidir}/requirements/main.txt
-r{toxinidir}/requirements/dev.txt
commands =
pytest -vvv --cov=rubin_influx_tools --cov-branch --cov-report= {posargs}

[testenv:coverage-report]
description = Compile coverage from each test run.
skip_install = true
deps = coverage[toml]>=5.0.2
depends =
py
commands = coverage report

[testenv:typing]
description = Run mypy.
commands =
mypy src/rubin_influx_tools tests setup.py

[testenv:lint]
description = Lint codebase by running pre-commit (Black, isort, Flake8).
skip_install = true
deps =
pre-commit
commands = pre-commit run --all-files

0 comments on commit b2bbd9c

Please sign in to comment.