Skip to content

Commit

Permalink
886 Implemented stage latency stats (#888)
Browse files (browse the repository at this point in the history)
* Implemented stage latency stats
* Updated savant-rs to 0.4.4 (fixed latency collection metrics)
  • Loading branch information
bwsw authored Dec 3, 2024
1 parent 6e9d047 commit 957fc81
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 20 deletions.
9 changes: 6 additions & 3 deletions samples/pass_through_processing/docker-compose.l4t.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ services:
- ..:/opt/savant/samples
command: samples/pass_through_processing/module.yml
environment:
- LOGLEVEL=info,savant_core::pipeline::stats=error
- MODULE_STAGE=detector
- MODEL_PATH=/cache/models/peoplenet_detector
- DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
- ZMQ_SRC_ENDPOINT=sub+bind:ipc:///tmp/zmq-sockets/input-video.ipc
- ZMQ_SINK_ENDPOINT=pub+bind:ipc:///tmp/zmq-sockets/detector-output.ipc
- METRICS_FRAME_PERIOD=100
- METRICS_FRAME_PERIOD=1000
- METRICS_TIME_PERIOD=1
- METRICS_PROVIDER=prometheus
- METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"detector"}}
Expand All @@ -50,10 +51,11 @@ services:
- ..:/opt/savant/samples
command: samples/pass_through_processing/module.yml
environment:
- LOGLEVEL=info,savant_core::pipeline::stats=error
- MODULE_STAGE=tracker
- MODEL_PATH=/cache/models/peoplenet_detector
- DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
- METRICS_FRAME_PERIOD=100
- METRICS_FRAME_PERIOD=1000
- METRICS_TIME_PERIOD=1
- METRICS_PROVIDER=prometheus
- METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"tracker"}}
Expand All @@ -70,8 +72,9 @@ services:
- ..:/opt/savant/samples
command: samples/pass_through_processing/module.yml
environment:
- LOGLEVEL=info,savant_core::pipeline::stats=error
- MODULE_STAGE=draw-func
- METRICS_FRAME_PERIOD=100
- METRICS_FRAME_PERIOD=1000
- METRICS_TIME_PERIOD=1
- METRICS_PROVIDER=prometheus
- METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"draw-func"}}
Expand Down
9 changes: 6 additions & 3 deletions samples/pass_through_processing/docker-compose.x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ services:
- ..:/opt/savant/samples
command: samples/pass_through_processing/module.yml
environment:
- LOGLEVEL=info,savant_core::pipeline::stats=error
- MODULE_STAGE=detector
- MODEL_PATH=/cache/models/peoplenet_detector
- DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
- METRICS_FRAME_PERIOD=100
- METRICS_FRAME_PERIOD=1000
- METRICS_TIME_PERIOD=1
- METRICS_PROVIDER=prometheus
- METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"detector"}}
Expand All @@ -58,10 +59,11 @@ services:
- ..:/opt/savant/samples
command: samples/pass_through_processing/module.yml
environment:
- LOGLEVEL=info,savant_core::pipeline::stats=error
- MODULE_STAGE=tracker
- MODEL_PATH=/cache/models/peoplenet_detector
- DOWNLOAD_PATH=/cache/downloads/peoplenet_detector
- METRICS_FRAME_PERIOD=100
- METRICS_FRAME_PERIOD=1000
- METRICS_TIME_PERIOD=1
- METRICS_PROVIDER=prometheus
- METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"tracker"}}
Expand All @@ -84,8 +86,9 @@ services:
- ..:/opt/savant/samples
command: samples/pass_through_processing/module.yml
environment:
- LOGLEVEL=info,savant_core::pipeline::stats=error
- MODULE_STAGE=draw-func
- METRICS_FRAME_PERIOD=100
- METRICS_FRAME_PERIOD=1000
- METRICS_TIME_PERIOD=1
- METRICS_PROVIDER=prometheus
- METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"module_stage":"draw-func"}}
Expand Down
4 changes: 2 additions & 2 deletions samples/pass_through_processing/py_func_metrics_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta):
# Count the frame for this source
self.metrics['frames_per_source'].inc(
# 1, # Default increment value
# Labels should be a tuple and must match the labelnames
# Labels should be a tuple and must match the label names
labels=(frame_meta.source_id,),
)
try:
last_runtime_metric = self.get_runtime_metrics(1)[0]
queue_length = sum(
stage.queue_length for stage in last_runtime_metric.stage_stats
stage[0].queue_length for stage in last_runtime_metric.stage_stats
)
except IndexError:
queue_length = 0
Expand Down
2 changes: 1 addition & 1 deletion savant/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SAVANT=0.5.0
SAVANT_RS=0.4.1
SAVANT_RS=0.4.4
DEEPSTREAM=7.0
76 changes: 65 additions & 11 deletions savant/metrics/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ def __init__(
super().__init__(extra_labels)
self._pipeline = pipeline
label_names = ('record_type',)
stage_label_names = ('record_type', 'stage_name')
stage_performance_label_names = ('record_type', 'stage_name')
stage_latency_label_names = (
'record_type',
'destination_stage_name',
'source_stage_name',
)
self.register_metric(
Counter(
'frame_counter',
Expand All @@ -154,28 +159,59 @@ def __init__(
Gauge(
'stage_queue_length',
'Queue length in the stage',
stage_label_names,
stage_performance_label_names,
)
)
self.register_metric(
Counter(
'stage_frame_counter',
'Number of frames passed through the stage',
stage_label_names,
stage_performance_label_names,
)
)
self.register_metric(
Counter(
'stage_object_counter',
'Number of objects passed through the stage',
stage_label_names,
stage_performance_label_names,
)
)
self.register_metric(
Counter(
'stage_batch_counter',
'Number of frame batches passed through the stage',
stage_label_names,
stage_performance_label_names,
)
)
self.register_metric(
Gauge(
'stage_min_latency',
'Minimum latency (micros) measuring how long the data spent '
'on the previous stage before moving to the current stage',
stage_latency_label_names,
)
)
self.register_metric(
Gauge(
'stage_max_latency',
'Maximum latency (micros) measuring how long the data spent '
'on the previous stage before moving to the current stage',
stage_latency_label_names,
)
)
self.register_metric(
Gauge(
'stage_avg_latency',
'Average latency (micros) measuring how long the data '
'spent on the previous stage before moving to the current stage',
stage_latency_label_names,
)
)
self.register_metric(
Gauge(
'stage_latency_samples',
'Number of samples collected for latency measurement',
stage_latency_label_names,
)
)

Expand All @@ -198,20 +234,38 @@ def update_metrics(self, record: FrameProcessingStatRecord):
labels = (record_type_str,)
self._metrics['frame_counter'].set(record.frame_no, labels, ts)
self._metrics['object_counter'].set(record.object_counter, labels, ts)
for stage in record.stage_stats:
stage_labels = record_type_str, stage.stage_name
for sps, sls in record.stage_stats:
stage_performance_labels = record_type_str, sps.stage_name
self._metrics['stage_queue_length'].set(
stage.queue_length, stage_labels, ts
sps.queue_length, stage_performance_labels, ts
)
self._metrics['stage_frame_counter'].set(
stage.frame_counter, stage_labels, ts
sps.frame_counter, stage_performance_labels, ts
)
self._metrics['stage_object_counter'].set(
stage.object_counter, stage_labels, ts
sps.object_counter, stage_performance_labels, ts
)
self._metrics['stage_batch_counter'].set(
stage.batch_counter, stage_labels, ts
sps.batch_counter, stage_performance_labels, ts
)
for measurements in sls.latencies:
stage_latency_labels = (
record_type_str,
sls.stage_name,
measurements.source_stage_name,
)
self._metrics['stage_min_latency'].set(
measurements.min_latency_micros, stage_latency_labels, ts
)
self._metrics['stage_max_latency'].set(
measurements.max_latency_micros, stage_latency_labels, ts
)
self._metrics['stage_avg_latency'].set(
measurements.avg_latency_micros, stage_latency_labels, ts
)
self._metrics['stage_latency_samples'].set(
measurements.count, stage_latency_labels, ts
)

def get_last_records(self) -> List[FrameProcessingStatRecord]:
"""Get last metrics records from the pipeline.
Expand Down

0 comments on commit 957fc81

Please sign in to comment.