From 365298fb7556ce024bf635bbc2ebfc7c8d052c80 Mon Sep 17 00:00:00 2001 From: Jeremy Figgins Date: Thu, 2 Mar 2023 21:23:35 -0600 Subject: [PATCH 1/2] Add expiration to remove stale metrics --- exampleconf/metric_example.yaml | 3 +- mqtt_exporter.py | 49 ++++++++++++++++++++++++++++----- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/exampleconf/metric_example.yaml b/exampleconf/metric_example.yaml index 89164fa..cec65a6 100644 --- a/exampleconf/metric_example.yaml +++ b/exampleconf/metric_example.yaml @@ -3,6 +3,7 @@ metrics: - name: 'mqtt_example' # Required(unique, if multiple, only last entry is kept) help: 'MQTT example gauge' # Required type: 'gauge' # Required ('gauge', 'counter', 'summary' or 'histogram') + expires: 60 # Optional time in seconds after last update to remove metric #parameters: # Optional parameters for certain metrics # buckets: # Optional (Passed as 'buckets' argument to Histogram) # - .1 @@ -24,4 +25,4 @@ metrics: regex: '(.*)' # Optional default '(.*)' target_label: '__topic__' # Required (when label_configs is present and 'action' = 'replace') replacement: '\1' # Optional default '\1' - action: 'replace' # Optional default 'replace' \ No newline at end of file + action: 'replace' # Optional default 'replace' diff --git a/mqtt_exporter.py b/mqtt_exporter.py index 5f7d221..a362d8a 100755 --- a/mqtt_exporter.py +++ b/mqtt_exporter.py @@ -17,6 +17,7 @@ from yamlreader import yaml_load import utils.prometheus_additions import version +import threading VERSION = version.__version__ SUFFIXES_PER_TYPE = { @@ -27,6 +28,7 @@ "histogram": ['sum', 'count', 'bucket'], "enum": [], } +METRICS_LOCK = threading.Semaphore() def _read_config(config_path): @@ -308,16 +310,30 @@ def _update_metrics(metrics, msg): _export_to_prometheus( derived_metric['name'], derived_metric, derived_labels) + if metric.get('expires'): + if metric.get('expiration_timer'): + metric['expiration_timer'].cancel() + + metric['expiration_timer'] = threading.Timer(metric.get('expires'), _remove_metric, args=(metric, derived_metric)) + metric['expiration_timer'].start() + + +def _remove_metric(metric, derived_metric): + with METRICS_LOCK: + metric['prometheus_metric']['parent'].clear() + derived_metric['prometheus_metric']['parent'].clear() + # noinspection PyUnusedLocal def _on_message(client, userdata, msg): - """The callback for when a PUBLISH message is received from the server.""" - logging.debug( - f'_on_message Msg received on topic: {msg.topic}, Value: {str(msg.payload)}') + with METRICS_LOCK: + """The callback for when a PUBLISH message is received from the server.""" + logging.debug( + f'_on_message Msg received on topic: {msg.topic}, Value: {str(msg.payload)}') - for topic in userdata.keys(): - if _topic_matches(topic, msg.topic): - _update_metrics(userdata[topic], msg) + for topic in userdata.keys(): + if _topic_matches(topic, msg.topic): + _update_metrics(userdata[topic], msg) def _mqtt_init(mqtt_config, metrics): @@ -358,7 +374,7 @@ def _export_to_prometheus(name, metric, labels): valid_types = metric_wrappers.keys() if metric['type'] not in valid_types: logging.error( - f"Metric type: {metric['type']}, is not a valid metric type. Must be one of: {valid_types} - ingnoring" + f"Metric type: {metric['type']}, is not a valid metric type. Must be one of: {valid_types} - ignoring" ) return @@ -430,6 +446,10 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None: self.metric = prometheus.Gauge( name, help_text, list(label_names) ) + + def clear(self): + self.metric.clear() + def update(self, label_values, value): child = self.metric.labels(*label_values) child.set(value) @@ -446,6 +466,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None: name, help_text, list(label_names) ) + def clear(self): + self.metric.clear() + def update(self, label_values, value): child = self.metric.labels(*label_values) child.inc(value) @@ -466,6 +489,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None: name, help_text, list(label_names) ) + def clear(self): + self.metric.clear() + def update(self, label_values, value): child = self.metric.labels(*label_values) child.set(value) @@ -482,6 +508,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None: name, help_text, list(label_names) ) + def clear(self): + self.metric.clear() + def update(self, label_values, value): child = self.metric.labels(*label_values) child.observe(value) @@ -505,6 +534,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None: name, help_text, list(label_names), **params ) + def clear(self): + self.metric.clear() + def update(self, label_values, value): child = self.metric.labels(*label_values) child.observe(value) @@ -521,6 +553,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None: name, help_text, list(label_names), **params ) + def clear(self): + self.metric.clear() + def update(self, label_values, value): child = self.metric.labels(*label_values) child.state(value) From d89621d43098b16581cbdae82cc66e0cc5b69a95 Mon Sep 17 00:00:00 2001 From: Jeremy Figgins Date: Sat, 4 Mar 2023 19:10:27 -0600 Subject: [PATCH 2/2] Add logging, minor code cleanup --- mqtt_exporter.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/mqtt_exporter.py b/mqtt_exporter.py index a362d8a..9aac1aa 100755 --- a/mqtt_exporter.py +++ b/mqtt_exporter.py @@ -312,16 +312,19 @@ def _update_metrics(metrics, msg): if metric.get('expires'): if metric.get('expiration_timer'): - metric['expiration_timer'].cancel() + metric.get('expiration_timer').cancel() + logging.debug(f"_update_metric Canceled existing timer for {metric.get('name')}") - metric['expiration_timer'] = threading.Timer(metric.get('expires'), _remove_metric, args=(metric, derived_metric)) + metric['expiration_timer'] = threading.Timer(metric.get('expires'), _clear_metric, args=(metric, derived_metric)) metric['expiration_timer'].start() + logging.debug(f"_update_metric Set a {metric.get('expires')} second expiration timer for {metric.get('name')}") -def _remove_metric(metric, derived_metric): +def _clear_metric(metric, derived_metric): with METRICS_LOCK: metric['prometheus_metric']['parent'].clear() derived_metric['prometheus_metric']['parent'].clear() + logging.debug(f"_clear_metric cleared metric {metric.get('name')}") # noinspection PyUnusedLocal @@ -474,10 +477,6 @@ def update(self, label_values, value): child.inc(value) return child -class HistogramWrapper(): - """ - Wrapper to provide generic interface to Summary metric - """ class CounterAbsoluteWrapper(): """