Introduce new stream interface #156
Conversation
…h race conditions on state changes
I think this will be a huge improvement! I would like it to work similarly to the current Stream
implementation if possible so most of my suggestions are around that.
I still need to test on my nano so will keep you posted with those results.
predictions = self._model.postprocess(
    predictions,
    preprocessing_metadata,
    confidence=0.5,
shouldn't these be brought in as parameters and not hardcoded?
👍 good catch
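For illustration, a minimal sketch of the agreed direction - the threshold becomes a parameter with an env-backed default instead of a literal; the names CONFIDENCE and DEFAULT_CONFIDENCE below are placeholders, not necessarily what the PR ends up using:

import os

# placeholder env-backed default - the variable name here is an assumption
DEFAULT_CONFIDENCE = float(os.getenv("CONFIDENCE", "0.5"))

def postprocess_predictions(model, predictions, preprocessing_metadata, confidence: float = DEFAULT_CONFIDENCE):
    # confidence now flows in from the caller / configuration instead of being hardcoded
    return model.postprocess(predictions, preprocessing_metadata, confidence=confidence)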
def events_compatible(events: List[Optional[ModelActivityEvent]]) -> bool:
    if not all_not_empty(sequence=events):
        return False
I think this could be simplified to

if any(e is None for e in events):
    return False

and the all_not_empty function removed.
👍
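For reference, a sketch of the simplified version (the rest of the function is not shown in this diff, so the trailing checks below are placeholders):

from typing import List, Optional

def events_compatible(events: List[Optional["ModelActivityEvent"]]) -> bool:
    # a single inline check replaces the all_not_empty() helper
    if any(event is None for event in events):
        return False
    # ... remaining compatibility checks, unchanged ...
    return True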
@@ -110,6 +110,9 @@
# Flag to enforce FPS, default is False
ENFORCE_FPS = str2bool(os.getenv("ENFORCE_FPS", False))
MAX_FPS = os.getenv("MAX_FPS")
if MAX_FPS is not None:
    MAX_FPS = int(MAX_FPS)
can you also add a way to set the camera resolution? (functionality from this PR)
I am sorry, this PR is already large - I am happy to help as part of another one.
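For context, a rough sketch of what the requested option could look like in a follow-up PR - the env variable names and the cv2-based setter are assumptions, not part of this change:

import os
import cv2

# hypothetical env vars - not introduced by this PR
STREAM_WIDTH = os.getenv("STREAM_WIDTH")
STREAM_HEIGHT = os.getenv("STREAM_HEIGHT")

def apply_requested_resolution(capture: "cv2.VideoCapture") -> None:
    # only override the capture properties when both dimensions are configured
    if STREAM_WIDTH is not None and STREAM_HEIGHT is not None:
        capture.set(cv2.CAP_PROP_FRAME_WIDTH, int(STREAM_WIDTH))
        capture.set(cv2.CAP_PROP_FRAME_HEIGHT, int(STREAM_HEIGHT))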
@classmethod
def init(
    cls,
    api_key: str,
this should be defaulted via .env file
👍
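A sketch of the agreed direction, assuming the api_key default falls back to the value already resolved in inference/core/env.py (the class and parameter names mirror the diff above, but the exact wiring is an assumption):

from typing import Optional

from inference.core.env import API_KEY  # module-level default resolved from the environment / .env

class Pipeline:  # stand-in for the class shown in the diff above
    @classmethod
    def init(cls, api_key: Optional[str] = None, **kwargs) -> "Pipeline":
        # fall back to the environment-provided key when the caller passes nothing
        api_key = api_key or API_KEY
        return cls()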
break
timestamp, frame_id, frame, predictions = inference_results
try:
    self._on_prediction(frame, predictions)
- These args are swapped from the previous interface, which was (predictions, frame) - can we keep them the same as the previous interface for continuity?
- The previous predictions had frame_id set on them, which was useful for knowing how processing was being done.
Here, simply speaking, we do introduce a breaking change.
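If continuity mattered, a thin adapter could keep old-style sinks working against the new (frame, predictions) order - purely a sketch, not something this PR adds:

from typing import Any, Callable

def adapt_legacy_sink(legacy_on_prediction: Callable[[Any, Any], None]) -> Callable[[Any, Any], None]:
    # old interface: on_prediction(predictions, frame); new interface: on_prediction(frame, predictions)
    def on_prediction(frame: Any, predictions: Any) -> None:
        legacy_on_prediction(predictions, frame)
    return on_prediction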
def _set_file_mode_buffering_strategies(self) -> None:
    if self._buffer_filling_strategy is None:
        self._buffer_filling_strategy = BufferFillingStrategy.WAIT
The previous behavior was to process frames at the native FPS of the video stream and run inference on the most recent frame, so a 4 minute video file would stream for 4 minutes. Currently, when I run this with a video file, it processes and infers on each frame of the video, which is similar to the enforce_fps mode of the previous implementation. Can we have this implementation work similarly to the old implementation by default in this mode?
We can discuss that - I believe the default behaviour against a video file is to process each frame.
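A sketch of one way the old "stream a file at its native FPS" behaviour could be approximated - by throttling frame consumption to the file's declared FPS (the helper name and wiring are assumptions, not part of this diff):

import time

def make_fps_throttle(declared_fps: float):
    # returns a callable that sleeps just enough to keep consumption at the declared FPS
    frame_interval = 1.0 / declared_fps if declared_fps > 0 else 0.0
    last_emitted = 0.0

    def wait_for_next_frame() -> None:
        nonlocal last_emitted
        sleep_for = frame_interval - (time.monotonic() - last_emitted)
        if sleep_for > 0:
            time.sleep(sleep_for)
        last_emitted = time.monotonic()

    return wait_for_next_frame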
    StreamOperationNotAllowedError,
)

DEFAULT_BUFFER_SIZE = int(os.getenv("VIDEO_SOURCE_BUFFER_SIZE", "64"))
Should these configs live in inference/core/env.py?
👍
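A sketch of the agreed move - the constant is defined once in inference/core/env.py and imported where the video source needs it, keeping the existing env variable name:

# inference/core/env.py (sketch)
import os

DEFAULT_BUFFER_SIZE = int(os.getenv("VIDEO_SOURCE_BUFFER_SIZE", "64"))

# video source module (sketch)
# from inference.core.env import DEFAULT_BUFFER_SIZE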
else:
    self._set_stream_mode_consumption_strategies()
self._playback_allowed.set()
self._stream_consumption_thread = Thread(target=self._consume_video)
Should daemon=True be set on this thread so it stops if the main thread is stopped?
Hmm - good question:
- on one hand, our interface manages the state of the threads and joins them (which is good), so we do not need it
- on the other, users may leave threads hanging, which would prevent the script from terminating - but if we are concerned about that, we should probably apply it in all cases, and then threads that need to dispose of resources (like the camera connection) may not be handled well
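For illustration of the trade-off described above - a daemon thread lets the interpreter exit even if the owner never joins it, at the cost of killing the worker abruptly (so any resource cleanup inside it never runs):

import time
from threading import Thread

def consume_forever() -> None:
    while True:
        time.sleep(1)  # stands in for reading frames from a camera

# non-daemon: the script would never exit, because Python waits for this thread
# Thread(target=consume_forever).start()

# daemon: the process exits as soon as the main thread returns, killing the worker abruptly
Thread(target=consume_forever, daemon=True).start()
print("main thread done - the daemon worker is terminated with the process")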
from inference.core.models.roboflow import OnnxRoboflowInferenceModel
from inference.models.utils import get_roboflow_model

PREDICTIONS_QUEUE_SIZE = int(
Should we move this to env.py?
👍
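Same pattern as the buffer-size setting above - a sketch of the consuming module importing the value instead of calling os.getenv() inline (the queue usage shown is illustrative):

from queue import Queue

from inference.core.env import PREDICTIONS_QUEUE_SIZE  # assumes the constant moved to env.py

# bounded queue between the inference thread and the prediction sink
predictions_queue: Queue = Queue(maxsize=PREDICTIONS_QUEUE_SIZE)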
    always if connection to stream is lost.
    """
    if api_key is None:
        api_key = os.environ.get(API_KEY_ENV_NAMES[0], None) or os.environ.get(
This seems like repeated logic from env.py. Should we just import API_KEY from env.py?
We could - I am just wondering if we should. The reasoning: if we put a high-level configuration option into the env, then the moment we latch the env variable into a Python variable matters. If we latch the values at the moment env.py is imported, then any export to the env that happens after that will not be visible and may lead to unexpected behaviour - for example, if someone's script exports the config option in a function body after import inference, the variable will not be visible to the inference logic.
I am open to changing that in the way you pointed out, just bringing this into consideration.
resolved as suggested
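The latching concern in a tiny self-contained example - a value captured at import time does not see a later export, while a call-time lookup does (the variable name is only for demonstration):

import os

# "import-time" latching: evaluated once, when the module is first loaded
KEY_AT_IMPORT = os.getenv("DEMO_API_KEY")

def key_at_call_time():
    # call-time lookup: sees exports made after the import
    return os.getenv("DEMO_API_KEY")

if __name__ == "__main__":
    os.environ["DEMO_API_KEY"] = "set-after-import"
    print(KEY_AT_IMPORT)       # None (assuming the variable was not set beforehand)
    print(key_at_call_time())  # "set-after-import"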
def start(self, use_main_thread: bool = True) -> None:
    self._stop = False
    self._inference_thread = Thread(target=self._execute_inference)
Should this be a daemon thread?
Hmm... good question:
- we manage the state of thread execution, and the interface exposed by the class gets the job done in terms of graceful termination - which is good
- daemon threads, however, conceptually represent computation whose termination we do not care about (we let main-thread termination kill them abruptly) - the benefit of a daemon here would be to avoid clients' scripts hanging when the termination method is not invoked
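A sketch of the non-daemon alternative being weighed above - the class owns the thread lifecycle, and a clean shutdown relies on the caller invoking terminate() and join() (class and method names are illustrative):

import time
from threading import Thread
from typing import Optional

class InferenceWorker:  # illustrative stand-in for the class in the diff above
    def __init__(self) -> None:
        self._stop = False
        self._inference_thread: Optional[Thread] = None

    def start(self) -> None:
        self._stop = False
        # non-daemon: graceful termination depends on terminate()/join() being called
        self._inference_thread = Thread(target=self._execute_inference)
        self._inference_thread.start()

    def terminate(self) -> None:
        self._stop = True  # signal the loop to finish its current iteration and exit

    def join(self) -> None:
        if self._inference_thread is not None:
            self._inference_thread.join()

    def _execute_inference(self) -> None:
        while not self._stop:
            time.sleep(0.01)  # placeholder for: take a frame, run the model, push predictions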
    frame_timestamp=video_frame.frame_timestamp,
    frame_id=video_frame.frame_id,
)
preprocessed_image, preprocessing_metadata = self._model.preprocess(
Would we see gains if we separated preprocessing, inference, postprocessing into separate threads with their own source/sink queues? Not a needed change for this PR, just a point of discussion for future versions.
Initially I was considering that, but the thing that convinced me not to do so was the metrics collected by the watchdog:
LatencyMonitorReport(
frame_decoding_latency=0.024393796875000002,
pre_processing_latency=0.0021425156250000003,
inference_latency=0.033938140625000016,
post_processing_latency=0.0007386406250000003,
model_latency=0.03681929687500001,
e2e_latency=0.06121309375000001,
)
That represents the average processing time for a 1080p@60fps stream against yolov8n.
Simply speaking, preprocessing takes ~2ms and can easily be considered a minor cost of the whole inference process.
Post-processing is highly dependent on context (here I was running a COCO-trained model against animal footage, so few bounding boxes were present) and could indeed be a bottleneck - let me check against another input.
OK, I ran the same test against a video with noticeably more detections on average:
LatencyMonitorReport(
frame_decoding_latency=0.049186390624999986,
pre_processing_latency=0.001831875,
inference_latency=0.033255812499999995,
post_processing_latency=0.00079621875,
model_latency=0.035883906250000014,
e2e_latency=0.08507029687499995,
)
This time the stream was 30fps, so the frame_decoding_latency is higher, but the post-processing is still sub-1ms.
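For scale, quick arithmetic on the figures quoted above shows why separate pre/post-processing threads would buy very little:

# shares of the averaged model latency reported just above (seconds)
pre_processing_latency = 0.001831875
post_processing_latency = 0.00079621875
model_latency = 0.035883906250000014

print(f"pre-processing:  {pre_processing_latency / model_latency:.1%} of model latency")
print(f"post-processing: {post_processing_latency / model_latency:.1%} of model latency")
# roughly 5.1% and 2.2% - pipelining those stages would save at most ~2-3 ms per frame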
Left a few comments. Looking really great! Huge improvement for stability and usability!
Description
Please include a summary of the change and which issue is fixed or implemented. Please also include relevant motivation and context (e.g. links, docs, tickets etc.).
List any dependencies that are required for this change.
Type of change
Please delete options that are not relevant.
How has this change been tested? Please provide a test case or example of how you tested the change.
Any specific deployment considerations
For example, documentation changes, usability, usage/costs, secrets, etc.
Docs