diff --git a/gst_plugins/python/sync_io_wrapper_bin.py b/gst_plugins/python/sync_io_wrapper_bin.py
new file mode 100644
index 000000000..83068d6b3
--- /dev/null
+++ b/gst_plugins/python/sync_io_wrapper_bin.py
@@ -0,0 +1,153 @@
+import inspect
+from threading import Event
+from typing import Optional
+
+from savant.gstreamer import GObject, Gst  # noqa: F401
+from savant.gstreamer.utils import (
+    RequiredPropertyError,
+    gst_post_library_settings_error,
+    required_property,
+)
+from savant.utils.logging import LoggerMixin
+
+DEFAULT_MAX_QUEUE = 1
+WAIT_QUEUE_INTERVAL = 0.1
+
+
+class SyncIoWrapperBin(LoggerMixin, Gst.Bin):
+    GST_PLUGIN_NAME = 'sync_io_wrapper_bin'
+
+    __gstmetadata__ = (
+        'Wrapper for synchronized processing by nested element',
+        'Bin',
+        'Wrapper for synchronized processing by nested element',
+        'Pavel Tomskikh',
+    )
+
+    __gsttemplates__ = (
+        Gst.PadTemplate.new(
+            'sink',
+            Gst.PadDirection.SINK,
+            Gst.PadPresence.ALWAYS,
+            Gst.Caps.new_any(),
+        ),
+        Gst.PadTemplate.new(
+            'src',
+            Gst.PadDirection.SRC,
+            Gst.PadPresence.ALWAYS,
+            Gst.Caps.new_any(),
+        ),
+    )
+
+    __gproperties__ = {
+        'max-queue-size': (
+            GObject.TYPE_UINT,
+            'Max queue size',
+            'Max queue size',
+            1,
+            GObject.G_MAXUINT,
+            DEFAULT_MAX_QUEUE,
+            GObject.ParamFlags.READWRITE,
+        ),
+        'nested-element': (
+            Gst.Element,
+            'Nested element',
+            'Nested element',
+            GObject.ParamFlags.READWRITE | Gst.PARAM_MUTABLE_READY,
+        ),
+    }
+
+    def __init__(self):
+        super().__init__()
+
+        self._sink_pad: Gst.GhostPad = Gst.GhostPad.new_no_target(
+            'sink', Gst.PadDirection.SINK
+        )
+        self.add_pad(self._sink_pad)
+        self._sink_pad.add_probe(Gst.PadProbeType.BUFFER, self._on_sink_buffer)
+
+        self._src_pad: Gst.GhostPad = Gst.GhostPad.new_no_target(
+            'src', Gst.PadDirection.SRC
+        )
+        self._src_pad.add_probe(Gst.PadProbeType.BUFFER, self._on_src_buffer)
+        self.add_pad(self._src_pad)
+
+        self._nested_element: Optional[Gst.Element] = None
+
+        self._buffers_in = 0
+        self._buffers_out = 0
+        self._max_queue_size = DEFAULT_MAX_QUEUE
+        self._queue_is_ready = Event()
+
+    def do_state_changed(self, old: Gst.State, new: Gst.State, pending: Gst.State):
+        if old == Gst.State.NULL and new != Gst.State.NULL:
+            try:
+                required_property('nested-element', self._nested_element)
+            except RequiredPropertyError as exc:
+                self.logger.exception('Failed to start element: %s', exc, exc_info=True)
+                frame = inspect.currentframe()
+                gst_post_library_settings_error(self, frame, __file__, text=exc.args[0])
+                return
+
+    def do_get_property(self, prop):
+        if prop.name == 'max-queue-size':
+            return self._max_queue_size
+        if prop.name == 'nested-element':
+            return self._nested_element
+        raise AttributeError(f'Unknown property {prop.name}')
+
+    def do_set_property(self, prop, value):
+        if prop.name == 'max-queue-size':
+            self._max_queue_size = value
+        elif prop.name == 'nested-element':
+            if self._nested_element is not None:
+                self._sink_pad.set_target(None)
+                self._src_pad.set_target(None)
+                self.remove(self._nested_element)
+
+            self._nested_element = value
+            self.add(self._nested_element)
+            self._buffers_in = 0
+            self._buffers_out = 0
+            self._sink_pad.set_target(self._nested_element.get_static_pad('sink'))
+            self._src_pad.set_target(self._nested_element.get_static_pad('src'))
+            self._nested_element.sync_state_with_parent()
+
+        else:
+            raise AttributeError(f'Unknown property {prop.name}')
+
+    def _on_sink_buffer(self, pad: Gst.Pad, info: Gst.PadProbeInfo):
+        self._buffers_in += 1
+        self._queue_is_ready.clear()
+        while self._buffers_in - self._buffers_out > self._max_queue_size:
+            self.logger.debug(
+                'Waiting for queued buffers to be processed: %s/%s',
+                self._buffers_in,
+                self._buffers_out,
+            )
+            self._queue_is_ready.wait(WAIT_QUEUE_INTERVAL)
+        self.logger.debug(
+            'Sending buffer to nested element: %s/%s',
+            self._buffers_in,
+            self._buffers_out,
+        )
+        return Gst.PadProbeReturn.OK
+
+    def _on_src_buffer(self, pad: Gst.Pad, info: Gst.PadProbeInfo):
+        self._buffers_out += 1
+        self._queue_is_ready.set()
+        self.logger.debug(
+            'Buffer processed: %s/%s',
+            self._buffers_in,
+            self._buffers_out,
+        )
+        return Gst.PadProbeReturn.OK
+
+
+# register plugin
+GObject.type_register(SyncIoWrapperBin)
+__gstelementfactory__ = (
+    SyncIoWrapperBin.GST_PLUGIN_NAME,
+    Gst.Rank.NONE,
+    SyncIoWrapperBin,
+)
diff --git a/savant/deepstream/buffer_processor.py b/savant/deepstream/buffer_processor.py
index f61981f71..c3e233847 100644
--- a/savant/deepstream/buffer_processor.py
+++ b/savant/deepstream/buffer_processor.py
@@ -1,9 +1,8 @@
 """Buffer processor for DeepStream pipeline."""
 
-from collections import deque
 from heapq import heappop, heappush
 from queue import Queue
-from typing import Deque, Dict, Iterator, List, NamedTuple, Optional, Tuple
+from typing import Dict, Iterator, List, NamedTuple, Optional
 
 import pyds
 from pygstsavantframemeta import (
@@ -31,7 +30,6 @@
 from savant.config.schema import FrameParameters
 from savant.deepstream.source_output import (
     SourceOutput,
-    SourceOutputEncoded,
     SourceOutputOnlyMeta,
     SourceOutputWithFrame,
 )
@@ -44,7 +42,6 @@
 from savant.gstreamer.codecs import Codec, CodecInfo
 from savant.meta.constants import PRIMARY_OBJECT_KEY, UNTRACKED_OBJECT_ID
 from savant.meta.type import ObjectSelectionType
-from savant.utils.platform import is_aarch64
 from savant.utils.sink_factories import SinkVideoFrame
 from savant.utils.source_info import SourceInfo, SourceInfoRegistry
 
@@ -651,99 +648,6 @@ def _build_output_frame(
     )
 
 
-class NvDsJetsonH26XBufferProcessor(NvDsGstBufferProcessor):
-    def __init__(self, *args, **kwargs):
-        """Buffer processor for DeepStream pipeline.
-
-        Workaround for a bug in h264x encoders on Jetson devices.
-        https://forums.developer.nvidia.com/t/nvv4l2h264enc-returns-frames-in-wrong-order-when-pts-doesnt-align-with-framerate/257363
-
-        Encoder "nvv4l2h26xenc" on Jetson devices produces frames with correct
-        DTS but with PTS and metadata from different frames. We store buffers
-        in a queue and wait for a buffer with correct PTS.
- """ - - # source_id -> (DTS, buffer) - self._dts_queues: Dict[str, Deque[Tuple[int, Gst.Buffer]]] = {} - # source_id -> (PTS, IDX) - self._pts_queues: Dict[str, List[Tuple[int, Optional[int]]]] = {} - super().__init__(*args, **kwargs) - - def _iterate_output_frames( - self, - buffer: Gst.Buffer, - source_info: SourceInfo, - ) -> Iterator[_OutputFrame]: - """Iterate output frames from Gst.Buffer.""" - - frame_idx = extract_frame_idx(buffer) - dts_queue = self._dts_queues.setdefault(source_info.source_id, deque()) - pts_queue = self._pts_queues.setdefault(source_info.source_id, []) - # When the frame is empty assign DTS=PTS - dts = buffer.dts if buffer.get_size() > 0 else buffer.pts - if dts_queue or buffer.pts != dts: - self.logger.debug( - 'Storing frame with PTS %s, DTS %s and IDX %s to queue.', - buffer.pts, - dts, - frame_idx, - ) - dts_queue.append((dts, buffer)) - heappush(pts_queue, (buffer.pts, frame_idx)) - while dts_queue and dts_queue[0][0] == pts_queue[0][0]: - next_dts, next_buf = dts_queue.popleft() - next_pts, next_idx = heappop(pts_queue) - self.logger.debug( - 'Pushing output frame frame with PTS %s, DTS %s and IDX %s.', - next_pts, - next_dts, - next_idx, - ) - yield self._build_output_frame( - idx=next_idx, - pts=next_pts, - dts=next_dts, - buffer=next_buf, - ) - else: - self.logger.debug( - 'PTS and DTS of the frame are the same: %s. Pushing output frame.', dts - ) - yield self._build_output_frame( - idx=frame_idx, - pts=buffer.pts, - dts=dts, - buffer=buffer, - ) - - def on_eos(self, source_info: SourceInfo): - """Pipeline EOS handler.""" - dts_queue = self._dts_queues.pop(source_info.source_id, None) - pts_queue = self._pts_queues.pop(source_info.source_id, None) - if dts_queue is None: - return - while dts_queue: - dts, buffer = dts_queue.popleft() - pts, idx = dts, None - - if pts_queue: - while pts_queue and pts_queue[0][0] <= dts: - pts, idx = heappop(pts_queue) - if pts != dts: - pts = dts - idx = None - - output_frame = self._build_output_frame( - idx=idx, - pts=pts, - dts=dts, - buffer=buffer, - ) - sink_message = self._build_sink_video_frame(output_frame, source_info) - sink_message = self._delete_frame_from_pipeline(idx, sink_message) - self._queue.put(sink_message) - - class NvDsRawBufferProcessor(NvDsBufferProcessor): def __init__( self, @@ -859,16 +763,7 @@ def create_buffer_processor( pass_through_mode=pass_through_mode, ) - if ( - is_aarch64() - and isinstance(source_output, SourceOutputEncoded) - and source_output.encoder in ['nvv4l2h264enc', 'nvv4l2h265enc'] - ): - buffer_processor_class = NvDsJetsonH26XBufferProcessor - else: - buffer_processor_class = NvDsGstBufferProcessor - - return buffer_processor_class( + return NvDsGstBufferProcessor( queue=queue, sources=sources, frame_params=frame_params, diff --git a/savant/deepstream/source_output.py b/savant/deepstream/source_output.py index e1fdef624..f2f0de68c 100644 --- a/savant/deepstream/source_output.py +++ b/savant/deepstream/source_output.py @@ -473,6 +473,25 @@ class SourceOutputH26X(SourceOutputEncoded): Output contains frames encoded with h264 or h265 (hevc) codecs along with metadata. 
""" + def __init__( + self, + codec: CodecInfo, + output_frame: Dict[str, Any], + frame_params: FrameParameters, + condition: FrameProcessingCondition, + video_pipeline: VideoPipeline, + queue_properties: Dict[str, int], + ): + super().__init__( + codec=codec, + output_frame=output_frame, + frame_params=frame_params, + condition=condition, + video_pipeline=video_pipeline, + queue_properties=queue_properties, + ) + self._is_jetson_nvenc = is_aarch64() and self._encoder == codec.nv_encoder + def _add_transform_elems(self, pipeline: GstPipeline, source_info: SourceInfo): super()._add_transform_elems(pipeline, source_info) # A parser for codecs h264, h265 is added to include @@ -495,6 +514,29 @@ def _add_transform_elems(self, pipeline: GstPipeline, source_info: SourceInfo): 'Added parser %s with params %s', self._codec.parser, parser_params ) + def _create_encoder(self, pipeline: GstPipeline): + if not self._is_jetson_nvenc: + return super()._create_encoder(pipeline) + + # Workaround for a bug in h264x encoders on Jetson devices. + # https://forums.developer.nvidia.com/t/nvv4l2h264enc-returns-frames-in-wrong-order-when-pts-doesnt-align-with-framerate/257363 + # + # Encoder "nvv4l2h26xenc" on Jetson devices produces frames with correct + # DTS but with PTS and metadata from different frames. + # We don't send more than one frame to the encoder at a time to avoid this issue. + encoder = pipeline._element_factory.create( + PipelineElement(self._encoder, properties=self._params) + ) + add_pad_probe_to_move_frame( + encoder.get_static_pad('sink'), + self._video_pipeline, + 'encode', + ) + wrapped_encoder = pipeline.add_element(PipelineElement('sync_io_wrapper_bin')) + wrapped_encoder.set_property('nested-element', encoder) + + return wrapped_encoder + def _build_output_caps(self, width: int, height: int) -> Gst.Caps: caps_params = [ self._codec.caps_with_params,