Skip to content

Commit

Permalink
#76 transfer multimedia object outside the avro message (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh authored Mar 15, 2023
1 parent 77387f0 commit cfd96fd
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 89 deletions.
76 changes: 55 additions & 21 deletions adapters/gst/gst_plugins/python/video_files_sink.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
from typing import Dict, Optional
from typing import Dict, Optional, Union

from adapters.python.sinks.chunk_writer import ChunkWriter, CompositeChunkWriter
from adapters.python.sinks.metadata_json import MetadataJsonWriter, Patterns
from savant.api import deserialize
from savant.api.enums import ExternalFrameType
from savant.gst_plugins.python.avro_video_demux import build_caps, FrameParams
from savant.gstreamer import GLib, GObject, Gst, GstApp
from savant.gstreamer.codecs import Codec, CODEC_BY_NAME
Expand All @@ -28,8 +29,12 @@ def __init__(
self.caps = build_caps(frame_params)
super().__init__(chunk_size)

def _write(self, message: Dict, frame_num: Optional[int]) -> bool:
frame = message.pop('frame', None)
def _write(
self,
message: Dict,
data: Union[bytes, Gst.Memory],
frame_num: Optional[int],
) -> bool:
if 'pts' not in message:
return True
frame_pts = message['pts']
Expand All @@ -38,8 +43,12 @@ def _write(self, message: Dict, frame_num: Optional[int]) -> bool:
if frame_num is None:
# frame_num should not be None, but we don't use it here anyway
self.logger.warning('Frame_num is None for frame with PTS %s.', frame_pts)
if frame:
frame_buf: Gst.Buffer = Gst.Buffer.new_wrapped(frame)
if data:
if isinstance(data, bytes):
frame_buf: Gst.Buffer = Gst.Buffer.new_wrapped(data)
else:
frame_buf: Gst.Buffer = Gst.Buffer.new()
frame_buf.append_memory(data)
frame_buf.pts = frame_pts
frame_buf.dts = Gst.CLOCK_TIME_NONE if frame_dts is None else frame_dts
frame_buf.duration = (
Expand Down Expand Up @@ -230,20 +239,26 @@ def handle_buffer(self, sink_pad: Gst.Pad, buffer: Gst.Buffer) -> Gst.FlowReturn
buffer.get_size(),
buffer.pts,
)
message_bin = buffer.extract_dup(0, buffer.get_size())
frame_meta_mapinfo: Gst.MapInfo
result, frame_meta_mapinfo = buffer.map_range(0, 1, Gst.MapFlags.READ)
assert result, 'Cannot read buffer.'

# TODO: Pipeline message types might be extended beyond only VideoFrame
# Additional checks for audio/raw_tensors/etc. may be required
schema_name, message = deserialize(message_bin)
schema_name, message = deserialize(frame_meta_mapinfo.data)
message_with_schema = {**message, 'schema': schema_name}
if schema_name == 'VideoFrame':
return self.handle_video_frame(message_with_schema)
if schema_name == 'EndOfStream':
return self.handle_eos(message_with_schema)
self.logger.error('Unknown schema "%s"', schema_name)
return Gst.FlowReturn.ERROR
result = self.handle_video_frame(message_with_schema, buffer)
elif schema_name == 'EndOfStream':
result = self.handle_eos(message_with_schema)
else:
self.logger.error('Unknown schema "%s"', schema_name)
result = Gst.FlowReturn.ERROR

buffer.unmap(frame_meta_mapinfo)
return result

def handle_video_frame(self, message: Dict) -> Gst.FlowReturn:
def handle_video_frame(self, message: Dict, buffer: Gst.Buffer) -> Gst.FlowReturn:
source_id = message['source_id']
frame_params = FrameParams(
codec=CODEC_BY_NAME[message['codec']],
Expand All @@ -254,12 +269,31 @@ def handle_video_frame(self, message: Dict) -> Gst.FlowReturn:
assert frame_params.codec in [Codec.H264, Codec.HEVC, Codec.JPEG, Codec.PNG]
frame_pts = message['pts']
frame = message['frame']
self.logger.debug(
'Received frame %s from source %s, size: %s bytes',
frame_pts,
source_id,
len(frame) if frame else 0,
)
if isinstance(frame, bytes):
self.logger.debug(
'Received frame %s from source %s, size: %s bytes',
frame_pts,
source_id,
len(frame) if frame else 0,
)
else:
frame_type = ExternalFrameType(frame['type'])
if frame_type != ExternalFrameType.ZEROMQ:
self.logger.error('Unsupported frame type "%s".', frame_type.value)
return Gst.FlowReturn.ERROR
if buffer.n_memory() < 2:
self.logger.error(
'Buffer has %s regions of memory, expected at least 2.',
buffer.n_memory(),
)
return Gst.FlowReturn.ERROR
frame = buffer.get_memory_range(1, -1)
self.logger.debug(
'Received frame %s from source %s, size: %s bytes',
frame_pts,
source_id,
frame.size,
)

writer = self.writers.get(source_id)
if writer is None:
Expand All @@ -283,7 +317,7 @@ def handle_video_frame(self, message: Dict) -> Gst.FlowReturn:
self.writers[source_id] = writer
self.add(video_writer.bin)
video_writer.bin.sync_state_with_parent()
if writer.write(message, message['keyframe']):
if writer.write(message, frame, message['keyframe']):
return Gst.FlowReturn.OK
return Gst.FlowReturn.ERROR

Expand All @@ -292,7 +326,7 @@ def handle_eos(self, message: Dict) -> Gst.FlowReturn:
self.logger.info('Received EOS from source %s.', source_id)
writer = self.writers.get(source_id)
if writer is not None:
writer.write(message, can_start_new_chunk=False, is_frame=False)
writer.write(message, None, can_start_new_chunk=False, is_frame=False)
writer.close()
return Gst.FlowReturn.OK

Expand Down
47 changes: 40 additions & 7 deletions adapters/gst/gst_plugins/python/video_to_avro_serializer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import json
from copy import deepcopy
from fractions import Fraction
from pathlib import Path
from typing import Any, NamedTuple, Optional
from typing import Any, Dict, NamedTuple, Optional, Union

from savant.api import serialize, ENCODING_REGISTRY
from savant.api.enums import ExternalFrameType
from savant.gstreamer import GObject, Gst, GstBase
from savant.gstreamer.codecs import CODEC_BY_CAPS_NAME, Codec
from savant.gstreamer.metadata import DEFAULT_FRAMERATE
from savant.gstreamer.utils import LoggerMixin

EMBEDDED_FRAME_TYPE = 'embedded'


class FrameParams(NamedTuple):
"""Frame parameters."""
Expand Down Expand Up @@ -92,6 +94,14 @@ class VideoToAvroSerializer(LoggerMixin, GstBase.BaseTransform):
False,
GObject.ParamFlags.READWRITE,
),
'frame-type': (
str,
'Frame type.',
'Frame type (allowed: '
f'{", ".join([EMBEDDED_FRAME_TYPE] + [enum_member.value for enum_member in ExternalFrameType])})',
None,
GObject.ParamFlags.READWRITE,
),
}

def __init__(self):
Expand All @@ -108,6 +118,7 @@ def __init__(self):
self.location: Optional[Path] = None
self.last_location: Optional[Path] = None
self.default_framerate: str = DEFAULT_FRAMERATE
self.frame_type: Optional[ExternalFrameType] = ExternalFrameType.ZEROMQ

self.stream_in_progress = False
self.read_metadata: bool = False
Expand Down Expand Up @@ -157,6 +168,10 @@ def do_get_property(self, prop: GObject.GParamSpec):
return self.eos_on_frame_params_change
if prop.name == 'read-metadata':
return self.read_metadata
if prop.name == 'frame-type':
if self.frame_type is None:
return EMBEDDED_FRAME_TYPE
return self.frame_type.value
raise AttributeError(f'Unknown property {prop.name}.')

def do_set_property(self, prop: GObject.GParamSpec, value: Any):
Expand All @@ -181,6 +196,11 @@ def do_set_property(self, prop: GObject.GParamSpec, value: Any):
self.eos_on_frame_params_change = value
elif prop.name == 'read-metadata':
self.read_metadata = value
elif prop.name == 'frame-type':
if value == EMBEDDED_FRAME_TYPE:
self.frame_type = None
else:
self.frame_type = ExternalFrameType(value)
else:
raise AttributeError(f'Unknown property {prop.name}.')

Expand All @@ -194,7 +214,6 @@ def do_prepare_output_buffer(self, in_buf: Gst.Buffer):
self.logger.debug(
'Processing frame %s of size %s', in_buf.pts, in_buf.get_size()
)
frame = in_buf.extract_dup(0, in_buf.get_size())
if self.stream_in_progress:
if (
self.eos_on_location_change
Expand All @@ -209,16 +228,30 @@ def do_prepare_output_buffer(self, in_buf: Gst.Buffer):
self.last_location = self.location
self.last_frame_params = self.frame_params

frame_mapinfo: Optional[Gst.MapInfo] = None
if self.frame_type is None:
result, frame_mapinfo = in_buf.map(Gst.MapFlags.READ)
assert result, 'Cannot read buffer.'
frame = frame_mapinfo.data
elif self.frame_type == ExternalFrameType.ZEROMQ:
frame = {'type': self.frame_type.value}
else:
self.logger.error('Unsupported frame type "%s".', self.frame_type.value)
return Gst.FlowReturn.ERROR
message = self.build_message(
in_buf.pts,
in_buf.dts if in_buf.dts != Gst.CLOCK_TIME_NONE else None,
in_buf.duration if in_buf.duration != Gst.CLOCK_TIME_NONE else None,
frame,
frame=frame,
keyframe=not in_buf.has_flags(Gst.BufferFlags.DELTA_UNIT),
)
data = serialize(self.schema, message)
frame_meta = serialize(self.schema, message)

out_buf = Gst.Buffer.new_wrapped(data)
out_buf: Gst.Buffer = Gst.Buffer.new_wrapped(frame_meta)
if frame_mapinfo is not None:
in_buf.unmap(frame_mapinfo)
else:
out_buf.append_memory(in_buf.get_memory_range(0, -1))
out_buf.pts = in_buf.pts
out_buf.dts = in_buf.dts
out_buf.duration = in_buf.duration
Expand Down Expand Up @@ -276,7 +309,7 @@ def build_message(
pts: int,
dts: Optional[int],
duration: Optional[int],
frame: Optional[bytes],
frame: Optional[Union[bytes, Dict[str, Any]]],
**kwargs,
):
if pts == Gst.CLOCK_TIME_NONE:
Expand Down
25 changes: 20 additions & 5 deletions adapters/gst/gst_plugins/python/zeromq_sink.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""ZeroMQ sink."""
import inspect
from typing import List

import zmq

from savant.gst_plugins.python.zeromq_properties import (
Expand Down Expand Up @@ -138,18 +140,31 @@ def do_render(self, buffer: Gst.Buffer):
self.logger.debug(
'Processing frame %s of size %s', buffer.pts, buffer.get_size()
)
message: List[bytes] = [self.zmq_topic]
mapinfo_list: List[Gst.MapInfo] = []
mapinfo: Gst.MapInfo
result, mapinfo = buffer.map(Gst.MapFlags.READ)
result, mapinfo = buffer.map_range(0, 1, Gst.MapFlags.READ)
assert result, 'Cannot read buffer.'
data = mapinfo.data
self.logger.debug('Sending %s bytes to socket %s.', len(data), self.socket)
self.sender.send_multipart([self.zmq_topic, data])
mapinfo_list.append(mapinfo)
message.append(mapinfo.data)
if buffer.n_memory() > 1:
# TODO: Use Gst.Meta to check where to split buffer to ZeroMQ message parts
result, mapinfo = buffer.map_range(1, -1, Gst.MapFlags.READ)
assert result, 'Cannot read buffer.'
mapinfo_list.append(mapinfo)
message.append(mapinfo.data)
self.logger.debug(
'Sending %s bytes to socket %s.', sum(len(x) for x in message), self.socket
)
self.sender.send_multipart(message)
if self.wait_response:
resp = self.sender.recv()
self.logger.debug(
'Received %s bytes from socket %s.', len(resp), self.socket
)
buffer.unmap(mapinfo)
for mapinfo in mapinfo_list:
buffer.unmap(mapinfo)

return Gst.FlowReturn.OK

def do_stop(self):
Expand Down
23 changes: 17 additions & 6 deletions adapters/python/sinks/chunk_writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import math
from typing import List, Optional
from typing import Dict, List, Optional


class ChunkWriter:
Expand All @@ -17,13 +17,19 @@ def __init__(self, chunk_size: int):
self.frames_in_chunk = 0
self.opened = False

def write(self, data, can_start_new_chunk: bool, is_frame: bool = True) -> bool:
def write(
self,
message,
data,
can_start_new_chunk: bool,
is_frame: bool = True,
) -> bool:
if can_start_new_chunk and 0 < self.chunk_size <= self.frames_in_chunk:
self.close()
if not self.opened:
self.open()
frame_num = self.frames_in_chunk if is_frame else None
result = self._write(data, frame_num)
result = self._write(message, data, frame_num)
if is_frame:
self.frames_in_chunk += 1
return result
Expand Down Expand Up @@ -57,7 +63,7 @@ def _close(self):
def _flush(self):
pass

def _write(self, data, frame_num: Optional[int]) -> bool:
def _write(self, message, data, frame_num: Optional[int]) -> bool:
pass


Expand All @@ -78,8 +84,13 @@ def _flush(self):
for writer in self.writers:
writer.flush()

def _write(self, data, frame_num: Optional[int]) -> bool:
def _write(
self,
message: Dict,
data: List[bytes],
frame_num: Optional[int],
) -> bool:
for writer in self.writers:
if not writer._write(data, frame_num):
if not writer._write(message, data, frame_num):
return False
return True
Loading

0 comments on commit cfd96fd

Please sign in to comment.