Skip to content

Commit

Permalink
#371 support ignorable shutdown message
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh committed Aug 28, 2023
1 parent a1d55f5 commit 3017b62
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/source/advanced_topics/0_dead_stream_eviction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ Take a look at the ``default.yaml`` for details:

.. literalinclude:: ../../../savant/config/default.yml
:language: YAML
:lines: 115-131
:lines: 119-135

You can override only required parameters in your module YAML configuration file. Also, take a look at corresponding environment variables helping to configure the parameters without specifying them in the module config.
2 changes: 1 addition & 1 deletion docs/source/savant_101/12_module_definition.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ The following parameters are defined for a Savant module by default:

.. literalinclude:: ../../../savant/config/default.yml
:language: YAML
:lines: 1-116
:lines: 1-120

.. note::

Expand Down
2 changes: 1 addition & 1 deletion docs/source/savant_101/29_pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Default module configuration file already defines the :py:attr:`~savant.config.s

.. literalinclude:: ../../../savant/config/default.yml
:language: YAML
:lines: 115-
:lines: 119-

It is possible to redefine them, but the encouraged operation mode assumes the use of ZeroMQ source and sink.

Expand Down
20 changes: 19 additions & 1 deletion gst_plugins/python/savant_rs_video_decode_bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
NESTED_DEMUX_PROPERTIES = {
k: v
for k, v in SAVANT_RS_VIDEO_DEMUX_PROPERTIES.items()
if k in ['source-timeout', 'source-eviction-interval', 'max-parallel-streams']
if k
in [
'source-timeout',
'source-eviction-interval',
'max-parallel-streams',
'shutdown-auth',
]
}
SAVANT_RS_VIDEO_DECODE_BIN_PROPERTIES = {
'low-latency-decoding': (
Expand Down Expand Up @@ -110,6 +116,8 @@ class SavantRsVideoDecodeBin(LoggerMixin, Gst.Bin):

__gproperties__ = SAVANT_RS_VIDEO_DECODE_BIN_PROPERTIES

__gsignals__ = {'shutdown': (GObject.SignalFlags.RUN_LAST, None, ())}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._elem_to_branch: Dict[Gst.Element, BranchInfo] = {}
Expand All @@ -126,6 +134,7 @@ def __init__(self, *args, **kwargs):
self._demuxer.set_property('eos-on-timestamps-reset', True)
self.add(self._demuxer)
self._demuxer.connect('pad-added', self.on_pad_added)
self._demuxer.connect('shutdown', self.on_shutdown)
self._max_parallel_streams: int = self._demuxer.get_property(
'max-parallel-streams'
)
Expand Down Expand Up @@ -412,6 +421,15 @@ def on_add_element(

return decoder

def on_shutdown(self, element: Gst.Element):
"""Handle shutdown signal."""

self.logger.debug(
'Received shutdown signal from %s. Passing it downstream.',
element.get_name(),
)
self.emit('shutdown')


# register plugin
GObject.type_register(SavantRsVideoDecodeBin)
Expand Down
44 changes: 42 additions & 2 deletions gst_plugins/python/savant_rs_video_demux.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
from typing import Dict, NamedTuple, Optional

from savant_rs.pipeline2 import VideoPipeline
from savant_rs.primitives import EndOfStream, VideoFrame, VideoFrameTransformation
from savant_rs.primitives import (
EndOfStream,
Shutdown,
VideoFrame,
VideoFrameTransformation,
)
from savant_rs.utils import PropagatedContext

from savant.api.constants import DEFAULT_FRAMERATE
Expand Down Expand Up @@ -81,6 +86,13 @@
None,
GObject.ParamFlags.READWRITE,
),
'shutdown-auth': (
str,
'Authentication key for Shutdown message.',
'Authentication key for Shutdown message.',
None,
GObject.ParamFlags.READWRITE,
),
}


Expand Down Expand Up @@ -144,6 +156,8 @@ class SavantRsVideoDemux(LoggerMixin, Gst.Element):

__gproperties__ = SAVANT_RS_VIDEO_DEMUX_PROPERTIES

__gsignals__ = {'shutdown': (GObject.SignalFlags.RUN_LAST, None, ())}

def __init__(self):
super().__init__()
self.sources: Dict[str, SourceInfo] = {}
Expand All @@ -159,6 +173,7 @@ def __init__(self):
self.source_id: Optional[str] = None
self.video_pipeline: Optional[VideoPipeline] = None
self.pipeline_stage_name: Optional[str] = None
self.shutdown_auth: Optional[str] = None

self._frame_idx_gen = itertools.count()

Expand Down Expand Up @@ -200,6 +215,8 @@ def do_get_property(self, prop):
return self.video_pipeline
if prop.name == 'pipeline-stage-name':
return self.pipeline_stage_name
if prop.name == 'shutdown-auth':
return self.shutdown_auth
raise AttributeError(f'Unknown property {prop.name}')

def do_set_property(self, prop, value):
Expand All @@ -218,6 +235,8 @@ def do_set_property(self, prop, value):
self.video_pipeline = value
elif prop.name == 'pipeline-stage-name':
self.pipeline_stage_name = value
elif prop.name == 'shutdown-auth':
self.shutdown_auth = value
else:
raise AttributeError(f'Unknown property {prop.name}')

Expand Down Expand Up @@ -251,8 +270,10 @@ def handle_buffer(
)
elif message.is_end_of_stream():
result = self.handle_eos(message.as_end_of_stream())
elif message.is_shutdown():
result = self.handle_shutdown(message.as_shutdown())
else:
self.logger.debug('Unsupported message type for message %r', message)
self.logger.warning('Unsupported message type for message %r', message)
result = Gst.FlowReturn.OK

return result
Expand Down Expand Up @@ -408,6 +429,25 @@ def handle_eos(self, eos: EndOfStream) -> Gst.FlowReturn:

return Gst.FlowReturn.OK

def handle_shutdown(self, shutdown: Shutdown) -> Gst.FlowReturn:
"""Handle Shutdown message."""
if self.shutdown_auth is None or shutdown.auth != self.shutdown_auth:
self.logger.debug('Ignoring shutdown message.')
return Gst.FlowReturn.OK

self.logger.info('Received shutdown message.')
with self.source_lock:
self.is_running = False
for source_id, source_info in list(self.sources.items()):
self.logger.debug('Sending EOS to source %s.', source_id)
if source_info.src_pad is not None:
self.send_eos(source_info)
del self.sources[source_id]
self.logger.debug('Emitting shutdown signal.')
self.emit('shutdown')

return Gst.FlowReturn.OK

def add_source(self, source_id: str, source_info: SourceInfo):
"""Handle adding new source."""
caps = build_caps(source_info.params)
Expand Down
12 changes: 12 additions & 0 deletions gst_plugins/python/zeromq_source_bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class ZeroMQSourceBin(LoggerMixin, Gst.Bin):
**SAVANT_RS_VIDEO_DECODE_BIN_PROPERTIES,
}

__gsignals__ = {'shutdown': (GObject.SignalFlags.RUN_LAST, None, ())}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand All @@ -47,6 +49,7 @@ def __init__(self, *args, **kwargs):
assert self._queue.link(self._decodebin)
self._decodebin.connect('pad-added', self.on_pad_added)
self._decodebin.connect('pad-removed', self.on_pad_removed)
self._decodebin.connect('shutdown', self.on_shutdown)

def do_get_property(self, prop):
"""Gst plugin get property function.
Expand Down Expand Up @@ -89,6 +92,15 @@ def on_pad_removed(self, element: Gst.Element, pad: Gst.Pad):
self.remove_pad(ghost_pad)
return

def on_shutdown(self, element: Gst.Element):
"""Handle shutdown signal."""

self.logger.info(
'Received shutdown signal from %s. Passing it downstream.',
element.get_name(),
)
self.emit('shutdown')


# register plugin
GObject.type_register(ZeroMQSourceBin)
Expand Down
2 changes: 1 addition & 1 deletion requirements/savant-rs.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
savant-rs==0.1.58
savant-rs==0.1.62
4 changes: 4 additions & 0 deletions savant/config/default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ parameters:
# Parameters for telemetry provider
provider_params: ${json:${oc.env:TELEMETRY_PROVIDER_PARAMS, null}}

# Shutdown authorization key. If set, module will shutdown when it receives
# a Shutdown message with this key.
# shutdown_auth: "shutdown-auth"


# pipeline definition
pipeline:
Expand Down
51 changes: 50 additions & 1 deletion savant/deepstream/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections import defaultdict
from pathlib import Path
from queue import Queue
from threading import Lock
from threading import Lock, Thread
from typing import Any, List, Optional, Tuple, Union

import pyds
Expand Down Expand Up @@ -121,6 +121,9 @@ def __init__(
'pipeline-decoder-stage-name': 'decode',
}
)
shutdown_auth = kwargs.get('shutdown_auth')
if shutdown_auth is not None:
pipeline_cfg.source.properties['shutdown-auth'] = shutdown_auth

buffer_queues: Optional[BufferQueuesParameters] = kwargs.get('buffer_queues')
if buffer_queues is not None:
Expand Down Expand Up @@ -212,9 +215,54 @@ def add_element(

def before_shutdown(self):
super().before_shutdown()
self._disable_eos_suppression()

def _on_shutdown_signal(self, element: Gst.Element):
"""Handle shutdown signal."""

self._logger.info('Received shutdown signal from %s.', element.get_name())
Thread(target=self._handle_shutdown_signal, daemon=True).start()

def _handle_shutdown_signal(self):
while self._sources.has_sources():
self._logger.debug('Waiting for sources to release')
time.sleep(0.1)
if not self._is_running:
self._logger.info('Pipeline was shut down already.')
return

self._logger.info('Shutting down the pipeline.')
# We need to connect a fakesink element to the demuxer and receive EOS
# with it to shut down the pipeine.
self._logger.debug('Adding fakesink to the pipeline')
fakesink = self.add_element(
PipelineElement(
'fakesink',
properties={
'sync': 0,
'qos': 0,
'enable-last-sample': 0,
},
),
link=False,
)
fakesink_pad: Gst.Pad = fakesink.get_static_pad('sink')
demuxer_src_pad = self._demuxer_src_pads[0]
self._logger.debug(
'Linking demuxer pad %s to fakesink pad %s',
demuxer_src_pad.get_name(),
fakesink_pad.get_name(),
)
assert demuxer_src_pad.link(fakesink_pad) == Gst.PadLinkReturn.OK
fakesink.sync_state_with_parent()
# This will also send EOS to all demuxer src pads.
self._disable_eos_suppression()

def _disable_eos_suppression(self):
self._logger.debug(
'Turning off "drop-pipeline-eos" of %s', self._muxer.get_name()
)
self._suppress_eos = False
self._muxer.set_property('drop-pipeline-eos', False)

# Source
Expand All @@ -231,6 +279,7 @@ def _add_source(self, source: PipelineElement):
self.on_source_added,
add_frames_to_pipeline,
)
_source.connect('shutdown', self._on_shutdown_signal)

# Need to suppress EOS on nvstreammux sink pad
# to prevent pipeline from shutting down
Expand Down
4 changes: 4 additions & 0 deletions savant/utils/source_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ def get_id_by_pad_index(self, pad_idx: int) -> str:
:return: Source id value.
"""
return self._source_id_by_index[pad_idx]

def has_sources(self):
"""Check if there are any sources registered."""
return bool(self._sources)

0 comments on commit 3017b62

Please sign in to comment.