Skip to content

Commit

Permalink
Merge pull request #7 from lsst-sqre/tickets/DM-35232C
Browse files Browse the repository at this point in the history
Add slack notifications and rename restartmapper to taskmaker
  • Loading branch information
athornton authored Jun 22, 2022
2 parents b2bbd9c + f9993bf commit 0aab009
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 72 deletions.
6 changes: 3 additions & 3 deletions requirements/main.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ attrs==21.4.0 \
--hash=sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4 \
--hash=sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd
# via aiohttp
charset-normalizer==2.0.12 \
--hash=sha256:2857e29ff0d34db842cd7ca3230549d1a697f96ee6d3fb071cfa6c7393832597 \
--hash=sha256:6881edbebdb17b39b4eaaa821b438bf6eddffb4468cf344f09f89def34a8b1df
charset-normalizer==2.1.0 \
--hash=sha256:5189b6f22b01957427f35b6a08d9a0bc45b46d3788ef5a92e978433c7a35f8a5 \
--hash=sha256:575e708016ff3a5e3681541cb9d79312c416835686d054a23accb873b254f413
# via aiohttp
frozenlist==1.3.0 \
--hash=sha256:006d3595e7d4108a12025ddf415ae0f6c9e736e726a5db0183326fd191b14c5e \
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ where = src
[options.entry_points]
console_scripts =
bucketmapper = rubin_influx_tools.cli:main
restartmapper = rubin_influx_tools.cli:main
taskmaker = rubin_influx_tools.cli:main
tokenmaker = rubin_influx_tools.cli:main

[flake8]
Expand Down
4 changes: 2 additions & 2 deletions src/rubin_influx_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .bucketmapper import BucketMapper
from .influxclient import InfluxClient
from .influxfns import get_template, seconds_to_duration_literal
from .restartmapper import RestartMapper
from .taskmaker import TaskMaker
from .tokenmaker import TokenMaker

__all__ = [
Expand All @@ -10,5 +10,5 @@
"get_template",
"BucketMapper",
"TokenMaker",
"RestartMapper",
"TaskMaker",
]
14 changes: 7 additions & 7 deletions src/rubin_influx_tools/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from os.path import basename
from typing import Dict

from . import BucketMapper, InfluxClient, RestartMapper, TokenMaker
from . import BucketMapper, InfluxClient, TaskMaker, TokenMaker

KLASSMAP: Dict[str, type[InfluxClient]] = {
"bucketmapper": BucketMapper,
"tokenmaker": TokenMaker,
"restartmapper": RestartMapper,
"taskmaker": TaskMaker,
}


Expand All @@ -34,13 +34,13 @@ async def bucketmapper() -> None:


async def tokenmaker() -> None:
async with TokenMaker() as bm:
await bm.main()
async with TokenMaker() as tm:
await tm.main()


async def restartmapper() -> None:
async with RestartMapper() as bm:
await bm.main()
async def taskmaker() -> None:
async with TaskMaker() as tm:
await tm.main()


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion src/rubin_influx_tools/influxfns.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ def seconds_to_duration_literal(seconds: int) -> str:

def get_template(
name: str,
prefix: str = "",
template_marker: str = "_tmpl.flux",
dir: str = join(dirname(__file__), "templates"),
) -> Template:
"""Convenience function for retrieving a Jinja2 Template from a static
templated query.
"""
fn = join(dir, name + template_marker)
fn = join(dir, prefix + name + template_marker)
with open(fn) as f:
txt = f.read()
return Template(txt) # the shared default environment is fine
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
#!/usr/bin/env python3
from dataclasses import asdict
from typing import Any, List
from typing import List

from .influxclient import InfluxClient
from .influxfns import get_template, seconds_to_duration_literal
from .influxtypes import (
BucketGet,
BucketPost,
CheckPost,
DashboardQuery,
RetentionRule,
TaskGet,
TaskPost,
)

slack_timing = {
"restart": {"every": "1m", "offset": "30s"},
"memory_check": {"every": "5m", "offset": "43s"},
}

class RestartMapper(InfluxClient):

class TaskMaker(InfluxClient):
"""The logic needed to build alerts and tasks for applications."""

async def list_buckets(self) -> List[BucketGet]:
Expand Down Expand Up @@ -68,36 +71,63 @@ async def construct_multiapp_bucket(self) -> None:

async def construct_tasks(self) -> None:
extant_tasks = await self.list_tasks()
extant_tnames = [x.name for x in extant_tasks]
self._extant_tnames = [x.name for x in extant_tasks]

for ttype in "restart", "memory_check":
await self.construct_named_tasks(ttype)
await self.construct_slack_task(ttype)

async def construct_named_tasks(self, ttype: str) -> None:
apps_needing_tasks = [
x
for x in self.app_names
if f"{x.capitalize()} restarts" not in extant_tnames
if f"{x.capitalize()} {ttype}s" not in self._extant_tnames
]
self.log.debug(f"Apps needing tasks -> {apps_needing_tasks}")
self.log.debug(f"Apps needing {ttype} tasks -> {apps_needing_tasks}")

new_tasks = self.build_tasks(apps_needing_tasks)
new_tasks = await self.build_tasks(apps_needing_tasks, ttype)
payloads = [asdict(x) for x in new_tasks]
url = f"{self.api_url}/tasks"
await self.post(url, payloads)

def build_tasks(self, apps: List[str]) -> List[TaskPost]:
async def construct_slack_task(self, ttype: str) -> None:
every = slack_timing[ttype]["every"]
offset = slack_timing[ttype]["offset"]
tname = f"_slack_notify_{ttype}s"
if tname in self._extant_tnames:
return
task_text = get_template(ttype, template_marker="_slack.flux")
task = TaskPost(
description=tname,
org=self.org,
orgID=self.org_id,
status="active",
flux=task_text.render(offset=offset, every=every, taskname=tname),
)
payload = [asdict(task)]
url = f"{self.api_url}/tasks"
await self.post(url, payload)

async def build_tasks(self, apps: List[str], ttype: str) -> List[TaskPost]:
"""Create a list of task objects to post."""
task_template = get_template("restart")
task_template = get_template(ttype)
tasks = []
offset = 0
for app in apps:
offset %= 60
offsetstr = seconds_to_duration_literal(offset)
taskname = f"{app.capitalize()} restarts"
taskname = f"{app.capitalize()} {ttype}s"
tasks.append(
TaskPost(
description=taskname,
org=self.org,
orgID=self.org_id,
status="active",
flux=task_template.render(
taskname=taskname, app_bucket=app, offset=offsetstr
taskname=taskname,
app_bucket=app,
offset=offsetstr,
every="1m",
),
)
)
Expand All @@ -114,37 +144,6 @@ async def list_tasks(self) -> List[TaskGet]:
self.log.debug(f"Tasks -> {[x for x in tasks]}")
return tasks

async def construct_check(self) -> List[Any]:
self.check_id = ""
cname = "K8s app restarts TEST"
itemtype = "checks"
url = f"{self.api_url}/{itemtype}"
obj_list = await self.list_all_with_offset(url, itemtype)
self.log.debug(f"Checks -> {obj_list}")
resp = []
for c in obj_list:
if c["name"] == cname:
self.check_id = c["id"]
break
if not self.check_id:
cc = await self.create_check(cname)
payloads = [asdict(cc)]
resp = await self.post(url, payloads)
# gives a 400 Bad Request right now.
return resp

async def create_check(self, cname: str) -> CheckPost:
check_template = get_template("check")
check_flux = check_template.render()
dq = DashboardQuery(name=cname, text=check_flux)
ck = CheckPost(
description=cname,
every="1m",
orgID=self.org_id,
query=dq,
)
return ck

async def main(self) -> None:
await self.set_org_id()
await self.construct_multiapp_bucket()
Expand Down
14 changes: 0 additions & 14 deletions src/rubin_influx_tools/templates/check_tmpl.flux

This file was deleted.

37 changes: 37 additions & 0 deletions src/rubin_influx_tools/templates/memory_check_slack.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import "slack"
import "influxdata/influxdb/secrets"

option v = {bucket: "_monitoring", timeRangeStart: -1h, timeRangeStop: now(), windowPeriod: 10000ms}

option task = {name: "{{taskname}}", every: {{every}}, offset: {{offset}}}

slackurl = secrets.get(key: "slack_notify_url")
toSlack = slack.endpoint(url: slackurl)

colorLevel = (v) => {
color =
if float(v: v) > 95.0 then
"danger"
else if float(v: v) >= 90.0 then
"warning"
else
"good"

return color
}

from(bucket: "multiapp_")
|> range(start: -2m)
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
|> filter(fn: (r) => r["_field"] == "mem_pct")
|> group(columns: ["_time"])
|> filter(fn: (r) => r._value > 90)
|> toSlack(
mapFn: (r) =>
({
channel: "roundtable-test-notifications",
text: "${r.cluster}/${r.container_name}/${r.pod_name} at ${r._time}: ${r._value}% of memory used",
color: colorLevel(v: r.value),
}),
)()
|> yield()
27 changes: 27 additions & 0 deletions src/rubin_influx_tools/templates/memory_check_tmpl.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
option v = {bucket: "_monitoring", timeRangeStart: -1h, timeRangeStop: now(), windowPeriod: 10000ms}

option task = {name: "{{taskname}}", every: {{every}}, offset: {{offset}} }

from(bucket: "{{app_bucket}}")
|> range(start: -15m)
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
|> filter(fn: (r) => (r["_field"] == "memory_usage_bytes" or r["_field"] == "resource_limits_memory_bytes"))
|> group(columns: ["_measurement", "_field", "container_name", "pod_name", "cluster"])
|> aggregateWindow(every: 30s, fn: mean, createEmpty: false)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({
_time: r._time,
_measurement: r._measurement,
_field: "mem_pct",
_value: float(v: 100.0 * ((float(v: r.memory_usage_bytes)) / (float(v: r.resource_limits_memory_bytes)))),
cluster: r.cluster,
container_name: r.container_name,
application: "{{app_bucket}}",
pod_name: r.pod_name,
resource_limits_memory_bytes: r.resource_limits_memory_bytes,
memory_usage_bytes: r.memory_usage_bytes
})
)
|> filter(fn: (r) => exists r._value)
|> yield(name: "mean")
|> to(bucket: "multiapp_", org: "square")
25 changes: 25 additions & 0 deletions src/rubin_influx_tools/templates/restart_slack.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import "slack"
import "influxdata/influxdb/secrets"

option v = {bucket: "_monitoring", timeRangeStart: -1h, timeRangeStop: now(), windowPeriod: 10000ms}

option task = {name: "{{taskname}}", every: {{every}}, offset: {{offset}}}

slackurl = secrets.get(key: "slack_notify_url")
toSlack = slack.endpoint(url: slackurl)

from(bucket: "multiapp_")
|> range(start: -2m)
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
|> filter(fn: (r) => r["_field"] == "restarts_total")
|> group(columns: ["_time"])
|> filter(fn: (r) => r._value != 0)
|> toSlack(
mapFn: (r) =>
({
channel: "roundtable-test-notifications",
text: "Restart(s) for ${r.cluster}/${r.container_name}/${r.pod_name} at ${r._time}: ${r._value}",
color: "danger",
}),
)()
|> yield()
4 changes: 3 additions & 1 deletion src/rubin_influx_tools/templates/restart_tmpl.flux
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
option v = {bucket: "_monitoring", timeRangeStart: -1h, timeRangeStop: now(), windowPeriod: 10000ms}

option task = {name: "{{taskname}}", every: 1m, offset: {{offset}}}
option task = {name: "{{taskname}}", every: {{every}}, offset: {{offset}} }

from(bucket: "{{app_bucket}}")
|> range(start: -1m)
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
|> filter(fn: (r) => r["_field"] == "restarts_total")
|> group(columns: ["_measurement", "_field", "container_name", "pod_name", "cluster"])
|> drop(columns: ["_start",
"_stop",
"host",
Expand All @@ -20,6 +21,7 @@ from(bucket: "{{app_bucket}}")
_value: r._value,
cluster: r.cluster,
container_name: r.container_name,
application: "{{app_bucket}}",
phase: r.phase,
pod_name: r.pod_name,
readiness: r.readiness,
Expand Down

0 comments on commit 0aab009

Please sign in to comment.