Skip to content

Commit

Permalink
#372 don't pass source-id to zeromq_sink element
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh committed Aug 30, 2023
1 parent a6c9507 commit bb6d82a
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 30 deletions.
7 changes: 5 additions & 2 deletions adapters/gst/gst_plugins/python/savant_rs_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from savant.api.enums import ExternalFrameType
from savant.gstreamer import GObject, Gst, GstBase
from savant.gstreamer.codecs import CODEC_BY_CAPS_NAME, Codec
from savant.gstreamer.utils import gst_buffer_from_list
from savant.utils.logging import LoggerMixin

EMBEDDED_FRAME_TYPE = 'embedded'
Expand Down Expand Up @@ -133,6 +134,7 @@ def __init__(self):
super().__init__()
# properties
self.source_id: Optional[str] = None
self.zmq_topic: Optional[bytes] = None
self.eos_on_file_end: bool = True
self.eos_on_loop_end: bool = False
self.eos_on_frame_params_change: bool = True
Expand Down Expand Up @@ -216,6 +218,7 @@ def do_set_property(self, prop: GObject.GParamSpec, value: Any):
"""
if prop.name == 'source-id':
self.source_id = value
self.zmq_topic = f'{value}/'.encode()
elif prop.name == 'location':
self.location = value
elif prop.name == 'framerate':
Expand Down Expand Up @@ -289,7 +292,7 @@ def do_prepare_output_buffer(self, in_buf: Gst.Buffer):
message = Message.video_frame(frame)
data = save_message_to_bytes(message)

out_buf: Gst.Buffer = Gst.Buffer.new_wrapped(data)
out_buf: Gst.Buffer = gst_buffer_from_list([self.zmq_topic, data])
if frame_mapinfo is not None:
in_buf.unmap(frame_mapinfo)
else:
Expand Down Expand Up @@ -343,7 +346,7 @@ def send_end_message(self):
self.logger.info('Sending serialized EOS message')
message = Message.end_of_stream(EndOfStream(self.source_id))
data = save_message_to_bytes(message)
out_buf = Gst.Buffer.new_wrapped(data)
out_buf = gst_buffer_from_list([self.zmq_topic, data])
self.srcpad.push(out_buf)
self.stream_in_progress = False

Expand Down
31 changes: 9 additions & 22 deletions adapters/gst/gst_plugins/python/zeromq_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ class ZeroMQSink(LoggerMixin, GstBase.BaseSink):
Defaults.EOS_CONFIRMATION_RETRIES,
GObject.ParamFlags.READWRITE,
),
'source-id': (
str,
'Source ID',
'Source ID, e.g. "camera1".',
None,
GObject.ParamFlags.READWRITE,
),
}

def __init__(self):
Expand All @@ -97,8 +90,6 @@ def __init__(self):
self.receive_timeout = Defaults.SENDER_RECEIVE_TIMEOUT
self.req_receive_retries = Defaults.REQ_RECEIVE_RETRIES
self.eos_confirmation_retries = Defaults.EOS_CONFIRMATION_RETRIES
self.source_id: str = None
self.zmq_topic: bytes = None
self.set_sync(False)

def do_get_property(self, prop):
Expand All @@ -119,8 +110,6 @@ def do_get_property(self, prop):
return self.bind
if prop.name == 'send-hwm':
return self.send_hwm
if prop.name == 'source-id':
return self.source_id
if prop.name == 'receive-timeout':
return self.receive_timeout
if prop.name == 'req-receive-retries':
Expand All @@ -145,9 +134,6 @@ def do_set_property(self, prop, value):
self.bind = value
elif prop.name == 'send-hwm':
self.send_hwm = value
elif prop.name == 'source-id':
self.source_id = value
self.zmq_topic = f'{value}/'.encode()
elif prop.name == 'receive-timeout':
self.receive_timeout = value
elif prop.name == 'req-receive-retries':
Expand All @@ -159,7 +145,6 @@ def do_set_property(self, prop, value):

def do_start(self):
"""Start source."""
assert self.source_id, 'Source ID is required.'
try:
self.socket_type, self.bind, self.socket = parse_zmq_socket_uri(
uri=self.socket,
Expand Down Expand Up @@ -190,16 +175,18 @@ def do_render(self, buffer: Gst.Buffer):
self.logger.debug(
'Processing frame %s of size %s', buffer.pts, buffer.get_size()
)
message: List[bytes] = [self.zmq_topic]
message: List[bytes] = []
mapinfo_list: List[Gst.MapInfo] = []
mapinfo: Gst.MapInfo
result, mapinfo = buffer.map_range(0, 1, Gst.MapFlags.READ)
assert result, 'Cannot read buffer.'
mapinfo_list.append(mapinfo)
message.append(mapinfo.data)
if buffer.n_memory() > 1:
for i in range(2):
result, mapinfo = buffer.map_range(i, 1, Gst.MapFlags.READ)
assert result, 'Cannot read buffer.'
mapinfo_list.append(mapinfo)
message.append(mapinfo.data)

if buffer.n_memory() > 2:
# TODO: Use Gst.Meta to check where to split buffer to ZeroMQ message parts
result, mapinfo = buffer.map_range(1, -1, Gst.MapFlags.READ)
result, mapinfo = buffer.map_range(2, -1, Gst.MapFlags.READ)
assert result, 'Cannot read buffer.'
mapinfo_list.append(mapinfo)
message.append(mapinfo.data)
Expand Down
2 changes: 1 addition & 1 deletion adapters/gst/sources/ffmpeg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ PIPELINE=(
fps_meter "${FPS_PERIOD}" output="${FPS_OUTPUT}" !
savant_rs_serializer source-id="${SOURCE_ID}" !
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}"
sync="${SYNC_OUTPUT}" ts-offset="${SYNC_DELAY}" source-id="${SOURCE_ID}"
sync="${SYNC_OUTPUT}" ts-offset="${SYNC_DELAY}"
)

gst-launch-1.0 --eos-on-shutdown "${PIPELINE[@]}" &
Expand Down
2 changes: 1 addition & 1 deletion adapters/gst/sources/gige_cam.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ PIPELINE+=(
savant_rs_serializer source-id="${SOURCE_ID}" !
fps_meter "${FPS_PERIOD}" output="${FPS_OUTPUT}" !
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}"
sync="${SYNC_OUTPUT}" source-id="${SOURCE_ID}"
sync="${SYNC_OUTPUT}"
)

handler() {
Expand Down
2 changes: 1 addition & 1 deletion adapters/gst/sources/media_files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ gst-launch-1.0 --eos-on-shutdown \
adjust_timestamps ! \
savant_rs_serializer source-id="${SOURCE_ID}" eos-on-file-end="${EOS_ON_FILE_END}" \
eos-on-frame-params-change=true read-metadata="${READ_METADATA}" ! \
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}" sync="${SYNC_OUTPUT}" source-id="${SOURCE_ID}" "${RECEIVE_TIMEOUT}" \
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}" sync="${SYNC_OUTPUT}" "${RECEIVE_TIMEOUT}" \
&

child_pid="$!"
Expand Down
2 changes: 1 addition & 1 deletion adapters/gst/sources/rtsp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ PIPELINE=(
fps_meter "${FPS_PERIOD}" output="${FPS_OUTPUT}" !
savant_rs_serializer source-id="${SOURCE_ID}" !
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}"
sync="${SYNC_OUTPUT}" ts-offset="${SYNC_DELAY}" source-id="${SOURCE_ID}"
sync="${SYNC_OUTPUT}" ts-offset="${SYNC_DELAY}"
)

gst-launch-1.0 --eos-on-shutdown "${PIPELINE[@]}" &
Expand Down
2 changes: 1 addition & 1 deletion adapters/gst/sources/usb_cam.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ gst-launch-1.0 --eos-on-shutdown \
'video/x-raw,format=RGBA' ! \
fps_meter "${FPS_PERIOD}" output="${FPS_OUTPUT}" ! \
savant_rs_serializer source-id="${SOURCE_ID}" ! \
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}" sync="${SYNC_OUTPUT}" source-id="${SOURCE_ID}" \
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}" sync="${SYNC_OUTPUT}" \
&

child_pid="$!"
Expand Down
2 changes: 1 addition & 1 deletion adapters/gst/sources/video_loop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fi
PIPELINE+=(
savant_rs_serializer source-id="${SOURCE_ID}" eos-on-loop-end="${EOS_ON_LOOP_END}"
read-metadata="${READ_METADATA}" !
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}" sync="${SYNC_OUTPUT}" source-id="${SOURCE_ID}"
zeromq_sink socket="${ZMQ_ENDPOINT}" socket-type="${ZMQ_SOCKET_TYPE}" bind="${ZMQ_SOCKET_BIND}" sync="${SYNC_OUTPUT}"
)

handler() {
Expand Down

0 comments on commit bb6d82a

Please sign in to comment.