Skip to content

Commit

Permalink
#48 move drawing on frames before demuxer
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh committed Feb 6, 2023
1 parent 8c99751 commit b78cc8a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 81 deletions.
6 changes: 0 additions & 6 deletions savant/deepstream/buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from savant.base.model import ObjectModel, ComplexModel
from savant.config.schema import PipelineElement, ModelElement, FrameParameters
from savant.converter.scale import scale_rbbox
from savant.deepstream.base_drawfunc import BaseNvDsDrawFunc
from savant.deepstream.nvinfer.model import (
NvInferRotatedObjectDetector,
NvInferDetector,
Expand Down Expand Up @@ -589,7 +588,6 @@ def __init__(
objects_preprocessing: ObjectsPreprocessing,
frame_params: FrameParameters,
output_frame: bool,
draw_func: Optional[BaseNvDsDrawFunc],
):
"""Buffer processor for DeepStream pipeline.
Expand All @@ -600,12 +598,10 @@ def __init__(
:param objects_preprocessing: Objects processing registry.
:param frame_params: Processing frame parameters (after nvstreammux).
:param output_frame: Whether to output frame or not.
:param draw_func: PyFunc for drawing on frames.
"""

self._output_frame = output_frame
self._codec = Codec.RAW_RGBA.value if output_frame else None
self._draw_func = draw_func
super().__init__(
queue=queue,
fps_meter=fps_meter,
Expand All @@ -626,8 +622,6 @@ def _iterate_output_frames(self, buffer: Gst.Buffer) -> Iterator[_OutputFrame]:
# get frame if required for output
if self._output_frame:
with get_nvds_buf_surface(buffer, nvds_frame_meta) as np_frame:
if self._draw_func:
self._draw_func(nvds_frame_meta, np_frame)
frame = np_frame.tobytes()
else:
frame = None
Expand Down
58 changes: 46 additions & 12 deletions savant/deepstream/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
nvds_obj_meta_iterator,
nvds_attr_meta_iterator,
nvds_remove_obj_attrs,
get_nvds_buf_surface,
)
from savant.meta.constants import UNTRACKED_OBJECT_ID
from savant.utils.fps_meter import FPSMeter
Expand All @@ -62,7 +63,7 @@ class NvDsPipeline(GstPipeline):
:param name: Pipeline name
:param source: Pipeline source element
:param elements: Pipeline elements
:key frame_params: Processing frame parameters (after nvstreammux)
:key frame: Processing frame parameters (after nvstreammux)
:key batch_size: Primary batch size (nvstreammux batch-size)
:key output_frame: Whether to include frame in module output, not just metadata
"""
Expand Down Expand Up @@ -107,24 +108,26 @@ def __init__(
if self._output_frame_codec is None:
self._source_output = SourceOutputOnlyMeta()
elif self._output_frame_codec == Codec.RAW_RGBA:
self._source_output = SourceOutputRawRgba()
self._source_output = SourceOutputRawRgba(
frame_params=self._frame_params,
)
elif self._output_frame_codec in [Codec.H264, Codec.HEVC]:
self._source_output = SourceOutputH26X(
codec=self._output_frame_codec.value,
params=output_frame.get('encoder_params'),
draw_func=self._draw_func,
frame_params=self._frame_params,
)
elif self._output_frame_codec == Codec.PNG:
self._source_output = SourceOutputPng(
codec=self._output_frame_codec.value,
params=output_frame.get('encoder_params'),
draw_func=self._draw_func,
frame_params=self._frame_params,
)
else:
self._source_output = SourceOutputEncoded(
codec=self._output_frame_codec.value,
params=output_frame.get('encoder_params'),
draw_func=self._draw_func,
frame_params=self._frame_params,
)

self._demuxer_src_pads: List[Gst.Pad] = []
Expand Down Expand Up @@ -162,7 +165,6 @@ def _build_buffer_processor(
objects_preprocessing=self._objects_preprocessing,
frame_params=self._frame_params,
output_frame=self._output_frame_codec is not None,
draw_func=self._draw_func,
)
return NvDsEncodedBufferProcessor(
queue=queue,
Expand Down Expand Up @@ -332,6 +334,18 @@ def _add_input_converter(
nv_video_converter.set_property(
'nvbuf-memory-type', int(pyds.NVBUF_MEM_CUDA_UNIFIED)
)
if self._frame_params.padding:
dest_crop = ':'.join(
str(x)
for x in [
self._frame_params.padding.left,
self._frame_params.padding.top,
self._frame_params.width,
self._frame_params.height,
]
)
nv_video_converter.set_property('dest-crop', dest_crop)

self._pipeline.add(nv_video_converter)
nv_video_converter.sync_state_with_parent()
video_converter_sink: Gst.Pad = nv_video_converter.get_static_pad('sink')
Expand All @@ -354,7 +368,12 @@ def _add_input_converter(

capsfilter: Gst.Element = Gst.ElementFactory.make('capsfilter')
capsfilter.set_property(
'caps', Gst.Caps.from_string('video/x-raw(memory:NVMM), format=RGBA')
'caps',
Gst.Caps.from_string(
'video/x-raw(memory:NVMM), format=RGBA, '
f'width={self._frame_params.total_width}, '
f'height={self._frame_params.total_height}'
),
)
capsfilter.set_state(Gst.State.PLAYING)
self._pipeline.add(capsfilter)
Expand Down Expand Up @@ -527,8 +546,8 @@ def _create_muxer(self, live_source: bool) -> Gst.Element:
"""

frame_processing_parameters = {
'width': self._frame_params.width,
'height': self._frame_params.height,
'width': self._frame_params.total_width,
'height': self._frame_params.total_height,
'batch-size': self._batch_size,
# Allowed range for batch-size: 1 - 1024
# Allowed range for buffer-pool-size: 4 - 1024
Expand Down Expand Up @@ -660,11 +679,26 @@ def _create_demuxer(self, link: bool) -> Gst.Element:
self._demuxer_src_pads = self._allocate_demuxer_pads(
demuxer, self._max_parallel_streams
)
demuxer.get_static_pad('sink').get_peer().add_probe(
Gst.PadProbeType.BUFFER, self.update_frame_meta
)
sink_peer_pad: Gst.Pad = demuxer.get_static_pad('sink').get_peer()
sink_peer_pad.add_probe(Gst.PadProbeType.BUFFER, self.update_frame_meta)
if self._draw_func and self._output_frame_codec:
sink_peer_pad.add_probe(Gst.PadProbeType.BUFFER, self._draw_on_frame_probe)
return demuxer

def _draw_on_frame_probe(
self,
pad: Gst.Pad,
info: Gst.PadProbeInfo,
) -> Gst.PadProbeReturn:
"""Pad probe to draw on frames."""

buffer: Gst.Buffer = info.get_buffer()
nvds_batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buffer))
for nvds_frame_meta in nvds_frame_meta_iterator(nvds_batch_meta):
with get_nvds_buf_surface(buffer, nvds_frame_meta) as frame:
self._draw_func(nvds_frame_meta, frame)
return Gst.PadProbeReturn.OK

def _allocate_demuxer_pads(self, demuxer: Gst.Element, n_pads: int):
"""Allocate a fixed number of demuxer src pads."""

Expand Down
63 changes: 0 additions & 63 deletions savant/deepstream/source_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from pygstsavantframemeta import add_convert_savant_frame_meta_pad_probe

from savant.config.schema import PipelineElement
from savant.deepstream.base_drawfunc import BaseNvDsDrawFunc
from savant.deepstream.utils import get_nvds_buf_surface, nvds_frame_meta_iterator
from savant.gstreamer import Gst # noqa:F401
from savant.gstreamer.codecs import CodecInfo
from savant.gstreamer.pipeline import GstPipeline
Expand Down Expand Up @@ -153,22 +151,17 @@ def __init__(
self,
codec: CodecInfo,
params: Optional[Dict[str, Any]],
draw_func: Optional[BaseNvDsDrawFunc],
):
"""
:param codec: Codec for output frames.
:param params: Parameters of the encoder.
:param draw_func: PyFunc to draw on frames.
"""

super().__init__()
self._codec = codec
self._params = params or {}
self._draw_func = draw_func

def _add_transform_elems(self, pipeline: GstPipeline, source_info: SourceInfo):
if self._draw_func:
self._add_frame_drawing(pipeline, source_info)
encoder = pipeline._add_element(
PipelineElement(self._codec.encoder, properties=self._params)
)
Expand All @@ -189,62 +182,6 @@ def _build_output_caps(self, source_info: SourceInfo) -> Gst.Caps:
)
)

def _add_frame_drawing(self, pipeline: GstPipeline, source_info: SourceInfo):
"""Add a pad probe to draw on frames if needed."""

capsfilter = pipeline._add_element(PipelineElement('capsfilter'))
caps = self._build_draw_caps(source_info)
capsfilter.set_property('caps', caps)
source_info.after_demuxer.append(capsfilter)
capsfilter.sync_state_with_parent()
self._logger.debug('Added capsfilter for drawing frames with caps %s', caps)

converter = pipeline._add_element(
PipelineElement(
'nvvideoconvert',
properties=(
{}
if is_aarch64()
else {'nvbuf-memory-type': int(pyds.NVBUF_MEM_CUDA_UNIFIED)}
),
),
)
source_info.after_demuxer.append(converter)
converter.get_static_pad('sink').add_probe(
Gst.PadProbeType.BUFFER,
self._draw_on_frame_probe,
)
converter.sync_state_with_parent()
self._logger.debug('Added converter with pad probe for drawing frames')

def _build_draw_caps(self, source_info: SourceInfo) -> Gst.Caps:
"""Build caps for a pad probe drawing on frames."""

return Gst.Caps.from_string(
', '.join(
[
'video/x-raw(memory:NVMM)',
'format=RGBA',
f'width={source_info.dest_resolution.width}',
f'height={source_info.dest_resolution.height}',
]
)
)

def _draw_on_frame_probe(
self,
pad: Gst.Pad,
info: Gst.PadProbeInfo,
) -> Gst.PadProbeReturn:
"""Pad probe to draw on frames."""

buffer: Gst.Buffer = info.get_buffer()
nvds_batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buffer))
for nvds_frame_meta in nvds_frame_meta_iterator(nvds_batch_meta):
with get_nvds_buf_surface(buffer, nvds_frame_meta) as frame:
self._draw_func(nvds_frame_meta, frame)
return Gst.PadProbeReturn.OK


class SourceOutputH26X(SourceOutputEncoded):
"""Adds an output elements to a DeepStream pipeline.
Expand Down

0 comments on commit b78cc8a

Please sign in to comment.