Skip to content

Commit

Permalink
Always-On Low Latency Streaming Sink (RTSP) (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh authored Mar 7, 2023
1 parent 0ea7c2b commit d364c65
Show file tree
Hide file tree
Showing 19 changed files with 1,079 additions and 74 deletions.
11 changes: 4 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,30 +191,27 @@ Currently, the following source adapters are available:
- Local image file;
- URL Image;
- Image directory;
- USB cam;
- Apache Kafka;
- Elasticsearch;
- MongoDB.
- USB cam.

There are basic sink adapters implemented:

- Inference results are placed into JSON file stream;
- Resulting video overlay displayed on a screen (per source);
- MP4 file (per source);
- image directory (per source);
- Always-On RTSP Stream Sink.

The framework uses an established protocol based on Apache Avro, both for sources and sinks. The sources and sinks talk
to Savant through ZeroMQ sockets.

### Easy to Deploy

The framework and adapters are delivered as a Docker image. To implement the pipeline, you take the base image, add AI
The framework and the adapters are delivered as Docker images. To implement the pipeline, you take the base image, add AI
models and custom code with extra dependencies, then build the resulting image. Some pipelines that don't require
additional dependencies can be implemented just by mapping directories with models and user functions into the docker
image.

As for now, we provide images for conventional PC architecture based on Intel or AMD CPU and discrete GPUs and for
Jetson ARM-based devices.
As for now, we provide images for x86 architecture and for Jetson hardware.

## What's Next

Expand Down
Empty file added adapters/__init__.py
Empty file.
212 changes: 212 additions & 0 deletions adapters/ds/gst_plugins/python/always_on_rtsp_frame_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import inspect
from datetime import datetime, timedelta
from enum import Enum
from typing import Any

import cv2

from adapters.ds.sinks.always_on_rtsp.last_frame import LastFrame
from adapters.ds.sinks.always_on_rtsp.timestamp_overlay import TimestampOverlay
from savant.deepstream.opencv_utils import nvds_to_gpu_mat
from savant.gstreamer import Gst, GstBase, GObject
from savant.gstreamer.utils import LoggerMixin, propagate_gst_setting_error


# Frame-transfer modes for fitting the input frame into the output frame.
# Values are the strings accepted by the element's "mode" property.
Mode = Enum(
    'Mode',
    {
        'SCALE_TO_FIT': 'scale-to-fit',
        'CROP_TO_FIT': 'crop-to-fit',
    },
)


# Pad caps: NVMM (GPU) memory, RGBA frames — matches what nvds_to_gpu_mat expects.
CAPS = Gst.Caps.from_string('video/x-raw(memory:NVMM), format=RGBA')
# Default maximum age of the last received frame before the stub image is shown.
DEFAULT_MAX_DELAY = timedelta(seconds=1)
# Default frame-transfer mode when input and output resolutions differ.
DEFAULT_MODE = Mode.SCALE_TO_FIT


class AlwaysOnRtspFrameProcessor(LoggerMixin, GstBase.BaseTransform):
    """In-place transform element for the Always-On-RTSP sink.

    On every output buffer it copies the most recently received frame
    (shared via the ``last-frame`` property) into the buffer. If no frame
    has arrived, or the stored frame is older than ``max-delay-ms``, a stub
    image with the current timestamp is rendered instead. When input and
    output resolutions differ, the frame is either scaled or cropped to fit
    according to the ``mode`` property.
    """

    GST_PLUGIN_NAME: str = 'always_on_rtsp_frame_processor'

    # Standard GStreamer element metadata: long name, klass, description, author.
    __gstmetadata__ = (
        'Always-On-RTSP frame processor',
        'Transform',
        'Frame processor for Always-On-RTSP sink. '
        'Places stub image when actual frame is not available.',
        'Pavel Tomskikh <tomskih_pa@bw-sw.com>',
    )

    # Both pads are always present and restricted to GPU (NVMM) RGBA frames.
    __gsttemplates__ = (
        Gst.PadTemplate.new('src', Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, CAPS),
        Gst.PadTemplate.new(
            'sink', Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, CAPS
        ),
    )

    __gproperties__ = {
        # (type, nick, blurb, min, max, default, flags)
        'max-delay-ms': (
            int,
            'Maximum delay for the last frame in milliseconds.',
            'Maximum delay for the last frame in milliseconds.',
            1,
            2147483647,  # GLib G_MAXINT; upper bound for the int property
            int(DEFAULT_MAX_DELAY.total_seconds() * 1000),
            GObject.ParamFlags.READWRITE,
        ),
        # Opaque python object (LastFrame) shared with the producer pipeline;
        # must be set by the application before the element starts.
        'last-frame': (
            object,
            'Last frame with its timestamp.',
            'Last frame with its timestamp.',
            GObject.ParamFlags.READWRITE,
        ),
        'mode': (
            str,
            'Transfer mode.',
            'Transfer mode (allowed: ' f'{", ".join([mode.value for mode in Mode])}).',
            DEFAULT_MODE.value,
            GObject.ParamFlags.READWRITE,
        ),
    }

    def __init__(self):
        super().__init__()
        # properties
        self._max_delay = DEFAULT_MAX_DELAY
        self._mode = DEFAULT_MODE
        # None until the application sets the 'last-frame' property;
        # checked in do_start().
        self._last_frame: LastFrame = None

        # Renders the stub image with a timestamp when no fresh frame exists.
        self._time_overlay = TimestampOverlay()
        # Dispatch table: Mode -> method used when input/output sizes differ.
        self._transfer = {
            Mode.SCALE_TO_FIT: self.scale_to_fit,
            Mode.CROP_TO_FIT: self.crop_to_fit,
        }

    def do_get_property(self, prop: GObject.GParamSpec) -> Any:
        """Return the current value of a GObject property by its name."""
        if prop.name == 'max-delay-ms':
            # Stored internally as timedelta; exposed in milliseconds.
            return int(self._max_delay.total_seconds() * 1000)
        if prop.name == 'last-frame':
            return self._last_frame
        if prop.name == 'mode':
            return self._mode.value
        raise AttributeError(f'Unknown property {prop.name}.')

    def do_set_property(self, prop: GObject.GParamSpec, value: Any):
        """Set a GObject property by its name.

        Raises AttributeError for unknown property names; Mode(value)
        raises ValueError for an invalid 'mode' string.
        """
        if prop.name == 'max-delay-ms':
            self._max_delay = timedelta(milliseconds=value)
        elif prop.name == 'last-frame':
            self._last_frame = value
        elif prop.name == 'mode':
            self._mode = Mode(value)
        else:
            raise AttributeError(f'Unknown property {prop.name}.')

    def do_start(self):
        """Fail element startup when the mandatory 'last-frame' property is unset."""
        if self._last_frame is None:
            # NOTE(review): logger.exception outside an except block appends
            # "NoneType: None" to the log; logger.error may be intended — confirm.
            self.logger.exception('Property "last-frame" is not set')
            frame = inspect.currentframe()
            propagate_gst_setting_error(self, frame, __file__)
            return False
        return True

    def do_transform_ip(self, buffer: Gst.Buffer):
        """Fill the buffer in place with the last frame or a timestamp stub.

        The fresh frame is used when it exists and is younger than max-delay;
        otherwise a stub image carrying the current time is rendered.
        """
        with nvds_to_gpu_mat(buffer, batch_id=0) as output_frame:
            self.logger.debug('Output frame resolution is %sx%s', *output_frame.size())
            now = datetime.now()
            # NOTE(review): frame and timestamp are read separately from the
            # shared LastFrame; presumably updated together by the sink — a
            # torn read between the two fields looks possible, confirm.
            input_frame = self._last_frame.frame
            timestamp = self._last_frame.timestamp
            delay = now - timestamp
            if input_frame is not None and delay < self._max_delay:
                self.logger.debug(
                    'Got frame with timestamp %s and resolution %sx%s. Frame delay is %s.',
                    timestamp,
                    *input_frame.size(),
                    delay,
                )
                if input_frame.size() == output_frame.size():
                    # Same resolution: straight GPU-to-GPU copy.
                    input_frame.copyTo(output_frame)
                else:
                    # Resolution mismatch: scale or crop per the 'mode' property.
                    self._transfer[self._mode](input_frame, output_frame)
            else:
                self.logger.debug(
                    'No new data received from the input. Sending stub image with the timestamp.'
                )
                self._time_overlay.overlay_timestamp(output_frame, now)
        return Gst.FlowReturn.OK

    def scale_to_fit(
        self,
        input_frame: cv2.cuda.GpuMat,
        output_frame: cv2.cuda.GpuMat,
    ):
        """Letterbox the input into the output, preserving aspect ratio.

        The scaled image is centered; uncovered borders are cleared to
        transparent black.
        """
        in_width, in_height = input_frame.size()
        in_aspect_ratio = in_width / in_height
        out_width, out_height = output_frame.size()
        out_aspect_ratio = out_width / out_height
        # Fit along the limiting dimension: height when the input is
        # narrower than the output, width otherwise.
        if in_aspect_ratio < out_aspect_ratio:
            target_height = out_height
            target_width = int(target_height * in_aspect_ratio)
        else:
            target_width = out_width
            target_height = int(target_width / in_aspect_ratio)
        self.logger.debug(
            'Scaling input image from %sx%s to %sx%s',
            in_width,
            in_height,
            target_width,
            target_height,
        )
        # Clear the whole output so the letterbox borders are black.
        output_frame.setTo((0, 0, 0, 0))
        # GpuMat ROI constructor: a centered view into the output buffer
        # (no copy); resize writes directly into it.
        target = cv2.cuda.GpuMat(
            output_frame,
            (
                (out_width - target_width) // 2,  # left
                (out_height - target_height) // 2,  # top
                target_width,  # width
                target_height,  # height
            ),
        )
        cv2.cuda.resize(input_frame, (target_width, target_height), target)

    def crop_to_fit(
        self,
        input_frame: cv2.cuda.GpuMat,
        output_frame: cv2.cuda.GpuMat,
    ):
        """Center-crop the input into the center of the output without scaling.

        The copied region is the overlap of the two resolutions; remaining
        output area is cleared to transparent black.
        """
        in_width, in_height = input_frame.size()
        out_width, out_height = output_frame.size()
        # Overlap region: never larger than either frame in each dimension.
        target_width = min(in_width, out_width)
        target_height = min(in_height, out_height)

        self.logger.debug(
            'Cropping input image from %sx%s to %sx%s',
            in_width,
            in_height,
            target_width,
            target_height,
        )
        output_frame.setTo((0, 0, 0, 0))
        # Centered ROI views into the source and destination buffers (no copies).
        source = cv2.cuda.GpuMat(
            input_frame,
            (
                (in_width - target_width) // 2,  # left
                (in_height - target_height) // 2,  # top
                target_width,  # width
                target_height,  # height
            ),
        )
        target = cv2.cuda.GpuMat(
            output_frame,
            (
                (out_width - target_width) // 2,  # left
                (out_height - target_height) // 2,  # top
                target_width,  # width
                target_height,  # height
            ),
        )
        source.copyTo(target)


# Register the element type with GObject and expose the factory tuple
# (name, rank, class) that the GStreamer Python plugin loader looks for.
GObject.type_register(AlwaysOnRtspFrameProcessor)
__gstelementfactory__ = (
    AlwaysOnRtspFrameProcessor.GST_PLUGIN_NAME,
    Gst.Rank.NONE,  # never auto-selected by autoplugging; must be requested by name
    AlwaysOnRtspFrameProcessor,
)
82 changes: 82 additions & 0 deletions adapters/ds/gst_plugins/python/always_on_rtsp_frame_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import inspect
from datetime import datetime
from typing import Any

from adapters.ds.sinks.always_on_rtsp.last_frame import LastFrame
from adapters.ds.sinks.always_on_rtsp.timestamp_overlay import TimestampOverlay
from savant.deepstream.opencv_utils import nvds_to_gpu_mat
from savant.gstreamer import Gst, GstBase, GObject
from savant.gstreamer.utils import LoggerMixin, propagate_gst_setting_error

# Sink pad caps: NVMM (GPU) memory, RGBA frames — matches what nvds_to_gpu_mat expects.
CAPS = Gst.Caps.from_string('video/x-raw(memory:NVMM), format=RGBA')


class AlwaysOnRtspFrameSink(LoggerMixin, GstBase.BaseSink):
    """Sink element that stores each decoded frame into a shared LastFrame.

    Every rendered buffer is cloned on the GPU and written, together with the
    current wall-clock time, into the LastFrame object supplied via the
    ``last-frame`` property; the output pipeline reads it from there.
    """

    GST_PLUGIN_NAME: str = 'always_on_rtsp_frame_sink'

    # Standard GStreamer element metadata: long name, klass, description, author.
    __gstmetadata__ = (
        'Always-On-RTSP frame sink',
        'Sink',
        'Frame sink for Always-On-RTSP sink. Takes decoded frames '
        'from the input and sends them to the output pipeline.',
        'Pavel Tomskikh <tomskih_pa@bw-sw.com>',
    )

    # Single always-present sink pad restricted to GPU (NVMM) RGBA frames.
    __gsttemplates__ = (
        Gst.PadTemplate.new(
            'sink', Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, CAPS
        ),
    )

    __gproperties__ = {
        # Opaque python object (LastFrame) shared with the output pipeline;
        # must be set by the application before the element starts.
        'last-frame': (
            object,
            'Last frame with its timestamp.',
            'Last frame with its timestamp.',
            GObject.ParamFlags.READWRITE,
        ),
    }

    def __init__(self):
        super().__init__()
        # properties
        # None until the application sets the 'last-frame' property;
        # checked in do_start().
        self._last_frame: LastFrame = None

        # NOTE(review): instantiated but never used in this class — confirm
        # whether it can be removed.
        self._time_overlay = TimestampOverlay()

    def do_get_property(self, prop: GObject.GParamSpec) -> Any:
        """Return the current value of a GObject property by its name."""
        if prop.name == 'last-frame':
            return self._last_frame
        raise AttributeError(f'Unknown property {prop.name}.')

    def do_set_property(self, prop: GObject.GParamSpec, value: Any):
        """Set a GObject property by its name; raise for unknown names."""
        if prop.name == 'last-frame':
            self._last_frame = value
        else:
            raise AttributeError(f'Unknown property {prop.name}.')

    def do_start(self):
        """Fail element startup when the mandatory 'last-frame' property is unset."""
        if self._last_frame is None:
            # NOTE(review): logger.exception outside an except block appends
            # "NoneType: None" to the log; logger.error may be intended — confirm.
            self.logger.exception('Property "last-frame" is not set')
            frame = inspect.currentframe()
            propagate_gst_setting_error(self, frame, __file__)
            return False
        return True

    def do_render(self, buffer: Gst.Buffer):
        """Store a GPU clone of the incoming frame plus its arrival time."""
        with nvds_to_gpu_mat(buffer, batch_id=0) as frame:
            self.logger.debug('Input frame resolution is %sx%s', *frame.size())
            # Clone image for thread safety. The original CUDA memory will be released in this thread.
            # TODO: don't allocate CUDA memory if frame size wasn't changed (threadsafe?)
            self._last_frame.frame = frame.clone()
            self._last_frame.timestamp = datetime.now()
        return Gst.FlowReturn.OK


# Register the element type with GObject and expose the factory tuple
# (name, rank, class) that the GStreamer Python plugin loader looks for.
GObject.type_register(AlwaysOnRtspFrameSink)
__gstelementfactory__ = (
    AlwaysOnRtspFrameSink.GST_PLUGIN_NAME,
    Gst.Rank.NONE,  # never auto-selected by autoplugging; must be requested by name
    AlwaysOnRtspFrameSink,
)
Empty file.
Loading

0 comments on commit d364c65

Please sign in to comment.