From 957fc81c4c10c056c4457d85e6a9cda9b6fcdef4 Mon Sep 17 00:00:00 2001
From: Ivan Kud
Date: Tue, 3 Dec 2024 07:09:02 +0100
Subject: [PATCH] 886 Implemented stage latency stats (#888)

* implemented stage latency stats

* Updated savant-rs to 0.4.4 (fixed latency collection metrics)
---
 .../docker-compose.l4t.yml     |  9 ++-
 .../docker-compose.x86.yml     |  9 ++-
 .../py_func_metrics_example.py |  4 +-
 savant/VERSION                 |  2 +-
 savant/metrics/prometheus.py   | 76 ++++++++++++++++---
 5 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/samples/pass_through_processing/docker-compose.l4t.yml b/samples/pass_through_processing/docker-compose.l4t.yml
index 3b8ebcfae..cb7492b72 100644
--- a/samples/pass_through_processing/docker-compose.l4t.yml
+++ b/samples/pass_through_processing/docker-compose.l4t.yml
@@ -30,12 +30,13 @@ services:
       - ..:/opt/savant/samples
     command: samples/pass_through_processing/module.yml
     environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
       - MODULE_STAGE=detector
       - MODEL_PATH=/cache/models/peoplenet_detector
       - DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
       - ZMQ_SRC_ENDPOINT=sub+bind:ipc:///tmp/zmq-sockets/input-video.ipc
       - ZMQ_SINK_ENDPOINT=pub+bind:ipc:///tmp/zmq-sockets/detector-output.ipc
-      - METRICS_FRAME_PERIOD=100
+      - METRICS_FRAME_PERIOD=1000
       - METRICS_TIME_PERIOD=1
       - METRICS_PROVIDER=prometheus
       - METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"detector"}}
@@ -50,10 +51,11 @@ services:
       - ..:/opt/savant/samples
     command: samples/pass_through_processing/module.yml
     environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
       - MODULE_STAGE=tracker
       - MODEL_PATH=/cache/models/peoplenet_detector
       - DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
-      - METRICS_FRAME_PERIOD=100
+      - METRICS_FRAME_PERIOD=1000
       - METRICS_TIME_PERIOD=1
       - METRICS_PROVIDER=prometheus
       - METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"tracker"}}
@@ -70,8 +72,9 @@ services:
       - ..:/opt/savant/samples
     command: samples/pass_through_processing/module.yml
     environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
       - MODULE_STAGE=draw-func
-      - METRICS_FRAME_PERIOD=100
+      - METRICS_FRAME_PERIOD=1000
       - METRICS_TIME_PERIOD=1
       - METRICS_PROVIDER=prometheus
       - METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"draw-func"}}
diff --git a/samples/pass_through_processing/docker-compose.x86.yml b/samples/pass_through_processing/docker-compose.x86.yml
index b11774e0c..5e4202b65 100644
--- a/samples/pass_through_processing/docker-compose.x86.yml
+++ b/samples/pass_through_processing/docker-compose.x86.yml
@@ -30,10 +30,11 @@ services:
       - ..:/opt/savant/samples
     command: samples/pass_through_processing/module.yml
     environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
       - MODULE_STAGE=detector
       - MODEL_PATH=/cache/models/peoplenet_detector
       - DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
-      - METRICS_FRAME_PERIOD=100
+      - METRICS_FRAME_PERIOD=1000
       - METRICS_TIME_PERIOD=1
       - METRICS_PROVIDER=prometheus
       - METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"detector"}}
@@ -58,10 +59,11 @@ services:
       - ..:/opt/savant/samples
     command: samples/pass_through_processing/module.yml
     environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
       - MODULE_STAGE=tracker
       - MODEL_PATH=/cache/models/peoplenet_detector
       - DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
-      - METRICS_FRAME_PERIOD=100
+      - METRICS_FRAME_PERIOD=1000
       - METRICS_TIME_PERIOD=1
       - METRICS_PROVIDER=prometheus
       - METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"tracker"}}
@@ -84,8 +86,9 @@ services:
       - ..:/opt/savant/samples
     command: samples/pass_through_processing/module.yml
     environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
       - MODULE_STAGE=draw-func
-      - METRICS_FRAME_PERIOD=100
+      - METRICS_FRAME_PERIOD=1000
       - METRICS_TIME_PERIOD=1
       - METRICS_PROVIDER=prometheus
       - METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"draw-func"}}
diff --git a/samples/pass_through_processing/py_func_metrics_example.py b/samples/pass_through_processing/py_func_metrics_example.py
index c079afef6..0d5066e24 100644
--- a/samples/pass_through_processing/py_func_metrics_example.py
+++ b/samples/pass_through_processing/py_func_metrics_example.py
@@ -48,13 +48,13 @@ def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta):
         # Count the frame for this source
         self.metrics['frames_per_source'].inc(
             # 1,  # Default increment value
-            # Labels should be a tuple and must match the labelnames
+            # Labels should be a tuple and must match the label names
             labels=(frame_meta.source_id,),
         )
         try:
             last_runtime_metric = self.get_runtime_metrics(1)[0]
             queue_length = sum(
-                stage.queue_length for stage in last_runtime_metric.stage_stats
+                stage[0].queue_length for stage in last_runtime_metric.stage_stats
             )
         except IndexError:
             queue_length = 0
diff --git a/savant/VERSION b/savant/VERSION
index c3df1635d..e18716e5f 100644
--- a/savant/VERSION
+++ b/savant/VERSION
@@ -1,3 +1,3 @@
 SAVANT=0.5.0
-SAVANT_RS=0.4.1
+SAVANT_RS=0.4.4
 DEEPSTREAM=7.0
diff --git a/savant/metrics/prometheus.py b/savant/metrics/prometheus.py
index dbc81feae..1135a83e7 100644
--- a/savant/metrics/prometheus.py
+++ b/savant/metrics/prometheus.py
@@ -135,7 +135,12 @@ def __init__(
         super().__init__(extra_labels)
         self._pipeline = pipeline
         label_names = ('record_type',)
-        stage_label_names = ('record_type', 'stage_name')
+        stage_performance_label_names = ('record_type', 'stage_name')
+        stage_latency_label_names = (
+            'record_type',
+            'destination_stage_name',
+            'source_stage_name',
+        )
         self.register_metric(
             Counter(
                 'frame_counter',
@@ -154,28 +159,59 @@
             Gauge(
                 'stage_queue_length',
                 'Queue length in the stage',
-                stage_label_names,
+                stage_performance_label_names,
             )
         )
         self.register_metric(
             Counter(
                 'stage_frame_counter',
                 'Number of frames passed through the stage',
-                stage_label_names,
+                stage_performance_label_names,
             )
         )
         self.register_metric(
             Counter(
                 'stage_object_counter',
                 'Number of objects passed through the stage',
-                stage_label_names,
+                stage_performance_label_names,
             )
         )
         self.register_metric(
             Counter(
                 'stage_batch_counter',
                 'Number of frame batches passed through the stage',
-                stage_label_names,
+                stage_performance_label_names,
+            )
+        )
+        self.register_metric(
+            Gauge(
+                'stage_min_latency',
+                'Minimum latency (micros) measuring how long the data spent '
+                'on the previous stage before moving to the current stage',
+                stage_latency_label_names,
+            )
+        )
+        self.register_metric(
+            Gauge(
+                'stage_max_latency',
+                'Maximum latency (micros) measuring how long the data spent '
+                'on the previous stage before moving to the current stage',
+                stage_latency_label_names,
+            )
+        )
+        self.register_metric(
+            Gauge(
+                'stage_avg_latency',
+                'Average latency (micros) measuring how long the data '
+                'spent on the previous stage before moving to the current stage',
+                stage_latency_label_names,
+            )
+        )
+        self.register_metric(
+            Gauge(
+                'stage_latency_samples',
+                'Number of samples collected for latency measurement',
+                stage_latency_label_names,
             )
         )

@@ -198,20 +234,38 @@ def update_metrics(self, record: FrameProcessingStatRecord):
         labels = (record_type_str,)
         self._metrics['frame_counter'].set(record.frame_no, labels, ts)
         self._metrics['object_counter'].set(record.object_counter, labels, ts)
-        for stage in record.stage_stats:
-            stage_labels = record_type_str, stage.stage_name
+        for sps, sls in record.stage_stats:
+            stage_performance_labels = record_type_str, sps.stage_name
             self._metrics['stage_queue_length'].set(
-                stage.queue_length, stage_labels, ts
+                sps.queue_length, stage_performance_labels, ts
             )
             self._metrics['stage_frame_counter'].set(
-                stage.frame_counter, stage_labels, ts
+                sps.frame_counter, stage_performance_labels, ts
             )
             self._metrics['stage_object_counter'].set(
-                stage.object_counter, stage_labels, ts
+                sps.object_counter, stage_performance_labels, ts
             )
             self._metrics['stage_batch_counter'].set(
-                stage.batch_counter, stage_labels, ts
+                sps.batch_counter, stage_performance_labels, ts
             )
+            for measurements in sls.latencies:
+                stage_latency_labels = (
+                    record_type_str,
+                    sls.stage_name,
+                    measurements.source_stage_name,
+                )
+                self._metrics['stage_min_latency'].set(
+                    measurements.min_latency_micros, stage_latency_labels, ts
+                )
+                self._metrics['stage_max_latency'].set(
+                    measurements.max_latency_micros, stage_latency_labels, ts
+                )
+                self._metrics['stage_avg_latency'].set(
+                    measurements.avg_latency_micros, stage_latency_labels, ts
+                )
+                self._metrics['stage_latency_samples'].set(
+                    measurements.count, stage_latency_labels, ts
+                )

     def get_last_records(self) -> List[FrameProcessingStatRecord]:
         """Get last metrics records from the pipeline.
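
Usage sketch (not part of the patch): as the py_func_metrics_example.py hunk shows, each element of FrameProcessingStatRecord.stage_stats is now a pair of per-stage performance stats and per-stage latency stats, which is why the queue-length sum reads stage[0].queue_length and update_metrics unpacks "for sps, sls in record.stage_stats". A minimal sketch of reading the new latency measurements from inside a pyfunc follows; it assumes the same plugin context as py_func_metrics_example.py (self.get_runtime_metrics and self.logger), and the helper name log_stage_latencies is hypothetical. Only attribute names that appear in this patch are used.

    def log_stage_latencies(self) -> None:
        # Take the most recent runtime record, if any has been collected yet.
        try:
            last_record = self.get_runtime_metrics(1)[0]
        except IndexError:
            return
        # Each stage_stats entry pairs performance stats with latency stats.
        for _performance_stats, latency_stats in last_record.stage_stats:
            for measurement in latency_stats.latencies:
                self.logger.info(
                    '%s -> %s: avg=%d us, min=%d us, max=%d us, samples=%d',
                    measurement.source_stage_name,
                    latency_stats.stage_name,
                    measurement.avg_latency_micros,
                    measurement.min_latency_micros,
                    measurement.max_latency_micros,
                    measurement.count,
                )

The same measurements are exported by the prometheus.py changes as the gauges stage_min_latency, stage_max_latency, stage_avg_latency and stage_latency_samples, labelled with record_type, destination_stage_name and source_stage_name, so a scrape of the metrics endpoint exposes one series per (destination, source) stage pair.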