Fix NVENC PTS/DTS on Jetson (#819)
* #811 add wrapper for encoders to prevent the internal queue from filling up

* #811 use the wrapper only for NVENC on Jetson
tomskikh authored Jul 24, 2024
1 parent d1a8d45 commit e334af9
Showing 3 changed files with 197 additions and 107 deletions.
153 changes: 153 additions & 0 deletions gst_plugins/python/sync_io_wrapper_bin.py
@@ -0,0 +1,153 @@
import inspect
from threading import Event
from typing import Optional

from savant.gstreamer import GObject, Gst # noqa:F401
from savant.gstreamer.utils import (
RequiredPropertyError,
gst_post_library_settings_error,
required_property,
)
from savant.utils.logging import LoggerMixin

DEFAULT_MAX_QUEUE = 1
WAIT_QUEUE_INTERVAL = 0.1


class SyncIoWrapperBin(LoggerMixin, Gst.Bin):
GST_PLUGIN_NAME = 'sync_io_wrapper_bin'

__gstmetadata__ = (
'Wrapper for synchronized processing by nested element',
'Bin',
'Wrapper for synchronized processing by nested element',
'Pavel Tomskikh <tomskih_pa@bw-sw.com>',
)

__gsttemplates__ = (
Gst.PadTemplate.new(
'sink',
Gst.PadDirection.SINK,
Gst.PadPresence.ALWAYS,
Gst.Caps.new_any(),
),
Gst.PadTemplate.new(
'src',
Gst.PadDirection.SRC,
Gst.PadPresence.ALWAYS,
Gst.Caps.new_any(),
),
)

__gproperties__ = {
'max-queue-size': (
GObject.TYPE_UINT,
'Max queue size',
'Max queue size',
1,
GObject.G_MAXUINT,
DEFAULT_MAX_QUEUE,
GObject.ParamFlags.READWRITE,
),
'nested-element': (
Gst.Element,
'Nested element',
'Nested element',
GObject.ParamFlags.READWRITE | Gst.PARAM_MUTABLE_READY,
),
}

def __init__(self):
super().__init__()

self._sink_pad: Gst.GhostPad = Gst.GhostPad.new_no_target(
'sink', Gst.PadDirection.SINK
)
self.add_pad(self._sink_pad)
self._sink_pad.add_probe(Gst.PadProbeType.BUFFER, self._on_sink_buffer)

self._src_pad: Gst.GhostPad = Gst.GhostPad.new_no_target(
'src', Gst.PadDirection.SRC
)
self._src_pad.add_probe(Gst.PadProbeType.BUFFER, self._on_src_buffer)
self.add_pad(self._src_pad)

self._nested_element: Optional[Gst.Element] = None

self._buffers_in = 0
self._buffers_out = 0
self._max_queue_size = DEFAULT_MAX_QUEUE
self._queue_is_ready = Event()

def do_state_changed(self, old: Gst.State, new: Gst.State, pending: Gst.State):
if old == Gst.State.NULL and new != Gst.State.NULL:
try:
required_property('nested-element', self._nested_element)
except RequiredPropertyError as exc:
self.logger.exception('Failed to start element: %s', exc, exc_info=True)
frame = inspect.currentframe()
gst_post_library_settings_error(self, frame, __file__, text=exc.args[0])
return

def do_get_property(self, prop):
if prop.name == 'max-queue-size':
return self._max_queue_size
if prop.name == 'nested-element':
return self._nested_element
raise AttributeError(f'Unknown property {prop.name}')

def do_set_property(self, prop, value):
if prop.name == 'max-queue-size':
self._max_queue_size = value
elif prop.name == 'nested-element':
if self._nested_element is not None:
self._sink_pad.set_target(None)
self._src_pad.set_target(None)
self.remove(self._nested_element)

self._nested_element = value
self.add(self._nested_element)
self._buffers_in = 0
self._buffers_out = 0
self._sink_pad.set_target(self._nested_element.get_static_pad('sink'))
self._src_pad.set_target(self._nested_element.get_static_pad('src'))
self._nested_element.sync_state_with_parent()

else:
raise AttributeError(f'Unknown property {prop.name}')

def _on_sink_buffer(self, pad: Gst.Pad, info: Gst.PadProbeInfo):
self._buffers_in += 1
self._queue_is_ready.clear()
while self._buffers_in - self._buffers_out > self._max_queue_size:
self.logger.debug(
'Waiting for queued buffers to be processed: %s/%s',
self._buffers_in,
self._buffers_out,
)
self._queue_is_ready.wait(WAIT_QUEUE_INTERVAL)
self.logger.debug(
'Sending buffer to nested element: %s/%s',
self._buffers_in,
self._buffers_out,
)
return Gst.PadProbeReturn.OK

def _on_src_buffer(self, pad: Gst.Pad, info: Gst.PadProbeInfo):
self._buffers_out += 1
self._queue_is_ready.set()
self.logger.debug(
'Buffer processed: %s/%s',
self._buffers_in,
self._buffers_out,
)
return Gst.PadProbeReturn.OK


# register plugin
GObject.type_register(SyncIoWrapperBin)
__gstelementfactory__ = (
SyncIoWrapperBin.GST_PLUGIN_NAME,
Gst.Rank.NONE,
SyncIoWrapperBin,
)
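
For orientation, here is a minimal usage sketch of the new bin outside the Savant pipeline builder. It is not part of the commit: it assumes the plugin above is on GST_PLUGIN_PATH so that the factory name 'sync_io_wrapper_bin' resolves, and the pipeline name and the upstream/downstream elements mentioned in the comments are placeholders.

# Illustrative sketch only (not part of the commit): wrap an encoder so that
# at most one buffer is in flight inside it at any time.
import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

pipeline = Gst.Pipeline.new('demo')
# Assumes the plugin directory is on GST_PLUGIN_PATH so both factory names resolve.
encoder = Gst.ElementFactory.make('nvv4l2h264enc', None)        # element to be wrapped
wrapper = Gst.ElementFactory.make('sync_io_wrapper_bin', None)
wrapper.set_property('nested-element', encoder)   # the bin adds the encoder and targets its ghost pads
wrapper.set_property('max-queue-size', 1)         # allow a single unprocessed buffer
pipeline.add(wrapper)
# Upstream and downstream elements are then linked to the wrapper's ghost pads:
# upstream.link(wrapper); wrapper.link(downstream)
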
109 changes: 2 additions & 107 deletions savant/deepstream/buffer_processor.py
@@ -1,9 +1,8 @@
"""Buffer processor for DeepStream pipeline."""

from collections import deque
from heapq import heappop, heappush
from queue import Queue
from typing import Deque, Dict, Iterator, List, NamedTuple, Optional, Tuple
from typing import Dict, Iterator, List, NamedTuple, Optional

import pyds
from pygstsavantframemeta import (
@@ -31,7 +30,6 @@
from savant.config.schema import FrameParameters
from savant.deepstream.source_output import (
SourceOutput,
SourceOutputEncoded,
SourceOutputOnlyMeta,
SourceOutputWithFrame,
)
@@ -44,7 +42,6 @@
from savant.gstreamer.codecs import Codec, CodecInfo
from savant.meta.constants import PRIMARY_OBJECT_KEY, UNTRACKED_OBJECT_ID
from savant.meta.type import ObjectSelectionType
from savant.utils.platform import is_aarch64
from savant.utils.sink_factories import SinkVideoFrame
from savant.utils.source_info import SourceInfo, SourceInfoRegistry

@@ -651,99 +648,6 @@ def _build_output_frame(
)


class NvDsJetsonH26XBufferProcessor(NvDsGstBufferProcessor):
def __init__(self, *args, **kwargs):
"""Buffer processor for DeepStream pipeline.
        Workaround for a bug in H.26x encoders on Jetson devices.
https://forums.developer.nvidia.com/t/nvv4l2h264enc-returns-frames-in-wrong-order-when-pts-doesnt-align-with-framerate/257363
Encoder "nvv4l2h26xenc" on Jetson devices produces frames with correct
        DTS but with PTS and metadata from different frames. We store buffers
        in a queue and wait for the buffer with the matching PTS.
"""

# source_id -> (DTS, buffer)
self._dts_queues: Dict[str, Deque[Tuple[int, Gst.Buffer]]] = {}
# source_id -> (PTS, IDX)
self._pts_queues: Dict[str, List[Tuple[int, Optional[int]]]] = {}
super().__init__(*args, **kwargs)

def _iterate_output_frames(
self,
buffer: Gst.Buffer,
source_info: SourceInfo,
) -> Iterator[_OutputFrame]:
"""Iterate output frames from Gst.Buffer."""

frame_idx = extract_frame_idx(buffer)
dts_queue = self._dts_queues.setdefault(source_info.source_id, deque())
pts_queue = self._pts_queues.setdefault(source_info.source_id, [])
# When the frame is empty assign DTS=PTS
dts = buffer.dts if buffer.get_size() > 0 else buffer.pts
if dts_queue or buffer.pts != dts:
self.logger.debug(
'Storing frame with PTS %s, DTS %s and IDX %s to queue.',
buffer.pts,
dts,
frame_idx,
)
dts_queue.append((dts, buffer))
heappush(pts_queue, (buffer.pts, frame_idx))
while dts_queue and dts_queue[0][0] == pts_queue[0][0]:
next_dts, next_buf = dts_queue.popleft()
next_pts, next_idx = heappop(pts_queue)
self.logger.debug(
                    'Pushing output frame with PTS %s, DTS %s and IDX %s.',
next_pts,
next_dts,
next_idx,
)
yield self._build_output_frame(
idx=next_idx,
pts=next_pts,
dts=next_dts,
buffer=next_buf,
)
else:
self.logger.debug(
'PTS and DTS of the frame are the same: %s. Pushing output frame.', dts
)
yield self._build_output_frame(
idx=frame_idx,
pts=buffer.pts,
dts=dts,
buffer=buffer,
)

def on_eos(self, source_info: SourceInfo):
"""Pipeline EOS handler."""
dts_queue = self._dts_queues.pop(source_info.source_id, None)
pts_queue = self._pts_queues.pop(source_info.source_id, None)
if dts_queue is None:
return
while dts_queue:
dts, buffer = dts_queue.popleft()
pts, idx = dts, None

if pts_queue:
while pts_queue and pts_queue[0][0] <= dts:
pts, idx = heappop(pts_queue)
if pts != dts:
pts = dts
idx = None

output_frame = self._build_output_frame(
idx=idx,
pts=pts,
dts=dts,
buffer=buffer,
)
sink_message = self._build_sink_video_frame(output_frame, source_info)
sink_message = self._delete_frame_from_pipeline(idx, sink_message)
self._queue.put(sink_message)


class NvDsRawBufferProcessor(NvDsBufferProcessor):
def __init__(
self,
@@ -859,16 +763,7 @@ def create_buffer_processor(
pass_through_mode=pass_through_mode,
)

if (
is_aarch64()
and isinstance(source_output, SourceOutputEncoded)
and source_output.encoder in ['nvv4l2h264enc', 'nvv4l2h265enc']
):
buffer_processor_class = NvDsJetsonH26XBufferProcessor
else:
buffer_processor_class = NvDsGstBufferProcessor

return buffer_processor_class(
return NvDsGstBufferProcessor(
queue=queue,
sources=sources,
frame_params=frame_params,
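
For comparison with the removed NvDsJetsonH26XBufferProcessor above, the following standalone sketch (not part of either file) illustrates the DTS/PTS matching it performed: buffers wait in DTS order while their PTS values sit in a min-heap, and a buffer is released only when the smallest pending PTS equals the DTS at the head of the queue.

# Standalone illustration of the removed reordering workaround.
from collections import deque
from heapq import heappop, heappush

dts_queue = deque()  # (dts, payload) in decode order
pts_heap = []        # presentation timestamps, min-heap

def push_frame(pts, dts, payload):
    """Queue one encoded frame; return frames whose PTS/DTS pair is now resolved."""
    dts_queue.append((dts, payload))
    heappush(pts_heap, pts)
    released = []
    # The head buffer is emitted once the smallest queued PTS matches its DTS.
    while dts_queue and pts_heap and dts_queue[0][0] == pts_heap[0]:
        next_dts, next_payload = dts_queue.popleft()
        next_pts = heappop(pts_heap)
        released.append((next_pts, next_dts, next_payload))
    return released

The new sync_io_wrapper_bin makes this bookkeeping unnecessary by keeping at most one buffer inside the encoder at a time, so PTS, DTS and frame metadata stay aligned.
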
42 changes: 42 additions & 0 deletions savant/deepstream/source_output.py
@@ -473,6 +473,25 @@ class SourceOutputH26X(SourceOutputEncoded):
Output contains frames encoded with h264 or h265 (hevc) codecs along with metadata.
"""

def __init__(
self,
codec: CodecInfo,
output_frame: Dict[str, Any],
frame_params: FrameParameters,
condition: FrameProcessingCondition,
video_pipeline: VideoPipeline,
queue_properties: Dict[str, int],
):
super().__init__(
codec=codec,
output_frame=output_frame,
frame_params=frame_params,
condition=condition,
video_pipeline=video_pipeline,
queue_properties=queue_properties,
)
self._is_jetson_nvenc = is_aarch64() and self._encoder == codec.nv_encoder

def _add_transform_elems(self, pipeline: GstPipeline, source_info: SourceInfo):
super()._add_transform_elems(pipeline, source_info)
# A parser for codecs h264, h265 is added to include
@@ -495,6 +514,29 @@ def _add_transform_elems(self, pipeline: GstPipeline, source_info: SourceInfo):
'Added parser %s with params %s', self._codec.parser, parser_params
)

def _create_encoder(self, pipeline: GstPipeline):
if not self._is_jetson_nvenc:
return super()._create_encoder(pipeline)

        # Workaround for a bug in H.26x encoders on Jetson devices.
# https://forums.developer.nvidia.com/t/nvv4l2h264enc-returns-frames-in-wrong-order-when-pts-doesnt-align-with-framerate/257363
#
# Encoder "nvv4l2h26xenc" on Jetson devices produces frames with correct
# DTS but with PTS and metadata from different frames.
# We don't send more than one frame to the encoder at a time to avoid this issue.
encoder = pipeline._element_factory.create(
PipelineElement(self._encoder, properties=self._params)
)
add_pad_probe_to_move_frame(
encoder.get_static_pad('sink'),
self._video_pipeline,
'encode',
)
wrapped_encoder = pipeline.add_element(PipelineElement('sync_io_wrapper_bin'))
wrapped_encoder.set_property('nested-element', encoder)

return wrapped_encoder

def _build_output_caps(self, width: int, height: int) -> Gst.Caps:
caps_params = [
self._codec.caps_with_params,
