Skip to content

Commit

Permalink
Merge pull request #831 from insight-platform/813-add-source_id-and-s…
Browse files Browse the repository at this point in the history
…ource_id_prefix-parameters-to-sink-in-savantclient

Add source id and source id prefix parameters to sink
  • Loading branch information
placccebo authored Aug 7, 2024
2 parents 2b5e9e4 + 20a48ab commit 2b9f667
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 7 deletions.
6 changes: 4 additions & 2 deletions samples/template/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This sample is intended to help users to start developing a custom Savant module
* Easy dev environment setup with either Docker Compose or devcontainer configuration files.
* Jaeger tracing platform service that allows to collect and inspect module's pipeline traces.
* Supporting Savant services such as Always On Sink adapter and Uri Input script that allow to send a video to the module and receive stream output.
* All of the above is setup to be ready to run, with no additional configuration needed.
* All of the above is set up to be ready to run, with no additional configuration needed.

See [documentation](https://insight-platform.github.io/Savant/) for more information.

Expand Down Expand Up @@ -51,7 +51,9 @@ Note that starting module for the first time involves downloading the model file

URI-Input script demonstration requires a sample video.

```
```bash
# you are expected to be in Savant/samples/template/ directory

curl -o assets/test_data/elon_musk_perf.mp4 https://eu-central-1.linodeobjects.com/savant-data/demo/elon_musk_perf.mp4
```

Expand Down
1 change: 1 addition & 0 deletions samples/template/src/client/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
sink = (
SinkBuilder()
.with_socket('sub+connect:ipc:///tmp/zmq-sockets/output-video.ipc')
.with_source_id(source_id)
.with_idle_timeout(60)
.with_log_provider(JaegerLogProvider(jaeger_endpoint))
# Note: healthcheck port should be configured in the module.
Expand Down
30 changes: 29 additions & 1 deletion savant/client/builder/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ def __init__(
module_health_check_url: Optional[str] = None,
module_health_check_timeout: float = 60,
module_health_check_interval: float = 5,
source_id: Optional[str] = None,
source_id_prefix: Optional[str] = None,
):
self._socket = socket
self._log_provider = log_provider
self._idle_timeout = idle_timeout
self._module_health_check_url = module_health_check_url
self._module_health_check_timeout = module_health_check_timeout
self._module_health_check_interval = module_health_check_interval
self._source_id = source_id
self._source_id_prefix = source_id_prefix

def with_socket(self, socket: str) -> 'SinkBuilder':
"""Set ZeroMQ socket for Sink."""
Expand Down Expand Up @@ -96,6 +100,22 @@ def with_module_health_check_interval(self, interval: float) -> 'SinkBuilder':
"""
return self._with_field('module_health_check_interval', interval)

def with_source_id(self, source_id: Optional[str]) -> 'SinkBuilder':
"""Set source id for Sink.
Sink will filter messages by source id.
"""
return self._with_field('source_id', source_id)

def with_source_id_prefix(self, source_id_prefix: Optional[str]) -> 'SinkBuilder':
"""Set source id prefix for Sink.
Sink will filter messages by source id prefix.
Note: source_id and source_id_prefix are mutually exclusive. If both are set, source_id will be used.
"""
return self._with_field('source_id_prefix', source_id_prefix)

def build(self) -> SinkRunner:
"""Build Sink."""

Expand All @@ -112,6 +132,8 @@ def build(self) -> SinkRunner:
module_health_check_url=self._module_health_check_url,
module_health_check_timeout=self._module_health_check_timeout,
module_health_check_interval=self._module_health_check_interval,
source_id=self._source_id,
source_id_prefix=self._source_id_prefix,
)

def build_async(self) -> AsyncSinkRunner:
Expand All @@ -130,6 +152,8 @@ def build_async(self) -> AsyncSinkRunner:
module_health_check_url=self._module_health_check_url,
module_health_check_timeout=self._module_health_check_timeout,
module_health_check_interval=self._module_health_check_interval,
source_id=self._source_id,
source_id_prefix=self._source_id_prefix,
)

def __repr__(self):
Expand All @@ -140,7 +164,9 @@ def __repr__(self):
f'idle_timeout={self._idle_timeout}, '
f'module_health_check_url={self._module_health_check_url}, '
f'module_health_check_timeout={self._module_health_check_timeout}, '
f'module_health_check_interval={self._module_health_check_interval})'
f'module_health_check_interval={self._module_health_check_interval}, '
f'source_id={self._source_id}, '
f'source_id_prefix={self._source_id_prefix})'
)

def _with_field(self, field: str, value) -> 'SinkBuilder':
Expand All @@ -152,6 +178,8 @@ def _with_field(self, field: str, value) -> 'SinkBuilder':
'module_health_check_url': self._module_health_check_url,
'module_health_check_timeout': self._module_health_check_timeout,
'module_health_check_interval': self._module_health_check_interval,
'source_id': self._source_id,
'source_id_prefix': self._source_id_prefix,
field: value,
}
)
37 changes: 33 additions & 4 deletions savant/client/runner/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def __init__(
module_health_check_interval: float,
receive_timeout: int = Defaults.RECEIVE_TIMEOUT,
receive_hwm: int = Defaults.RECEIVE_HWM,
source_id: Optional[str] = None,
source_id_prefix: Optional[str] = None,
):
self._log_provider = log_provider
self._idle_timeout = idle_timeout if idle_timeout is not None else 10**6
Expand All @@ -53,15 +55,24 @@ def __init__(
if module_health_check_url is not None
else None
)
self._source = self._build_zeromq_source(socket, receive_timeout, receive_hwm)
self._source = self._build_zeromq_source(
socket, receive_timeout, receive_hwm, source_id, source_id_prefix
)
self._source.start()

def __del__(self):
logger.info('Terminating ZeroMQ connection')
self._source.terminate()

@abstractmethod
def _build_zeromq_source(self, socket: str, receive_timeout: int, receive_hwm: int):
def _build_zeromq_source(
self,
socket: str,
receive_timeout: int,
receive_hwm: int,
source_id: Optional[str],
source_id_prefix: Optional[str],
):
pass

@abstractmethod
Expand Down Expand Up @@ -112,12 +123,21 @@ def _handle_message(self, zmq_message: ZeroMQMessage):
class SinkRunner(BaseSinkRunner):
"""Receives messages from ZeroMQ socket."""

def _build_zeromq_source(self, socket: str, receive_timeout: int, receive_hwm: int):
def _build_zeromq_source(
self,
socket: str,
receive_timeout: int,
receive_hwm: int,
source_id: Optional[str],
source_id_prefix: Optional[str],
):
return ZeroMQSource(
socket=socket,
receive_timeout=receive_timeout,
receive_hwm=receive_hwm,
set_ipc_socket_permissions=False,
source_id=source_id,
source_id_prefix=source_id_prefix,
)

def __next__(self) -> SinkResult:
Expand Down Expand Up @@ -155,12 +175,21 @@ class AsyncSinkRunner(BaseSinkRunner):

_source: AsyncZeroMQSource

def _build_zeromq_source(self, socket: str, receive_timeout: int, receive_hwm: int):
def _build_zeromq_source(
self,
socket: str,
receive_timeout: int,
receive_hwm: int,
source_id: Optional[str],
source_id_prefix: Optional[str],
):
return AsyncZeroMQSource(
socket=socket,
receive_timeout=receive_timeout,
receive_hwm=receive_hwm,
set_ipc_socket_permissions=False,
source_id=source_id,
source_id_prefix=source_id_prefix,
)

async def __anext__(self) -> SinkResult:
Expand Down

0 comments on commit 2b9f667

Please sign in to comment.