From d364c654cb023997f6cff2117eaaac19b8950677 Mon Sep 17 00:00:00 2001 From: Pavel Tomskikh Date: Tue, 7 Mar 2023 17:37:09 +0700 Subject: [PATCH] Always-On Low Latency Streaming Sink (RTSP) (#74) --- README.md | 11 +- adapters/__init__.py | 0 .../python/always_on_rtsp_frame_processor.py | 212 ++++++++++ .../python/always_on_rtsp_frame_sink.py | 82 ++++ adapters/ds/sinks/always_on_rtsp/__init__.py | 0 adapters/ds/sinks/always_on_rtsp/__main__.py | 398 ++++++++++++++++++ .../ds/sinks/always_on_rtsp/last_frame.py | 11 + .../sinks/always_on_rtsp/timestamp_overlay.py | 55 +++ .../gst_plugins/python/adjust_timestamps.py | 37 ++ docker/Dockerfile.deepstream | 6 +- docker/Dockerfile.deepstream-l4t | 6 +- docker/Dockerfile.deepstream-l4t-6.0.1 | 6 +- savant/deepstream/opencv_utils.py | 9 +- savant/gst_plugins/python/avro_video_demux.py | 17 + savant/gstreamer/runner.py | 25 +- savant/utils/sink_factories.py | 6 + scripts/common.py | 58 +++ scripts/run_sink.py | 156 ++++++- scripts/run_source.py | 58 +-- 19 files changed, 1079 insertions(+), 74 deletions(-) create mode 100644 adapters/__init__.py create mode 100644 adapters/ds/gst_plugins/python/always_on_rtsp_frame_processor.py create mode 100644 adapters/ds/gst_plugins/python/always_on_rtsp_frame_sink.py create mode 100644 adapters/ds/sinks/always_on_rtsp/__init__.py create mode 100644 adapters/ds/sinks/always_on_rtsp/__main__.py create mode 100644 adapters/ds/sinks/always_on_rtsp/last_frame.py create mode 100644 adapters/ds/sinks/always_on_rtsp/timestamp_overlay.py diff --git a/README.md b/README.md index eb6aa80be..d0302865f 100644 --- a/README.md +++ b/README.md @@ -191,10 +191,7 @@ Currently, the following source adapters are available: - Local image file; - URL Image; - Image directory; -- USB cam; -- Apache Kafka; -- Elasticsearch; -- MongoDB. +- USB cam. There are basic sink adapters implemented: @@ -202,19 +199,19 @@ There are basic sink adapters implemented: - Resulting video overlay displayed on a screen (per source); - MP4 file (per source); - image directory (per source); +- Always-On RTSP Stream Sink. The framework uses an established protocol based on Apache Avro, both for sources and sinks. The sources and sinks talk to Savant through ZeroMQ sockets. ### Easy to Deploy -The framework and adapters are delivered as a Docker image. To implement the pipeline, you take the base image, add AI +The framework and the adapters are delivered as Docker images. To implement the pipeline, you take the base image, add AI models and a custom code with extra dependencies, then build the resulting image. Some pipelines which don't require additional dependencies can be implemented just by mapping directories with models and user functions into the docker image. -As for now, we provide images for conventional PC architecture based on Intel or AMD CPU and discrete GPUs and for -Jetson ARM-based devices. +As for now, we provide images for x86 architecture and for Jetson hardware. 
## What's Next diff --git a/adapters/__init__.py b/adapters/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/adapters/ds/gst_plugins/python/always_on_rtsp_frame_processor.py b/adapters/ds/gst_plugins/python/always_on_rtsp_frame_processor.py new file mode 100644 index 000000000..a6c4130a1 --- /dev/null +++ b/adapters/ds/gst_plugins/python/always_on_rtsp_frame_processor.py @@ -0,0 +1,212 @@ +import inspect +from datetime import datetime, timedelta +from enum import Enum +from typing import Any + +import cv2 + +from adapters.ds.sinks.always_on_rtsp.last_frame import LastFrame +from adapters.ds.sinks.always_on_rtsp.timestamp_overlay import TimestampOverlay +from savant.deepstream.opencv_utils import nvds_to_gpu_mat +from savant.gstreamer import Gst, GstBase, GObject +from savant.gstreamer.utils import LoggerMixin, propagate_gst_setting_error + + +class Mode(Enum): + SCALE_TO_FIT = 'scale-to-fit' + CROP_TO_FIT = 'crop-to-fit' + + +CAPS = Gst.Caps.from_string('video/x-raw(memory:NVMM), format=RGBA') +DEFAULT_MAX_DELAY = timedelta(seconds=1) +DEFAULT_MODE = Mode.SCALE_TO_FIT + + +class AlwaysOnRtspFrameProcessor(LoggerMixin, GstBase.BaseTransform): + GST_PLUGIN_NAME: str = 'always_on_rtsp_frame_processor' + + __gstmetadata__ = ( + 'Always-On-RTSP frame processor', + 'Transform', + 'Frame processor for Always-On-RTSP sink. ' + 'Places stub image when actual frame is not available.', + 'Pavel Tomskikh ', + ) + + __gsttemplates__ = ( + Gst.PadTemplate.new('src', Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, CAPS), + Gst.PadTemplate.new( + 'sink', Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, CAPS + ), + ) + + __gproperties__ = { + 'max-delay-ms': ( + int, + 'Maximum delay for the last frame in milliseconds.', + 'Maximum delay for the last frame in milliseconds.', + 1, + 2147483647, + int(DEFAULT_MAX_DELAY.total_seconds() * 1000), + GObject.ParamFlags.READWRITE, + ), + 'last-frame': ( + object, + 'Last frame with its timestamp.', + 'Last frame with its timestamp.', + GObject.ParamFlags.READWRITE, + ), + 'mode': ( + str, + 'Transfer mode.', + 'Transfer mode (allowed: ' f'{", ".join([mode.value for mode in Mode])}).', + DEFAULT_MODE.value, + GObject.ParamFlags.READWRITE, + ), + } + + def __init__(self): + super().__init__() + # properties + self._max_delay = DEFAULT_MAX_DELAY + self._mode = DEFAULT_MODE + self._last_frame: LastFrame = None + + self._time_overlay = TimestampOverlay() + self._transfer = { + Mode.SCALE_TO_FIT: self.scale_to_fit, + Mode.CROP_TO_FIT: self.crop_to_fit, + } + + def do_get_property(self, prop: GObject.GParamSpec) -> Any: + if prop.name == 'max-delay-ms': + return int(self._max_delay.total_seconds() * 1000) + if prop.name == 'last-frame': + return self._last_frame + if prop.name == 'mode': + return self._mode.value + raise AttributeError(f'Unknown property {prop.name}.') + + def do_set_property(self, prop: GObject.GParamSpec, value: Any): + if prop.name == 'max-delay-ms': + self._max_delay = timedelta(milliseconds=value) + elif prop.name == 'last-frame': + self._last_frame = value + elif prop.name == 'mode': + self._mode = Mode(value) + else: + raise AttributeError(f'Unknown property {prop.name}.') + + def do_start(self): + if self._last_frame is None: + self.logger.exception('Property "last-frame" is not set') + frame = inspect.currentframe() + propagate_gst_setting_error(self, frame, __file__) + return False + return True + + def do_transform_ip(self, buffer: Gst.Buffer): + with nvds_to_gpu_mat(buffer, batch_id=0) as output_frame: + 
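+            # In-place transform: the output buffer arrives holding the looped stub image.
+            # If the input pipeline delivered a frame within max-delay-ms, copy (or scale/crop)
+            # it into the buffer; otherwise keep the stub and overlay the current timestamp.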
self.logger.debug('Output frame resolution is %sx%s', *output_frame.size()) + now = datetime.now() + input_frame = self._last_frame.frame + timestamp = self._last_frame.timestamp + delay = now - timestamp + if input_frame is not None and delay < self._max_delay: + self.logger.debug( + 'Got frame with timestamp %s and resolution %sx%s. Frame delay is %s.', + timestamp, + *input_frame.size(), + delay, + ) + if input_frame.size() == output_frame.size(): + input_frame.copyTo(output_frame) + else: + self._transfer[self._mode](input_frame, output_frame) + else: + self.logger.debug( + 'No new data received from the input. Sending stub image with the timestamp.' + ) + self._time_overlay.overlay_timestamp(output_frame, now) + return Gst.FlowReturn.OK + + def scale_to_fit( + self, + input_frame: cv2.cuda.GpuMat, + output_frame: cv2.cuda.GpuMat, + ): + in_width, in_height = input_frame.size() + in_aspect_ratio = in_width / in_height + out_width, out_height = output_frame.size() + out_aspect_ratio = out_width / out_height + if in_aspect_ratio < out_aspect_ratio: + target_height = out_height + target_width = int(target_height * in_aspect_ratio) + else: + target_width = out_width + target_height = int(target_width / in_aspect_ratio) + self.logger.debug( + 'Scaling input image from %sx%s to %sx%s', + in_width, + in_height, + target_width, + target_height, + ) + output_frame.setTo((0, 0, 0, 0)) + target = cv2.cuda.GpuMat( + output_frame, + ( + (out_width - target_width) // 2, # left + (out_height - target_height) // 2, # top + target_width, # width + target_height, # height + ), + ) + cv2.cuda.resize(input_frame, (target_width, target_height), target) + + def crop_to_fit( + self, + input_frame: cv2.cuda.GpuMat, + output_frame: cv2.cuda.GpuMat, + ): + in_width, in_height = input_frame.size() + out_width, out_height = output_frame.size() + target_width = min(in_width, out_width) + target_height = min(in_height, out_height) + + self.logger.debug( + 'Cropping input image from %sx%s to %sx%s', + in_width, + in_height, + target_width, + target_height, + ) + output_frame.setTo((0, 0, 0, 0)) + source = cv2.cuda.GpuMat( + input_frame, + ( + (in_width - target_width) // 2, # left + (in_height - target_height) // 2, # top + target_width, # width + target_height, # height + ), + ) + target = cv2.cuda.GpuMat( + output_frame, + ( + (out_width - target_width) // 2, # left + (out_height - target_height) // 2, # top + target_width, # width + target_height, # height + ), + ) + source.copyTo(target) + + +# register plugin +GObject.type_register(AlwaysOnRtspFrameProcessor) +__gstelementfactory__ = ( + AlwaysOnRtspFrameProcessor.GST_PLUGIN_NAME, + Gst.Rank.NONE, + AlwaysOnRtspFrameProcessor, +) diff --git a/adapters/ds/gst_plugins/python/always_on_rtsp_frame_sink.py b/adapters/ds/gst_plugins/python/always_on_rtsp_frame_sink.py new file mode 100644 index 000000000..e15a2ef39 --- /dev/null +++ b/adapters/ds/gst_plugins/python/always_on_rtsp_frame_sink.py @@ -0,0 +1,82 @@ +import inspect +from datetime import datetime +from typing import Any + +from adapters.ds.sinks.always_on_rtsp.last_frame import LastFrame +from adapters.ds.sinks.always_on_rtsp.timestamp_overlay import TimestampOverlay +from savant.deepstream.opencv_utils import nvds_to_gpu_mat +from savant.gstreamer import Gst, GstBase, GObject +from savant.gstreamer.utils import LoggerMixin, propagate_gst_setting_error + +CAPS = Gst.Caps.from_string('video/x-raw(memory:NVMM), format=RGBA') + + +class AlwaysOnRtspFrameSink(LoggerMixin, GstBase.BaseSink): + GST_PLUGIN_NAME: 
str = 'always_on_rtsp_frame_sink' + + __gstmetadata__ = ( + 'Always-On-RTSP frame sink', + 'Sink', + 'Frame sink for Always-On-RTSP sink. Takes decoded frames ' + 'from the input and sends them to the output pipeline.', + 'Pavel Tomskikh ', + ) + + __gsttemplates__ = ( + Gst.PadTemplate.new( + 'sink', Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, CAPS + ), + ) + + __gproperties__ = { + 'last-frame': ( + object, + 'Last frame with its timestamp.', + 'Last frame with its timestamp.', + GObject.ParamFlags.READWRITE, + ), + } + + def __init__(self): + super().__init__() + # properties + self._last_frame: LastFrame = None + + self._time_overlay = TimestampOverlay() + + def do_get_property(self, prop: GObject.GParamSpec) -> Any: + if prop.name == 'last-frame': + return self._last_frame + raise AttributeError(f'Unknown property {prop.name}.') + + def do_set_property(self, prop: GObject.GParamSpec, value: Any): + if prop.name == 'last-frame': + self._last_frame = value + else: + raise AttributeError(f'Unknown property {prop.name}.') + + def do_start(self): + if self._last_frame is None: + self.logger.exception('Property "last-frame" is not set') + frame = inspect.currentframe() + propagate_gst_setting_error(self, frame, __file__) + return False + return True + + def do_render(self, buffer: Gst.Buffer): + with nvds_to_gpu_mat(buffer, batch_id=0) as frame: + self.logger.debug('Input frame resolution is %sx%s', *frame.size()) + # Clone image for thread safety. The original CUDA memory will be released in this thread. + # TODO: don't allocate CUDA memory if frame size wasn't changed (threadsafe?) + self._last_frame.frame = frame.clone() + self._last_frame.timestamp = datetime.now() + return Gst.FlowReturn.OK + + +# register plugin +GObject.type_register(AlwaysOnRtspFrameSink) +__gstelementfactory__ = ( + AlwaysOnRtspFrameSink.GST_PLUGIN_NAME, + Gst.Rank.NONE, + AlwaysOnRtspFrameSink, +) diff --git a/adapters/ds/sinks/always_on_rtsp/__init__.py b/adapters/ds/sinks/always_on_rtsp/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/adapters/ds/sinks/always_on_rtsp/__main__.py b/adapters/ds/sinks/always_on_rtsp/__main__.py new file mode 100644 index 000000000..6c0a1ef98 --- /dev/null +++ b/adapters/ds/sinks/always_on_rtsp/__main__.py @@ -0,0 +1,398 @@ +import json +import logging +import os +import time +from dataclasses import asdict +from datetime import datetime +from distutils.util import strtobool +from pathlib import Path +from threading import Thread +from typing import Callable, List, Optional + +import pyds +from pygstsavantframemeta import gst_buffer_get_savant_frame_meta + +from adapters.ds.sinks.always_on_rtsp.last_frame import LastFrame +from savant.config.schema import PipelineElement +from savant.gstreamer import Gst +from savant.gstreamer.codecs import CODEC_BY_CAPS_NAME, Codec +from savant.gstreamer.element_factory import GstElementFactory +from savant.gstreamer.metadata import metadata_pop_frame_meta +from savant.gstreamer.runner import GstPipelineRunner +from savant.utils.platform import is_aarch64 +from savant.utils.zeromq import ReceiverSocketTypes + +logger = logging.getLogger(__name__) + + +def opt_config(name, default=None, convert=None): + conf_str = os.environ.get(name) + if conf_str: + return convert(conf_str) if convert else conf_str + return default + + +class Config: + def __init__(self): + self.stub_file_location = Path(os.environ['STUB_FILE_LOCATION']) + if not self.stub_file_location.exists(): + raise RuntimeError(f'File {self.stub_file_location} does 
not exist.') + if not self.stub_file_location.is_file(): + raise RuntimeError(f'{self.stub_file_location} is not a file.') + + self.max_delay_ms = opt_config('MAX_DELAY_MS', 1000, int) + self.transfer_mode = opt_config('TRANSFER_MODE', 'scale-to-fit') + self.source_id = os.environ['SOURCE_ID'] + + self.zmq_endpoint = os.environ['ZMQ_ENDPOINT'] + self.zmq_socket_type = opt_config( + 'ZMQ_SOCKET_TYPE', + ReceiverSocketTypes.SUB, + ReceiverSocketTypes.__getitem__, + ) + self.zmq_socket_bind = opt_config('ZMQ_SOCKET_BIND', False, strtobool) + + self.rtsp_uri = os.environ['RTSP_URI'] + self.rtsp_protocols = opt_config('RTSP_PROTOCOLS', 'tcp') + self.rtsp_latency_ms = opt_config('RTSP_LATENCY_MS', 100, int) + self.rtsp_keep_alive = opt_config('RTSP_KEEP_ALIVE', True, strtobool) + + self.encoder_profile = opt_config('ENCODER_PROFILE', 'High') + # default nvv4l2h264enc bitrate + self.encoder_bitrate = opt_config('ENCODER_BITRATE', 4000000, int) + + self.fps_period_frames = opt_config('FPS_PERIOD_FRAMES', 1000, int) + self.fps_period_seconds = opt_config('FPS_PERIOD_SECONDS', convert=float) + self.fps_output = opt_config('FPS_OUTPUT', 'stdout') + + self.metadata_output = opt_config('METADATA_OUTPUT') + + self.framerate = opt_config('FRAMERATE', '30/1') + self.sync = opt_config('SYNC_OUTPUT', False, strtobool) + + @property + def fps_meter_properties(self): + props = {'output': self.fps_output} + if self.fps_period_seconds: + props['period-seconds'] = self.fps_period_seconds + else: + props['period-frames'] = self.fps_period_frames + return props + + @property + def nvvideoconvert_properties(self): + props = {} + if not is_aarch64(): + props['nvbuf-memory-type'] = int(pyds.NVBUF_MEM_CUDA_UNIFIED) + return props + + +def log_frame_metadata(pad: Gst.Pad, info: Gst.PadProbeInfo, config: Config): + buffer: Gst.Buffer = info.get_buffer() + savant_frame_meta = gst_buffer_get_savant_frame_meta(buffer) + frame_idx = savant_frame_meta.idx if savant_frame_meta else None + frame_pts = buffer.pts + metadata = metadata_pop_frame_meta(config.source_id, frame_idx, frame_pts) + metadata_json = json.dumps(asdict(metadata)) + if config.metadata_output == 'logger': + logger.info('Frame metadata: %s', metadata_json) + else: + print(f'Frame metadata: {metadata_json}') + return Gst.PadProbeReturn.OK + + +def link_added_pad( + element: Gst.Element, + src_pad: Gst.Pad, + sink_pad: Gst.Pad, +): + assert src_pad.link(sink_pad) == Gst.PadLinkReturn.OK + + +def on_demuxer_pad_added( + element: Gst.Element, + src_pad: Gst.Pad, + config: Config, + pipeline: Gst.Pipeline, + factory: GstElementFactory, + sink_pad: Gst.Pad, +): + caps: Gst.Caps = src_pad.get_pad_template_caps() + logger.debug( + 'Added pad %s on element %s. 
Caps: %s.', + src_pad.get_name(), + element.get_name(), + caps, + ) + codec = CODEC_BY_CAPS_NAME[caps[0].get_name()] + if config.metadata_output: + src_pad.add_probe(Gst.PadProbeType.BUFFER, log_frame_metadata, config) + + if codec == Codec.RAW_RGBA: + capsfilter = factory.create( + PipelineElement( + 'capsfilter', + properties={'caps': caps}, + ) + ) + pipeline.add(capsfilter) + assert capsfilter.get_static_pad('src').link(sink_pad) == Gst.PadLinkReturn.OK + assert src_pad.link(capsfilter.get_static_pad('sink')) == Gst.PadLinkReturn.OK + capsfilter.sync_state_with_parent() + else: + decodebin = factory.create(PipelineElement('decodebin')) + pipeline.add(decodebin) + decodebin_sink_pad: Gst.Pad = decodebin.get_static_pad('sink') + decodebin.connect('pad-added', link_added_pad, sink_pad) + assert src_pad.link(decodebin_sink_pad) == Gst.PadLinkReturn.OK + decodebin.sync_state_with_parent() + logger.debug('Added decoder %s.', decodebin.get_name()) + + +def build_input_pipeline( + config: Config, + last_frame: LastFrame, + factory: GstElementFactory, +): + pipeline: Gst.Pipeline = Gst.Pipeline.new('input-pipeline') + + source_elements = [ + PipelineElement( + 'zeromq_src', + properties={ + 'socket': config.zmq_endpoint, + 'socket-type': config.zmq_socket_type.name, + 'bind': config.zmq_socket_bind, + }, + ), + PipelineElement( + 'avro_video_demux', + properties={ + 'source-id': config.source_id, + 'store-metadata': bool(config.metadata_output), + }, + ), + ] + sink_elements = [ + PipelineElement( + 'nvvideoconvert', + properties=config.nvvideoconvert_properties, + ), + PipelineElement( + 'capsfilter', + properties={'caps': 'video/x-raw(memory:NVMM), format=RGBA'}, + ), + PipelineElement( + 'fps_meter', + properties=config.fps_meter_properties, + ), + ] + if config.sync: + sink_elements.append( + PipelineElement( + 'adjust_timestamps', + properties={'adjust-first-frame': True}, + ) + ) + sink_elements.append( + PipelineElement( + 'always_on_rtsp_frame_sink', + properties={ + 'last-frame': last_frame, + 'sync': config.sync, + }, + ) + ) + + gst_source_elements = add_elements(pipeline, source_elements, factory) + gst_sink_elements = add_elements(pipeline, sink_elements, factory) + avro_video_demux = gst_source_elements[-1] + nvvideoconvert = gst_sink_elements[0] + + avro_video_demux.connect( + 'pad-added', + on_demuxer_pad_added, + config, + pipeline, + factory, + nvvideoconvert.get_static_pad('sink'), + ) + + return pipeline + + +def build_output_pipeline( + config: Config, + last_frame: LastFrame, + factory: GstElementFactory, +) -> Gst.Pipeline: + pipeline: Gst.Pipeline = Gst.Pipeline.new('output-pipeline') + + encoder_properties = { + 'profile': config.encoder_profile, + 'bitrate': config.encoder_bitrate, + } + if not is_aarch64(): + # nvv4l2h264enc doesn't encode video properly for the RTSP stream on dGPU + # https://forums.developer.nvidia.com/t/rtsp-stream-sent-by-rtspclientsink-doesnt-play-in-deepstream-6-2/244194 + encoder_properties['tuning-info-id'] = 'HighQualityPreset' + elements = [ + PipelineElement( + 'filesrc', + properties={ + 'location': str(config.stub_file_location.absolute()), + }, + ), + PipelineElement('jpegparse'), + PipelineElement('jpegdec'), + PipelineElement('imagefreeze'), + PipelineElement( + 'nvvideoconvert', + properties=config.nvvideoconvert_properties, + ), + PipelineElement( + 'capsfilter', + properties={ + 'caps': f'video/x-raw(memory:NVMM), format=RGBA, framerate={config.framerate}' + }, + ), + PipelineElement( + 'always_on_rtsp_frame_processor', + 
properties={ + 'max-delay-ms': config.max_delay_ms, + 'mode': config.transfer_mode, + 'last-frame': last_frame, + }, + ), + PipelineElement( + 'nvvideoconvert', + properties=config.nvvideoconvert_properties, + ), + PipelineElement( + 'nvv4l2h264enc', + properties=encoder_properties, + ), + PipelineElement( + 'h264parse', + properties={ + 'config-interval': -1, + }, + ), + PipelineElement( + 'fps_meter', + properties=config.fps_meter_properties, + ), + PipelineElement( + 'rtspclientsink', + properties={ + 'location': config.rtsp_uri, + 'protocols': config.rtsp_protocols, + 'latency': config.rtsp_latency_ms, + 'do-rtsp-keep-alive': config.rtsp_keep_alive, + }, + ), + ] + + add_elements(pipeline, elements, factory) + + return pipeline + + +def add_elements( + pipeline: Gst.Pipeline, + elements: List[PipelineElement], + factory: GstElementFactory, +) -> List[Gst.Element]: + gst_elements: List[Gst.Element] = [] + for element in elements: + gst_element = factory.create(element) + pipeline.add(gst_element) + if gst_elements: + assert gst_elements[-1].link(gst_element) + gst_elements.append(gst_element) + return gst_elements + + +class PipelineThread: + def __init__( + self, + build_pipeline: Callable[[Config, LastFrame, GstElementFactory], Gst.Pipeline], + thread_name: str, + config: Config, + last_frame: LastFrame, + factory: GstElementFactory, + ): + self.build_pipeline = build_pipeline + self.thread_name = thread_name + self.config = config + self.last_frame = last_frame + self.factory = factory + + self.is_running = False + self.thread: Optional[Thread] = None + + def start(self): + self.is_running = True + self.thread = Thread(name=self.thread_name, target=self.workload) + self.thread.start() + + def stop(self): + self.is_running = False + + def join(self): + self.thread.join() + + def workload(self): + pipeline = self.build_pipeline(self.config, self.last_frame, self.factory) + logger.info('Starting pipeline %s', pipeline.get_name()) + with GstPipelineRunner(pipeline) as runner: + while self.is_running and runner._is_running: + time.sleep(1) + logger.info('Pipeline %s is stopped', pipeline.get_name()) + self.is_running = False + + +def main(): + logging.basicConfig( + level=opt_config('LOGLEVEL', 'INFO'), + format='%(asctime)s [%(levelname)s] [%(name)s] [%(threadName)s] %(message)s', + ) + + config = Config() + last_frame = LastFrame(timestamp=datetime.utcfromtimestamp(0)) + + Gst.init(None) + logger.info('Starting Always-On-RTSP sink') + factory = GstElementFactory() + output_pipeline_thread = PipelineThread( + build_output_pipeline, + 'OutputPipeline', + config, + last_frame, + factory, + ) + input_pipeline_thread = PipelineThread( + build_input_pipeline, + 'InputPipeline', + config, + last_frame, + factory, + ) + output_pipeline_thread.start() + try: + while output_pipeline_thread.is_running: + input_pipeline_thread.start() + while ( + input_pipeline_thread.is_running and output_pipeline_thread.is_running + ): + time.sleep(1) + except KeyboardInterrupt: + pass + logger.info('Stopping Always-On-RTSP sink') + input_pipeline_thread.stop() + output_pipeline_thread.stop() + logger.info('Always-On-RTSP sink stopped') + + +if __name__ == '__main__': + main() diff --git a/adapters/ds/sinks/always_on_rtsp/last_frame.py b/adapters/ds/sinks/always_on_rtsp/last_frame.py new file mode 100644 index 000000000..acef37159 --- /dev/null +++ b/adapters/ds/sinks/always_on_rtsp/last_frame.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + 
+import cv2 + + +@dataclass +class LastFrame: + timestamp: datetime + frame: Optional[cv2.cuda.GpuMat] = None diff --git a/adapters/ds/sinks/always_on_rtsp/timestamp_overlay.py b/adapters/ds/sinks/always_on_rtsp/timestamp_overlay.py new file mode 100644 index 000000000..d0e11bf4d --- /dev/null +++ b/adapters/ds/sinks/always_on_rtsp/timestamp_overlay.py @@ -0,0 +1,55 @@ +import logging +from datetime import datetime + +import cv2 +import numpy as np + + +class TimestampOverlay: + def __init__(self): + self.logger = logging.getLogger(f'{self.__module__}.{self.__class__.__name__}') + + # TODO: make properties configurable + self._width = 520 + self._height = 50 + self._location = (10, 35) + self._font = cv2.FONT_HERSHEY_SIMPLEX + self._font_scale = 1 + self._color = (255, 255, 255, 255) # white + self._thickness = 2 + self._line_type = cv2.LINE_AA + + self._placeholder = np.zeros((self._height, self._width, 4), dtype=np.uint8) + self.logger.debug( + 'Timestamp placeholder size is %sx%s', self._width, self._height + ) + + def overlay_timestamp(self, frame: cv2.cuda.GpuMat, timestamp: datetime): + # TODO: make timestamp precision configurable + frame_width, frame_height = frame.size() + self.logger.debug( + 'Placing timestamp %s on a frame of a size %sx%s', + timestamp, + frame_width, + frame_height, + ) + self._placeholder.fill(0) + cv2.putText( + self._placeholder, + str(timestamp), + self._location, + fontFace=self._font, + fontScale=self._font_scale, + color=self._color, + thickness=self._thickness, + lineType=self._line_type, + ) + cv2.cuda.GpuMat( + frame, + ( + frame_width - self._width, + 0, + self._width, + self._height, + ), + ).upload(self._placeholder) diff --git a/adapters/gst/gst_plugins/python/adjust_timestamps.py b/adapters/gst/gst_plugins/python/adjust_timestamps.py index b7d0b72c5..fcdb94f59 100644 --- a/adapters/gst/gst_plugins/python/adjust_timestamps.py +++ b/adapters/gst/gst_plugins/python/adjust_timestamps.py @@ -1,3 +1,5 @@ +from typing import Any + from savant.gstreamer import GObject, Gst, GstBase from savant.gstreamer.utils import LoggerMixin @@ -29,14 +31,45 @@ class AdjustTimestamps(LoggerMixin, GstBase.BaseTransform): ), ) + __gproperties__ = { + 'adjust-first-frame': ( + bool, + 'Adjust timestamp for the first frame.', + 'Adjust timestamp for the first frame.', + False, + GObject.ParamFlags.READWRITE, + ), + } + def __init__(self): super().__init__() self.offset = 0 self.max_pts = 0 self.max_dts = 0 + self.adjust_first_frame = False self.new_segment = True self.set_in_place(True) + def do_get_property(self, prop: GObject.GParamSpec): + """Gst plugin get property function. + + :param prop: structure that encapsulates the parameter info + """ + if prop.name == 'adjust-first-frame': + return self.adjust_first_frame + raise AttributeError(f'Unknown property {prop.name}.') + + def do_set_property(self, prop: GObject.GParamSpec, value: Any): + """Gst plugin set property function. 
+ + :param prop: structure that encapsulates the parameter info + :param value: new value for parameter, type dependents on parameter + """ + if prop.name == 'adjust-first-frame': + self.adjust_first_frame = value + else: + raise AttributeError(f'Unknown property {prop.name}.') + def do_transform_ip(self, buffer: Gst.Buffer): if self.new_segment: # Calculating offset only at the beginning of the segment @@ -55,6 +88,10 @@ def update_offset(self, buffer: Gst.Buffer): buffer.dts, buffer.duration, ) + if self.adjust_first_frame: + current_running_time = self.get_clock().get_time() - self.get_base_time() + self.max_pts = max(self.max_pts, current_running_time) + self.max_dts = max(self.max_dts, current_running_time) delta = 0 if buffer.dts != Gst.CLOCK_TIME_NONE and buffer.dts < self.max_dts: self.logger.info('Buffer DTS is %s, expected: %s', buffer.dts, self.max_dts) diff --git a/docker/Dockerfile.deepstream b/docker/Dockerfile.deepstream index dffd30d0c..5c83b36cb 100644 --- a/docker/Dockerfile.deepstream +++ b/docker/Dockerfile.deepstream @@ -130,8 +130,10 @@ COPY samples samples FROM base AS adapters COPY adapters/ds adapters/ds -COPY adapters/gst/gst_plugins/python/adjust_timestamps.py adapters/ds/gst_plugins/python/adjust_timestamps.py -ENV GST_PLUGIN_PATH=$GST_PLUGIN_PATH:$APP_PATH/adapters/ds/gst_plugins +COPY adapters/gst/gst_plugins/python/adjust_timestamps.py \ + adapters/gst/gst_plugins/python/fps_meter.py \ + adapters/gst/gst_plugins/python/ +ENV GST_PLUGIN_PATH=$GST_PLUGIN_PATH:$APP_PATH/adapters/gst/gst_plugins:$APP_PATH/adapters/ds/gst_plugins ENTRYPOINT [] diff --git a/docker/Dockerfile.deepstream-l4t b/docker/Dockerfile.deepstream-l4t index 2a9668b17..9469aea5b 100644 --- a/docker/Dockerfile.deepstream-l4t +++ b/docker/Dockerfile.deepstream-l4t @@ -130,7 +130,9 @@ COPY samples samples FROM base AS adapters COPY adapters/ds adapters/ds -COPY adapters/gst/gst_plugins/python/adjust_timestamps.py adapters/ds/gst_plugins/python/adjust_timestamps.py -ENV GST_PLUGIN_PATH=$GST_PLUGIN_PATH:$APP_PATH/adapters/ds/gst_plugins +COPY adapters/gst/gst_plugins/python/adjust_timestamps.py \ + adapters/gst/gst_plugins/python/fps_meter.py \ + adapters/gst/gst_plugins/python/ +ENV GST_PLUGIN_PATH=$GST_PLUGIN_PATH:$APP_PATH/adapters/gst/gst_plugins:$APP_PATH/adapters/ds/gst_plugins ENTRYPOINT [] diff --git a/docker/Dockerfile.deepstream-l4t-6.0.1 b/docker/Dockerfile.deepstream-l4t-6.0.1 index 0c92a60e6..d6222ded9 100644 --- a/docker/Dockerfile.deepstream-l4t-6.0.1 +++ b/docker/Dockerfile.deepstream-l4t-6.0.1 @@ -248,7 +248,9 @@ COPY samples samples FROM base AS adapters COPY adapters/ds adapters/ds -COPY adapters/gst/gst_plugins/python/adjust_timestamps.py adapters/ds/gst_plugins/python/adjust_timestamps.py -ENV GST_PLUGIN_PATH=$GST_PLUGIN_PATH:$APP_PATH/adapters/ds/gst_plugins +COPY adapters/gst/gst_plugins/python/adjust_timestamps.py \ + adapters/gst/gst_plugins/python/fps_meter.py \ + adapters/gst/gst_plugins/python/ +ENV GST_PLUGIN_PATH=$GST_PLUGIN_PATH:$APP_PATH/adapters/gst/gst_plugins:$APP_PATH/adapters/ds/gst_plugins ENTRYPOINT [] diff --git a/savant/deepstream/opencv_utils.py b/savant/deepstream/opencv_utils.py index ba555eec9..d1e23fdb5 100644 --- a/savant/deepstream/opencv_utils.py +++ b/savant/deepstream/opencv_utils.py @@ -14,7 +14,8 @@ @contextmanager def nvds_to_gpu_mat( buffer: Gst.Buffer, - nvds_frame_meta: pyds.NvDsFrameMeta, + nvds_frame_meta: pyds.NvDsFrameMeta = None, + batch_id: int = None, ) -> ContextManager[cv2.cuda.GpuMat]: """Build GpuMat header for allocated 
CUDA-memory of the frame. @@ -29,10 +30,14 @@ def nvds_to_gpu_mat( :param buffer: Gstreamer buffer which contains NvBufSurface. :param nvds_frame_meta: NvDs frame metadata which contains frame info. + :param batch_id: Frame ID in a batch. Ignored when nvds_frame_meta is specified. :return: GpuMat header for allocated CUDA-memory of the frame. """ - py_ds_cuda_memory = PyDSCudaMemory(hash(buffer), nvds_frame_meta.batch_id) + if nvds_frame_meta is not None: + batch_id = nvds_frame_meta.batch_id + assert batch_id is not None + py_ds_cuda_memory = PyDSCudaMemory(hash(buffer), batch_id) try: cuda_ptr = py_ds_cuda_memory.GetMapCudaPtr() yield cv2.savant.createGpuMat( diff --git a/savant/gst_plugins/python/avro_video_demux.py b/savant/gst_plugins/python/avro_video_demux.py index 1d95cf48a..10d1714d9 100644 --- a/savant/gst_plugins/python/avro_video_demux.py +++ b/savant/gst_plugins/python/avro_video_demux.py @@ -66,6 +66,15 @@ 0, GObject.ParamFlags.READWRITE, ), + # TODO: filter frames by source id in zeromq_src + # https://github.com/insight-platform/Savant/issues/59 + 'source-id': ( + str, + 'Source ID filter.', + 'Filter frames by source ID.', + None, + GObject.ParamFlags.READWRITE, + ), } @@ -131,6 +140,7 @@ def __init__(self): self.expiration_thread = Thread(target=self.eviction_job, daemon=True) self.store_metadata = False self.max_parallel_streams: int = 0 + self.source_id: Optional[str] = None self._frame_idx_gen = itertools.count() @@ -167,6 +177,8 @@ def do_get_property(self, prop): return self.eos_on_timestamps_reset if prop.name == 'max-parallel-streams': return self.max_parallel_streams + if prop.name == 'source-id': + return self.source_id raise AttributeError(f'Unknown property {prop.name}') def do_set_property(self, prop, value): @@ -181,6 +193,8 @@ def do_set_property(self, prop, value): self.eos_on_timestamps_reset = value elif prop.name == 'max-parallel-streams': self.max_parallel_streams = value + elif prop.name == 'source-id': + self.source_id = value else: raise AttributeError(f'Unknown property {prop.name}') @@ -201,6 +215,9 @@ def handle_buffer( # TODO: Pipeline message types might be extended beyond only VideoFrame # Additional checks for audio/raw_tensors/etc. may be required schema_name, message = deserialize(message_bin) + if self.source_id is not None and message['source_id'] != self.source_id: + self.logger.debug('Skipping message from source %s', message['source_id']) + return Gst.FlowReturn.OK if schema_name == 'VideoFrame': return self.handle_video_frame(message) if schema_name == 'EndOfStream': diff --git a/savant/gstreamer/runner.py b/savant/gstreamer/runner.py index 9c1a62211..3c1c38ebe 100644 --- a/savant/gstreamer/runner.py +++ b/savant/gstreamer/runner.py @@ -1,7 +1,7 @@ """GStreamer pipeline runner class.""" from datetime import timedelta from time import time -from typing import Optional +from typing import Optional, Union import logging import os import threading @@ -18,10 +18,10 @@ class StateChangeError(Exception): class GstPipelineRunner: """Manages running Gstreamer pipeline. - :param pipeline: GstPipeline to run. + :param pipeline: GstPipeline or Gst.Pipeline to run. 
""" - def __init__(self, pipeline: GstPipeline): + def __init__(self, pipeline: Union[GstPipeline, Gst.Pipeline]): # pipeline error storage self._error: Optional[str] = None @@ -39,7 +39,10 @@ def __init__(self, pipeline: GstPipeline): self._main_loop = GLib.MainLoop() self._main_loop_thread = threading.Thread(target=self._main_loop_run) - self._pipeline: GstPipeline = pipeline + self._pipeline: Union[GstPipeline, Gst.Pipeline] = pipeline + self._gst_pipeline: Gst.Pipeline = ( + pipeline.pipeline if isinstance(pipeline, GstPipeline) else pipeline + ) def __enter__(self): self.startup() @@ -75,8 +78,9 @@ def startup(self): logger.debug('Setting pipeline to PLAYING...') self._pipeline.set_state(Gst.State.PLAYING) - logger.debug('Calling pipeline.on_startup()...') - self._pipeline.on_startup() + if isinstance(self._pipeline, GstPipeline): + logger.debug('Calling pipeline.on_startup()...') + self._pipeline.on_startup() logger.debug('Starting main loop thread...') self._is_running = True @@ -110,8 +114,9 @@ def shutdown(self): 'Pipeline execution ended after %s.', timedelta(seconds=exec_seconds) ) - logger.debug('Calling pipeline.on_shutdown()...') - self._pipeline.on_shutdown() + if isinstance(self._pipeline, GstPipeline): + logger.debug('Calling pipeline.on_shutdown()...') + self._pipeline.on_shutdown() def on_error( # pylint: disable=unused-argument self, bus: Gst.Bus, message: Gst.Message @@ -145,7 +150,7 @@ def on_state_changed( # pylint: disable=unused-argument """Change state callback.""" old_state, new_state, _ = msg.parse_state_changed() - if not msg.src == self._pipeline.pipeline: + if not msg.src == self._gst_pipeline: # not from the pipeline, ignore return @@ -158,5 +163,5 @@ def on_state_changed( # pylint: disable=unused-argument if old_state != new_state and os.getenv('GST_DEBUG_DUMP_DOT_DIR'): file_name = f'pipeline.{old_state_name}_{new_state_name}' Gst.debug_bin_to_dot_file_with_ts( - self._pipeline.pipeline, Gst.DebugGraphDetails.ALL, file_name + self._gst_pipeline, Gst.DebugGraphDetails.ALL, file_name ) diff --git a/savant/utils/sink_factories.py b/savant/utils/sink_factories.py index 1edfd5b31..6cd7fa623 100644 --- a/savant/utils/sink_factories.py +++ b/savant/utils/sink_factories.py @@ -129,6 +129,11 @@ def send_message( **kwargs, ): if isinstance(msg, SinkVideoFrame): + logger.debug( + 'Sending frame of source %s with PTS %s to ZeroMQ sink', + msg.source_id, + msg.frame_meta.pts, + ) message = { 'source_id': msg.frame_meta.source_id, 'pts': msg.frame_meta.pts, @@ -145,6 +150,7 @@ def send_message( message_bin = serialize(schema, message) output_zmq_socket.send(message_bin) elif isinstance(msg, SinkEndOfStream): + logger.debug('Sending EOS of source %s to ZeroMQ sink', msg.source_id) message = {'source_id': msg.source_id} message_bin = serialize(eos_schema, message) output_zmq_socket.send(message_bin) diff --git a/scripts/common.py b/scripts/common.py index daa8e73d8..c623e2cf1 100644 --- a/scripts/common.py +++ b/scripts/common.py @@ -1,4 +1,5 @@ """Common utilities for run scripts.""" +import string from pathlib import Path from typing import List, Iterable, Optional import sys @@ -76,6 +77,59 @@ def get_ipc_mounts(zmq_sockets: Iterable[str]) -> List[str]: return list(set(ipc_mounts)) +def validate_source_id(ctx, param, value): + safe_chars = set(string.ascii_letters + string.digits + '_.-') + invalid_chars = {char for char in value if char not in safe_chars} + if len(invalid_chars) > 0: + raise click.BadParameter(f'chars {invalid_chars} are not allowed.') + return value 
+ + +def source_id_option(func): + return click.option( + '--source-id', + required=True, + type=click.STRING, + callback=validate_source_id, + help='Source ID, e.g. "camera1".', + )(func) + + +def fps_meter_options(func): + func = click.option( + '--fps-output', + help='Where to dump stats (stdout or logger).', + )(func) + func = click.option( + '--fps-period-frames', + type=int, + help='FPS measurement period, in frames.', + )(func) + func = click.option( + '--fps-period-seconds', + type=float, + help='FPS measurement period, in seconds.', + )(func) + return func + + +def build_common_envs( + source_id: str, + fps_period_frames: Optional[int], + fps_period_seconds: Optional[float], + fps_output: str, +): + """Generate env var run options.""" + envs = [f'SOURCE_ID={source_id}'] + if fps_period_frames: + envs.append(f'FPS_PERIOD_FRAMES={fps_period_frames}') + if fps_period_seconds: + envs.append(f'FPS_PERIOD_SECONDS={fps_period_seconds}') + if fps_output: + envs.append(f'FPS_OUTPUT={fps_output}') + return envs + + def build_docker_run_command( container_name: str, zmq_endpoint: str, @@ -89,6 +143,7 @@ def build_docker_run_command( devices: List[str] = None, with_gpu: bool = False, host_network: bool = False, + args: List[str] = None, ) -> List[str]: """Build docker run command for an adapter container. @@ -107,6 +162,7 @@ def build_docker_run_command( :param devices: add ``--devices`` parameters :param with_gpu: add ``--gpus=all`` parameter :param host_network: add ``--network=host`` parameter + :param args: add command line arguments to the entrypoint """ gst_debug = os.environ.get('GST_DEBUG', '2') # fmt: off @@ -149,6 +205,8 @@ def build_docker_run_command( command.append('--network=host') command.append(docker_image) + if args: + command.extend(args) return command diff --git a/scripts/run_sink.py b/scripts/run_sink.py index 56cdbc403..27b2d4546 100755 --- a/scripts/run_sink.py +++ b/scripts/run_sink.py @@ -1,10 +1,18 @@ #!/usr/bin/env python3 """Run sink adapter.""" import os +from typing import Optional import click -from common import build_docker_run_command, adapter_docker_image_option, run_command +from common import ( + build_docker_run_command, + adapter_docker_image_option, + run_command, + source_id_option, + fps_meter_options, + build_common_envs, +) @click.group() @@ -230,5 +238,151 @@ def video_files_sink( run_command(cmd) +@cli.command('always-on-rtsp') +@source_id_option +@click.option( + '--stub-file-location', + required=True, + help='Location of the stub image file. Image file must be in JPEG format.', +) +@click.option( + '--max-delay-ms', + type=click.INT, + default=1000, + help='Maximum delay for the last frame in milliseconds.', + show_default=True, +) +@click.option( + '--transfer-mode', + default='scale-to-fit', + help='Transfer mode. One of: "scale-to-fit", "crop-to-fit".', + show_default=True, +) +@click.option( + '--protocols', + default='tcp', + help='Allowed lower transport protocols, e.g. "tcp+udp-mcast+udp".', + show_default=True, +) +@click.option( + '--latency-ms', + type=click.INT, + default=100, + help='Amount of ms to buffer RTSP stream.', + show_default=True, +) +@click.option( + '--keep-alive', + type=click.BOOL, + default=True, + help='Send RTSP keep alive packets, disable for old incompatible server.', + show_default=True, +) +@click.option( + '--profile', + default='High', + help='H264 encoding profile. 
One of: "Baseline", "Main", "High".', + show_default=True, +) +@click.option( + '--bitrate', + type=click.INT, + default=4000000, + help='H264 encoding bitrate.', + show_default=True, +) +@click.option( + '--framerate', + default='30/1', + help='Frame rate of the output stream.', + show_default=True, +) +@click.option( + '--metadata-output', + help='Where to dump metadata (stdout or logger).', +) +@click.option( + '--sync', + is_flag=True, + default=False, + help=( + 'Show frames on sink synchronously (i.e. at the source file rate). ' + 'Note: inbound stream is not stable with this flag, try to avoid it.' + ), + show_default=True, +) +@fps_meter_options +@common_options +@adapter_docker_image_option('deepstream') +@click.argument('rtsp_uri', required=True) +def always_on_rtsp_sink( + in_endpoint: str, + in_type: str, + in_bind: bool, + docker_image: str, + source_id: str, + stub_file_location: str, + max_delay_ms: int, + transfer_mode: str, + protocols: str, + latency_ms: int, + keep_alive: bool, + profile: str, + bitrate: int, + framerate: str, + fps_period_frames: Optional[int], + fps_period_seconds: Optional[float], + fps_output: Optional[str], + metadata_output: Optional[str], + sync: bool, + rtsp_uri: str, +): + """Send video stream from specific source to RTSP server. + + RTSP_URI - URI of the RTSP server. + + Note: it is advisable to use --sync flag on source adapter or use a live + source adapter (e.g. rtsp or usb-cam). + """ + + assert os.path.exists(stub_file_location) + stub_file_location = os.path.abspath(stub_file_location) + + envs = build_common_envs( + source_id=source_id, + fps_period_frames=fps_period_frames, + fps_period_seconds=fps_period_seconds, + fps_output=fps_output, + ) + [ + f'STUB_FILE_LOCATION={stub_file_location}', + f'MAX_DELAY_MS={max_delay_ms}', + f'TRANSFER_MODE={transfer_mode}', + f'RTSP_URI={rtsp_uri}', + f'RTSP_PROTOCOLS={protocols}', + f'RTSP_LATENCY_MS={latency_ms}', + f'RTSP_KEEP_ALIVE={keep_alive}', + f'ENCODER_PROFILE={profile}', + f'ENCODER_BITRATE={bitrate}', + f'FRAMERATE={framerate}', + ] + if metadata_output: + envs.append(f'METADATA_OUTPUT={metadata_output}') + + cmd = build_docker_run_command( + 'sink-always-on-rtsp', + zmq_endpoint=in_endpoint, + zmq_type=in_type, + zmq_bind=in_bind, + sync=sync, + entrypoint='python', + args=['-m', 'adapters.ds.sinks.always_on_rtsp'], + envs=envs, + volumes=[f'{stub_file_location}:{stub_file_location}:ro'], + with_gpu=True, + docker_image=docker_image, + ) + run_command(cmd) + + if __name__ == '__main__': cli() diff --git a/scripts/run_source.py b/scripts/run_source.py index 81511a475..76b4e3ad5 100755 --- a/scripts/run_source.py +++ b/scripts/run_source.py @@ -2,11 +2,17 @@ """Run source adapter.""" import os from typing import List, Optional -import string import click -from common import build_docker_run_command, adapter_docker_image_option, run_command +from common import ( + build_docker_run_command, + adapter_docker_image_option, + run_command, + source_id_option, + fps_meter_options, + build_common_envs, +) @click.group() @@ -23,14 +29,6 @@ def cli(): ) -def validate_source_id(ctx, param, value): - safe_chars = set(string.ascii_letters + string.digits + '_.-') - invalid_chars = {char for char in value if char not in safe_chars} - if len(invalid_chars) > 0: - raise click.BadParameter(f'chars {invalid_chars} are not allowed.') - return value - - def common_options(func): """Common Click source adapter options.""" func = click.option( @@ -54,47 +52,11 @@ def common_options(func): ), show_default=True, 
)(func) - func = click.option( - '--fps-output', - help='Where to dump stats (stdout or logger).', - )(func) - func = click.option( - '--fps-period-frames', - type=int, - help='FPS measurement period, in frames.', - )(func) - func = click.option( - '--fps-period-seconds', - type=float, - help='FPS measurement period, in seconds.', - )(func) - func = click.option( - '--source-id', - required=True, - type=click.STRING, - callback=validate_source_id, - help='Source ID, e.g. "camera1".', - )(func) + func = fps_meter_options(func) + func = source_id_option(func) return func -def build_common_envs( - source_id: str, - fps_period_frames: Optional[int], - fps_period_seconds: Optional[float], - fps_output: str, -): - """Generate env var run options.""" - envs = [f'SOURCE_ID={source_id}'] - if fps_period_frames: - envs.append(f'FPS_PERIOD_FRAMES={fps_period_frames}') - if fps_period_seconds: - envs.append(f'FPS_PERIOD_SECONDS={fps_period_seconds}') - if fps_output: - envs.append(f'FPS_OUTPUT={fps_output}') - return envs - - def files_source( source_id: str, out_endpoint: str,