Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add expiration to remove stale metrics #61

Merged
merged 2 commits into from
Mar 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion exampleconf/metric_example.yaml
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ metrics:
- name: 'mqtt_example' # Required(unique, if multiple, only last entry is kept)
help: 'MQTT example gauge' # Required
type: 'gauge' # Required ('gauge', 'counter', 'summary' or 'histogram')
expires: 60 # Optional time in seconds after last update to remove metric
#parameters: # Optional parameters for certain metrics
# buckets: # Optional (Passed as 'buckets' argument to Histogram)
# - .1
@@ -24,4 +25,4 @@ metrics:
regex: '(.*)' # Optional default '(.*)'
target_label: '__topic__' # Required (when label_configs is present and 'action' = 'replace')
replacement: '\1' # Optional default '\1'
action: 'replace' # Optional default 'replace'
action: 'replace' # Optional default 'replace'
56 changes: 45 additions & 11 deletions mqtt_exporter.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
from yamlreader import yaml_load
import utils.prometheus_additions
import version
import threading

VERSION = version.__version__
SUFFIXES_PER_TYPE = {
@@ -27,6 +28,7 @@
"histogram": ['sum', 'count', 'bucket'],
"enum": [],
}
METRICS_LOCK = threading.Semaphore()


def _read_config(config_path):
@@ -308,16 +310,33 @@ def _update_metrics(metrics, msg):
_export_to_prometheus(
derived_metric['name'], derived_metric, derived_labels)

if metric.get('expires'):
if metric.get('expiration_timer'):
metric.get('expiration_timer').cancel()
logging.debug(f"_update_metric Canceled existing timer for {metric.get('name')}")

metric['expiration_timer'] = threading.Timer(metric.get('expires'), _clear_metric, args=(metric, derived_metric))
metric['expiration_timer'].start()
logging.debug(f"_update_metric Set a {metric.get('expires')} second expiration timer for {metric.get('name')}")


def _clear_metric(metric, derived_metric):
with METRICS_LOCK:
metric['prometheus_metric']['parent'].clear()
derived_metric['prometheus_metric']['parent'].clear()
logging.debug(f"_clear_metric cleared metric {metric.get('name')}")


# noinspection PyUnusedLocal
def _on_message(client, userdata, msg):
"""The callback for when a PUBLISH message is received from the server."""
logging.debug(
f'_on_message Msg received on topic: {msg.topic}, Value: {str(msg.payload)}')
with METRICS_LOCK:
"""The callback for when a PUBLISH message is received from the server."""
logging.debug(
f'_on_message Msg received on topic: {msg.topic}, Value: {str(msg.payload)}')

for topic in userdata.keys():
if _topic_matches(topic, msg.topic):
_update_metrics(userdata[topic], msg)
for topic in userdata.keys():
if _topic_matches(topic, msg.topic):
_update_metrics(userdata[topic], msg)


def _mqtt_init(mqtt_config, metrics):
@@ -358,7 +377,7 @@ def _export_to_prometheus(name, metric, labels):
valid_types = metric_wrappers.keys()
if metric['type'] not in valid_types:
logging.error(
f"Metric type: {metric['type']}, is not a valid metric type. Must be one of: {valid_types} - ingnoring"
f"Metric type: {metric['type']}, is not a valid metric type. Must be one of: {valid_types} - ignoring"
)
return

@@ -430,6 +449,10 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
self.metric = prometheus.Gauge(
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.set(value)
@@ -446,15 +469,14 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.inc(value)
return child

class HistogramWrapper():
"""
Wrapper to provide generic interface to Summary metric
"""
Comment on lines -454 to -457
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HistogramWrapper is defined on line 519 below.


class CounterAbsoluteWrapper():
"""
@@ -466,6 +488,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.set(value)
@@ -482,6 +507,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.observe(value)
@@ -505,6 +533,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names), **params
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.observe(value)
@@ -521,6 +552,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names), **params
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.state(value)