Skip to content

Commit

Permalink
805 implement messagedump player adapter (#808)
Browse files Browse the repository at this point in the history
* Fixing a few typos in the readme files of the samples
* Implementing message dump player
* code fixes

---------

Co-authored-by: Ivan A. Kudriavtsev <kudryavtsev_ia@bw-sw.com>
  • Loading branch information
placccebo and bwsw authored Jul 5, 2024
1 parent 81533d3 commit 58d655f
Show file tree
Hide file tree
Showing 18 changed files with 291 additions and 26 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ The following source adapters are available:
- [Kafka-Redis](https://docs.savant-ai.io/develop/savant_101/10_adapters.html#kafka-redis-source-adapter);
- [Video loop URL](https://docs.savant-ai.io/develop/savant_101/10_adapters.html#video-loop-source-adapter);
- [Multi-stream Source](https://docs.savant-ai.io/develop/savant_101/10_adapters.html#multi-stream-source-adapter);
- [Amazon Kinesis Video Streams Source](https://docs.savant-ai.io/develop/savant_101/10_adapters.html#kinesis-video-stream-source-adapter).
- [Amazon Kinesis Video Streams Source](https://docs.savant-ai.io/develop/savant_101/10_adapters.html#kinesis-video-stream-source-adapter);
- [Message Dump Player](https://docs.savant-ai.io/develop/savant_101/10_adapters.html#message-dump-player-source-adapter).

Several sink adapters are implemented:

Expand Down
2 changes: 1 addition & 1 deletion adapters/ds/sinks/always_on_rtsp/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Optional

from adapters.ds.sinks.always_on_rtsp.config import CommonStreamConfig
from savant.utils.config import opt_config, strtobool, req_config
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.zeromq import ReceiverSocketTypes


Expand Down
3 changes: 1 addition & 2 deletions adapters/ds/sinks/always_on_rtsp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from enum import Enum
from pathlib import Path
from typing import Optional
from savant.utils.config import req_config

from savant_rs.pipeline2 import (
StageFunction,
Expand All @@ -14,7 +13,7 @@

from adapters.ds.sinks.always_on_rtsp.utils import nvidia_runtime_is_available
from savant.gstreamer.codecs import CODEC_BY_NAME, Codec
from savant.utils.config import opt_config, strtobool
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.zeromq import ReceiverSocketTypes

ENCODER_DEFAULT_PROFILES = {
Expand Down
3 changes: 1 addition & 2 deletions adapters/gst/sinks/multistream_kvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import time
from fractions import Fraction
from typing import Dict, Optional
from savant.utils.config import req_config

from pygstsavantframemeta import (
gst_buffer_add_savant_frame_meta,
Expand All @@ -25,7 +24,7 @@
from savant.api.enums import ExternalFrameType
from savant.gstreamer import Gst, GstApp
from savant.gstreamer.codecs import Codec
from savant.utils.config import opt_config, strtobool
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.logging import get_logger, init_logging
from savant.utils.zeromq import ZeroMQMessage, ZeroMQSource

Expand Down
3 changes: 1 addition & 2 deletions adapters/gst/sinks/video_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
from savant.api.parser import convert_ts
from savant.gstreamer import GLib, Gst, GstApp
from savant.gstreamer.codecs import Codec
from savant.utils.config import opt_config, strtobool
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.logging import get_logger, init_logging
from savant.utils.welcome import get_starting_message
from savant.utils.zeromq import ZeroMQMessage, ZeroMQSource
from savant.utils.config import req_config

LOGGER_NAME = 'adapters.video_files_sink'
DEFAULT_CHUNK_SIZE = 10000
Expand Down
3 changes: 1 addition & 2 deletions adapters/python/bridge/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import signal
import time
from typing import Dict, Optional, Tuple
from savant.utils.config import req_config

import msgpack
from rocksq.blocking import PersistentQueueWithCapacity
Expand All @@ -24,7 +23,7 @@
from adapters.shared.thread import BaseThreadWorker
from savant.metrics import Counter, Gauge
from savant.metrics.prometheus import BaseMetricsCollector, PrometheusMetricsExporter
from savant.utils.config import opt_config, strtobool
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.logging import get_logger, init_logging
from savant.utils.welcome import get_starting_message
from savant.utils.zeromq import ZeroMQMessage, ZeroMQSource
Expand Down
3 changes: 1 addition & 2 deletions adapters/python/shared/kafka_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@

from confluent_kafka.admin import AdminClient, ClusterMetadata, NewTopic

from savant.utils.config import opt_config, strtobool
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.fps_meter import FPSMeter
from savant.utils.logging import get_logger, init_logging
from savant.utils.welcome import get_starting_message
from savant.utils.config import req_config

LOGGER_NAME = 'adapters.kafka_redis'
logger = get_logger(LOGGER_NAME)
Expand Down
3 changes: 1 addition & 2 deletions adapters/python/sinks/image_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
frame_has_objects,
)
from savant.api.enums import ExternalFrameType
from savant.utils.config import opt_config, strtobool
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.logging import get_logger, init_logging
from savant.utils.welcome import get_starting_message
from savant.utils.zeromq import ZeroMQMessage, ZeroMQSource
from savant.utils.config import req_config

LOGGER_NAME = 'adapters.image_files_sink'
DEFAULT_CHUNK_SIZE = 10000
Expand Down
2 changes: 1 addition & 1 deletion adapters/python/sinks/kafka_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from savant.api.enums import ExternalFrameType
from savant.client import SinkBuilder
from savant.client.runner.sink import SinkResult
from savant.utils.config import opt_config, strtobool, req_config
from savant.utils.config import opt_config, req_config, strtobool


class KafkaConfig(BaseKafkaConfig):
Expand Down
3 changes: 1 addition & 2 deletions adapters/python/sinks/metadata_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
from adapters.python.sinks.chunk_writer import ChunkWriter
from savant.api.constants import DEFAULT_NAMESPACE
from savant.api.parser import parse_video_frame
from savant.utils.config import opt_config, strtobool
from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.logging import get_logger, init_logging
from savant.utils.welcome import get_starting_message
from savant.utils.zeromq import ZeroMQMessage, ZeroMQSource
from savant.utils.config import req_config

LOGGER_NAME = 'adapters.metadata_json_sink'

Expand Down
180 changes: 180 additions & 0 deletions adapters/python/sources/message_dump_player.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import signal
import time
from typing import Optional, Tuple

import msgpack
from savant_rs.utils.serialization import Message, load_message_from_bytes
from savant_rs.zmq import BlockingWriter, WriterConfigBuilder

from savant.utils.config import opt_config, req_config, strtobool
from savant.utils.logging import get_logger, init_logging
from savant.utils.welcome import get_starting_message

LOGGER_NAME = 'adapters.message_dump_player'

logger = get_logger(LOGGER_NAME)


class Config:
"""Configuration for the adapter."""

def __init__(self):
self.playlist_path = req_config('PLAYLIST_PATH')
self.zmq_endpoint = req_config('ZMQ_ENDPOINT')
self.sync_output = opt_config('SYNC_OUTPUT', False, strtobool)


class MessageDumpReader:
"""Reads messages from the message dump file."""

def __init__(self, file_path: str):
self._logger = get_logger(f'{LOGGER_NAME}.{self.__class__.__name__}')

try:
self._list_file = open(file_path, 'rb')
except Exception:
raise ValueError(f'Failed to open a file: {file_path}')

self._message_dump_file = None
self._unpacker = None
self._next_message_dump_file()

def read(self) -> Optional[Tuple[int, str, Message, bytes]]:
"""Unpack the next message from the file."""

if not self._unpacker:
return None
message = None
while message is None:
try:
message = self._unpacker.unpack()
except msgpack.OutOfData:
if not self._next_message_dump_file():
return None
except Exception as e:
self._logger.error('Failed to unpack message: %s', e)
raise RuntimeError('Failed to unpack message')

if not isinstance(message, (Tuple, list)) or len(message) != 4:
self._logger.error('Invalid message format: %s', message)
raise RuntimeError('Invalid message format')

ts, topic, meta, content = message

return ts, bytes(topic).decode(), load_message_from_bytes(meta), content

def _next_message_dump_file(self):
"""Open the next message dump file from the list."""

if self._message_dump_file:
self._message_dump_file.close()

file_path = self._list_file.readline()
if not file_path:
return False

try:
self._message_dump_file = open(file_path.strip(), 'rb')
except FileNotFoundError:
self._logger.error('Message dump file not found: %s', file_path)
raise RuntimeError('Message dump file not found')
except OSError as e:
self._logger.error(
'Failed to open a message dump file [%s]: %s', file_path, e
)
raise RuntimeError('Failed to open a message dump file')
try:
self._unpacker = msgpack.Unpacker(self._message_dump_file)
except Exception as e:
self._logger.error('Failed to create unpacker for message dump file: %s', e)
raise RuntimeError('Failed to create unpacker for message dump file')

return True

def __del__(self):
try:
if self._message_dump_file:
self._message_dump_file.close()
if self._list_file:
self._list_file.close()
except Exception as e:
self._logger.error('Failed to clean up the resources: %s', e)


class Player:
"""Receives messages from the dump and sends them to ZeroMQ socket."""

def __init__(
self,
reader: MessageDumpReader,
config: Config,
):
self._logger = get_logger(f'{LOGGER_NAME}.{self.__class__.__name__}')
self._reader = reader

config_builder = WriterConfigBuilder(config.zmq_endpoint)
self._writer = BlockingWriter(config_builder.build())
self._sync_output = config.sync_output
# Last sending time in seconds and last frame timestamp in nanoseconds for synchronization
self._last_send_time = None
self._last_ts = None

def play(self):
self._writer.start()
message = self._reader.read()
while message is not None:
try:
ts, topic, meta, content = message
self._send_message(ts, topic, meta, content)
message = self._reader.read()
except Exception as e:
self._logger.error('Failed to send message: %s', e)
break
self._logger.info('There are no more messages to send. Stopping the adapter.')
self._writer.shutdown()

def _send_message(self, ts: int, topic: str, meta: Message, content: bytes):
"""Send a message to the sink ZeroMQ socket. Synchronize the sending if needed."""

self._logger.debug('Sending message to the sink ZeroMQ socket')

if self._sync_output:
if self._last_send_time is not None:
delta = (ts - self._last_ts) / 1.0e9 - (
time.time() - self._last_send_time
)
if delta > 0:
time.sleep(delta)
elif delta < 0:
self._logger.warning('Message is late by %f seconds', -delta)
self._last_send_time = time.time()
self._last_ts = ts

self._writer.send_message(topic, meta, content)


def main():
init_logging()
logger.info(get_starting_message('message dump player adapter'))
# To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt)
signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT))

try:
config = Config()
reader = MessageDumpReader(config.playlist_path)
player = Player(reader, config)
except Exception as e:
logger.error('Failed to start the adapter: %s', e)
exit(1)

try:
player.play()
except KeyboardInterrupt:
logger.info('Stopping the adapter')
except Exception as e:
logger.error('Adapter failed during execution: %s', e)
exit(1)


if __name__ == '__main__':
main()
36 changes: 35 additions & 1 deletion docs/source/savant_101/10_adapters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ Currently, the following source adapters are available:
- GigE (Genicam) industrial cam;
- FFmpeg;
- Multi-stream;
- Kafka-Redis Source.
- Kafka-Redis Source;
- Message Dump Player.

Most source adapters accept the following common parameters:

Expand Down Expand Up @@ -866,6 +867,39 @@ Example:
"is_playing": false
}
Message Dump Player Source Adapter
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The Message Dump Player Adapter plays video dumps sequentially from a playlist file and sends them to a module.
Playlist file contains a list of message dump files, one per line.
It's one shot adapter, i.e. it stops after playing all files from the playlist.

**Parameters**:

- ``PLAYLIST_PATH``: a path to the playlist file;
- ``SYNC_OUTPUT``: flag specifying if to send frames synchronously (i.e. at the source file rate); default is ``False``.

Running the adapter with Docker:

.. code-block:: bash
docker run --rm -it --name message-dump-player-test \
--entrypoint python \
-e PLAYLIST_PATH=/path/to/playlist/file.txt \
-e SYNC_OUTPUT=False \
-e ZMQ_ENDPOINT=dealer+connect:ipc:///tmp/zmq-sockets/input-video.ipc \
-v /path/to/playlist/file.txt:/path/to/playlist/file.txt:ro \
-v /path/to/dump/files:/path/to/dump/files \
-v /tmp/zmq-sockets:/tmp/zmq-sockets \
ghcr.io/insight-platform/savant-adapters-py:latest \
-m adapters.python.sources.message_dump_player
Running with the helper script:

.. code-block:: bash
./scripts/run_source.py mdp --playlist /path/to/playlist/file.txt --dump-files-dir /path/to/dump/files
Sink Adapters
-------------

Expand Down
1 change: 1 addition & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
msgpack~=1.0
numpy>=1.22.4,<2.0
cupy-cuda12x
numba~=0.57
Expand Down
2 changes: 1 addition & 1 deletion samples/auxiliary_streams/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ git lfs pull
docker compose -f samples/auxiliary_streams/docker-compose.x86.yml up

# if Jetson
docker compose -f samples/buffer_adapter/docker-compose.l4t.yml up
docker compose -f samples/auxiliary_streams/docker-compose.l4t.yml up

# open 'rtsp://127.0.0.1:554/stream/video-360p', 'rtsp://127.0.0.1:554/stream/video-480p',
# 'rtsp://127.0.0.1:554/stream/video-720p' in your player
Expand Down
3 changes: 3 additions & 0 deletions samples/rtdetr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ The demo uses models that are compiled into TensorRT engines the first time the
# if x86
docker compose -f samples/rtdetr/docker-compose.x86.yml up

# if Jetson
docker compose -f samples/rtdetr/docker-compose.l4t.yml up

# open 'rtsp://127.0.0.1:554/stream/leeds' in your player
# or visit 'http://127.0.0.1:888/stream/leeds/' (LL-HLS)

Expand Down
2 changes: 1 addition & 1 deletion savant/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SAVANT=0.4.1
SAVANT_RS=0.3.1
SAVANT_RS=0.3.3
DEEPSTREAM=6.4
Loading

0 comments on commit 58d655f

Please sign in to comment.