Skip to content

Commit

Permalink
AWS KVS Source Adapter (#729)
Browse files Browse the repository at this point in the history
* #697 add KVS source adapter

* #697 add commands to play/stop the stream

* #697 add docs for kvs source adapter

* #697 add kvs source adapter to run_source.py script

* #697 add FPS meter to kvs source adapter
  • Loading branch information
tomskikh authored Apr 11, 2024
1 parent eae4e2e commit 3a25715
Show file tree
Hide file tree
Showing 13 changed files with 1,403 additions and 0 deletions.
1 change: 1 addition & 0 deletions adapters/gst/sources/kvs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
LOGGER_PREFIX = 'adapters.kvs_source'
39 changes: 39 additions & 0 deletions adapters/gst/sources/kvs/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import signal
import time

from savant.gstreamer import Gst
from savant.utils.logging import get_logger, init_logging

from . import LOGGER_PREFIX
from .api import Api
from .config import Config
from .stream_manager import StreamManager


def main():
init_logging()
logger = get_logger(LOGGER_PREFIX)
logger.info('Starting the adapter')

# To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt)
signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT))

config = Config()

Gst.init(None)

stream_manager = StreamManager(config)
api = Api(config, stream_manager)

stream_manager.start()
api.start()
try:
while api.is_running() and stream_manager.is_running():
time.sleep(1)
finally:
logger.info('Stopping the adapter')
stream_manager.stop()


if __name__ == '__main__':
main()
111 changes: 111 additions & 0 deletions adapters/gst/sources/kvs/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from http import HTTPStatus
from threading import Thread
from typing import Optional

import uvicorn
from fastapi import FastAPI, HTTPException

from savant.utils.logging import get_logger

from . import LOGGER_PREFIX
from .config import Config
from .stream_manager import KvsStreamNotFoundError, StreamManager
from .stream_model import StreamModel

logger = get_logger(f'{LOGGER_PREFIX}.api')


class Api:
"""API server for the stream control API."""

def __init__(self, config: Config, stream_manager: StreamManager):
self.config = config
self.stream_manager = stream_manager
self.thread: Optional[Thread] = None
self.app = FastAPI()
self.app.get('/stream')(self.get_stream)
self.app.put('/stream')(self.update_stream)
self.app.put('/stream/play')(self.play_stream)
self.app.put('/stream/stop')(self.stop_stream)

def get_stream(self) -> StreamModel:
"""Get the current stream configuration."""

stream = self.stream_manager.get_stream()
if stream is None:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail='Stream not configured.',
)

return stream.without_credentials()

def update_stream(self, stream: StreamModel) -> StreamModel:
"""Update stream configuration."""

logger.info(
'Updating stream configuration to: %r',
stream.without_credentials(),
)

try:
self.stream_manager.update_stream(stream)
except KvsStreamNotFoundError as e:
logger.warning('Stream not found: %s', e)
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f'Stream not found: {e}',
)
except Exception as e:
logger.error('Error updating stream configuration: %s', e, exc_info=True)
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail=f'Failed to update stream configuration: {e}',
)

return self.get_stream()

def play_stream(self) -> StreamModel:
"""Start the stream."""

try:
self.stream_manager.play_stream()
except Exception as e:
logger.error('Error starting the stream: %s', e, exc_info=True)
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f'Failed to start the stream: {e}',
)

return self.get_stream()

def stop_stream(self) -> StreamModel:
"""Stop the stream."""

try:
self.stream_manager.stop_stream(stop_poller=False)
except Exception as e:
logger.error('Error stopping the stream: %s', e, exc_info=True)
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f'Failed to stop the stream: {e}',
)

return self.get_stream()

def run_api(self):
"""Run the API server."""

logger.info('Starting API server on port %d', self.config.api_port)
uvicorn.run(self.app, host='0.0.0.0', port=self.config.api_port)

def start(self):
"""Start the thread with the API server."""

self.thread = Thread(target=self.run_api, daemon=True)
self.thread.start()

def is_running(self):
"""Check if the API server is running."""

return self.thread.is_alive()
68 changes: 68 additions & 0 deletions adapters/gst/sources/kvs/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os
from datetime import datetime, timedelta
from distutils.util import strtobool
from pathlib import Path

from savant.utils.config import opt_config

TIME_DELTAS = {
's': lambda x: timedelta(seconds=x),
'm': lambda x: timedelta(minutes=x),
}


class AwsConfig:
def __init__(self):
self.region = os.environ['AWS_REGION']
self.access_key = os.environ['AWS_ACCESS_KEY']
self.secret_key = os.environ['AWS_SECRET_KEY']


class FpsMeterConfig:
def __init__(self):
self.period_seconds = opt_config('FPS_PERIOD_SECONDS', None, float)
self.period_frames = opt_config('FPS_PERIOD_FRAMES', 1000, int)
self.output = opt_config('FPS_OUTPUT', 'stdout')
assert self.output in [
'stdout',
'logger',
], 'FPS_OUTPUT must be "stdout" or "logger"'


class Config:
def __init__(self):
self.source_id = os.environ['SOURCE_ID']
self.stream_name = os.environ['STREAM_NAME']
timestamp = os.environ.get('TIMESTAMP')
self.timestamp = parse_timestamp(timestamp) if timestamp else datetime.utcnow()

self.zmq_endpoint = os.environ['ZMQ_ENDPOINT']
self.sync_output = opt_config('SYNC_OUTPUT', False, strtobool)
self.playing = opt_config('PLAYING', True, strtobool)
self.api_port = opt_config('API_PORT', 18367, int)

self.save_state = opt_config('SAVE_STATE', False, strtobool)
if self.save_state:
self.state_path = opt_config('STATE_PATH', Path('state.json'), Path)
else:
self.state_path = None

self.aws: AwsConfig = AwsConfig()
self.fps_meter: FpsMeterConfig = FpsMeterConfig()


def parse_timestamp(ts: str) -> datetime:
"""Parse a timestamp string into a datetime object.
The timestamp can be in the format "YYYY-MM-DDTHH:MM:SS" or a relative time
in the format "-N[s|m]" where N is an integer and "s" or "m" is the unit of
time (seconds or minutes)."""

try:
return datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S')
except ValueError:
unit = ts[-1]
delta = int(ts[:-1])
if delta > 0:
raise ValueError(f'Invalid timestamp: {ts}')
return datetime.utcnow() + TIME_DELTAS[unit](delta)
Loading

0 comments on commit 3a25715

Please sign in to comment.