Misc Prometheus changes (#130)
* Remove default GC and process collectors

* Use Prometheus metrics prefix

Even though the official Prometheus documentation considers it an anti-pattern to prefix metrics with the name of the software or component, I have decided to add a prefix. I’ve found that this makes it much easier to find relevant metrics. The main disadvantage of per-component prefixes is that queries become slightly more complex if you want to query the same metric (e.g. HTTP request duration) across multiple components. This isn’t super important in our case, so I think the trade-off is acceptable. (See the short sketch below this list for the resulting metric names.)

* Use keyword args to set Prometheus metric labels

As suggested by @stchris, thanks!
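
Below is a minimal, illustrative sketch of what these changes amount to, assuming `prometheus_client` is installed. The collector and metric names match the diff below; the `stage` label value "ingest" is just a made-up example.

    from prometheus_client import (
        Counter,
        REGISTRY,
        GC_COLLECTOR,
        PROCESS_COLLECTOR,
    )

    # Drop the default GC and process collectors so the worker only
    # exposes its own metrics.
    REGISTRY.unregister(GC_COLLECTOR)
    REGISTRY.unregister(PROCESS_COLLECTOR)

    # The "servicelayer_" prefix makes the metric easy to find, at the
    # cost of slightly more complex cross-component queries.
    TASKS_STARTED = Counter(
        "servicelayer_tasks_started_total",
        "Number of tasks that a worker started processing",
        ["stage"],
    )

    # Keyword arguments make it explicit which label each value refers to.
    TASKS_STARTED.labels(stage="ingest").inc()  # "ingest" is a hypothetical stage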
tillprochaska authored Nov 21, 2023
1 parent 45f3de8 commit 5344cf5
Showing 1 changed file with 31 additions and 13 deletions.
44 changes: 31 additions & 13 deletions servicelayer/worker.py
@@ -6,7 +6,14 @@
 from banal import ensure_list
 from abc import ABC, abstractmethod
 
-from prometheus_client import start_http_server, Counter, Histogram
+from prometheus_client import (
+    start_http_server,
+    Counter,
+    Histogram,
+    REGISTRY,
+    GC_COLLECTOR,
+    PROCESS_COLLECTOR,
+)
 
 from servicelayer import settings
 from servicelayer.jobs import Stage
@@ -22,26 +29,29 @@
 INTERVAL = 2
 TASK_FETCH_RETRY = 60 / INTERVAL
 
-TASK_STARTED = Counter(
-    "task_started_total",
+REGISTRY.unregister(GC_COLLECTOR)
+REGISTRY.unregister(PROCESS_COLLECTOR)
+
+TASKS_STARTED = Counter(
+    "servicelayer_tasks_started_total",
     "Number of tasks that a worker started processing",
     ["stage"],
 )
 
-TASK_SUCCEEDED = Counter(
-    "task_succeeded_total",
+TASKS_SUCCEEDED = Counter(
+    "servicelayer_tasks_succeeded_total",
     "Number of successfully processed tasks",
     ["stage", "retries"],
 )
 
-TASK_FAILED = Counter(
-    "task_failed_total",
+TASKS_FAILED = Counter(
+    "servicelayer_tasks_failed_total",
     "Number of failed tasks",
     ["stage", "retries", "failed_permanently"],
 )
 
 TASK_DURATION = Histogram(
-    "task_duration_seconds",
+    "servicelayer_task_duration_seconds",
     "Task duration in seconds",
     ["stage"],
     # The bucket sizes are a rough guess right now, we might want to adjust
@@ -98,12 +108,12 @@ def handle_safe(self, task):
         retries = unpack_int(task.context.get("retries"))
 
         try:
-            TASK_STARTED.labels(task.stage.stage).inc()
+            TASKS_STARTED.labels(stage=task.stage.stage).inc()
             start_time = default_timer()
             self.handle(task)
             duration = max(0, default_timer() - start_time)
-            TASK_DURATION.labels(task.stage.stage).observe(duration)
-            TASK_SUCCEEDED.labels(task.stage.stage, retries).inc()
+            TASK_DURATION.labels(stage=task.stage.stage).observe(duration)
+            TASKS_SUCCEEDED.labels(stage=task.stage.stage, retries=retries).inc()
         except SystemExit as exc:
             self.exit_code = exc.code
             self.retry(task)
@@ -143,14 +153,22 @@ def retry(self, task):
             log.warning(
                 f"Queueing failed task for retry #{retry_count}/{settings.WORKER_RETRY}..."  # noqa
             )
-            TASK_FAILED.labels(task.stage.stage, retries, False).inc()
+            TASKS_FAILED.labels(
+                stage=task.stage.stage,
+                retries=retries,
+                failed_permanently=False,
+            ).inc()
             task.context["retries"] = retry_count
             task.stage.queue(task.payload, task.context)
         else:
             log.warning(
                 f"Failed task, exhausted retry count of {settings.WORKER_RETRY}"
             )
-            TASK_FAILED.labels(task.stage.stage, retries, True).inc()
+            TASKS_FAILED.labels(
+                stage=task.stage.stage,
+                retries=retries,
+                failed_permanently=True,
+            ).inc()
 
     def process(self, blocking=True, interval=INTERVAL):
         retries = 0