The classic approach of using a generic Provider as in core SLEAP is not always ideal for the "streaming inference" workload, where we're just reading through frames of a video sequentially (as opposed to doing random access in training/labels inference).
In particular, we take a big performance hit by having to re-open and/or seek to new frames when decoding a video sequentially.
A better approach, specialized to streaming inference, is a concurrent producer-consumer pattern.
In the video reader process:
```python
import attrs
import multiprocessing as mp
import sleap_io as sio


@attrs.define
class VideoReader:
    video: sio.Video
    max_size: int = 8
    frame_queue: mp.Queue = attrs.field(init=False)

    def __attrs_post_init__(self):
        self.frame_queue = mp.Queue(maxsize=self.max_size)

    def start(self, frame_inds):
        for frame_idx in frame_inds:
            img = self.video[frame_idx]
            self.frame_queue.put({"frame_idx": frame_idx, "img": img})  # blocks if full
```
Then VideoReader is run in an mp.Process so it's fully concurrent. Data is shared across processes via the frame_queue.
We'll also need a way to stop the reader if the whole thing is cancelled (see the multiprocessing docs for patterns for this); a sketch is below.
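A minimal sketch of that wiring, assuming an mp.Event for cooperative cancellation (the `run_reader` helper and `stop_event` name are illustrative, and whether sio.Video objects pickle cleanly into a child process is worth verifying):

```python
import multiprocessing as mp


def run_reader(reader, frame_inds, stop_event):
    """Producer loop that checks a shared event so it can be cancelled."""
    for frame_idx in frame_inds:
        if stop_event.is_set():  # cancellation requested by the consumer
            break
        img = reader.video[frame_idx]
        reader.frame_queue.put({"frame_idx": frame_idx, "img": img})  # blocks if full


stop_event = mp.Event()
proc = mp.Process(target=run_reader, args=(reader, range(len(reader.video)), stop_event))
proc.start()
# ... consume from reader.frame_queue ...
stop_event.set()  # request shutdown on cancellation
proc.join()
```

One caveat: a producer blocked on a full queue won't see the event until a slot frees up, so the consumer should keep draining the queue while shutting down.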
Then in the inference process, it'll be something like:
```python
import numpy as np

# Build a batch.
batch = []
for i in range(batch_size):
    batch.append(video_reader.frame_queue.get())

# Process images.
imgs = np.concatenate([x["img"] for x in batch], axis=0)
predictions = predictor.predict(imgs)
```
This is done inside an outer loop over batches. The number of batches can be pre-determined from the video metadata, or discovered greedily in a while loop that checks whether any frames remain each iteration.
Special case to handle: incomplete batches (these will hang one of the processes in the current formulation -- suggestion: poison pill method, sketched below).
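A minimal sketch of the poison-pill idea, assuming the reader puts a `None` sentinel on the queue after its last frame (the sentinel convention here is illustrative):

```python
import numpy as np

# Producer side, after the last frame has been enqueued:
video_reader.frame_queue.put(None)  # poison pill: end of stream

# Consumer side: flush whatever remains as a final, possibly incomplete batch.
done = False
while not done:
    batch = []
    while len(batch) < batch_size:
        item = video_reader.frame_queue.get()
        if item is None:  # poison pill seen: no more frames coming
            done = True
            break
        batch.append(item)
    if batch:
        imgs = np.concatenate([x["img"] for x in batch], axis=0)
        predictions = predictor.predict(imgs)
```

This way neither side blocks forever: the producer always enqueues the sentinel, and the consumer always drains the queue up to it.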
Plan
VideoReader: This is the base video reader class, a subclass of threading.Thread. Its inputs are a sleap_io.Video object, a queue whose pre-defined max size serves as the batch size, and a tuple of (start, end) frames defining the range of frames to process. If the tuple is None, all frames in the video are processed. The class overrides the run method of the parent Thread class to read frames at the given indices and put each one onto the buffer queue as a list [image, frame_idx, orig_size, video_file_name]. Once the reader has no more frames to load, it appends None values to the queue so that consumers know to stop; a sketch of this class follows.
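A minimal sketch of that planned class, assuming a sio.Video indexes as video[idx] and exposes filename and len() (the queue payload matches the description above; the rest is illustrative):

```python
import queue
import threading
from typing import Optional, Tuple

import sleap_io as sio


class VideoReader(threading.Thread):
    """Producer thread that fills a buffer queue with frames."""

    def __init__(
        self,
        video: sio.Video,
        frame_buffer: queue.Queue,
        frames: Optional[Tuple[int, int]] = None,
    ):
        super().__init__()
        self.video = video
        self.frame_buffer = frame_buffer  # maxsize acts as the batch size
        # If no (start, end) tuple is given, process all frames in the video.
        self.start_idx, self.end_idx = frames if frames is not None else (0, len(video))

    def run(self):
        for idx in range(self.start_idx, self.end_idx):
            img = self.video[idx]
            # Payload: [image, frame_idx, orig_size, video_file_name].
            self.frame_buffer.put([img, idx, img.shape, self.video.filename])
        # No more frames: append None values to terminate the consumer.
        self.frame_buffer.put([None, None, None, None])
```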
The frames are consumed (similar to the code below) and grouped into batches before being passed to a trained model, which returns the predicted instances via the Predictor class. The predicted instances can then be saved to a .slp file or returned as a list of predictions.
```python
import numpy as np

# Build a batch.
batch = []
for i in range(batch_size):
    batch.append(video_reader.frame_queue.get())

# Process images.
imgs = np.concatenate([x["img"] for x in batch], axis=0)
predictions = predictor.predict(imgs, return_labels=False)
```
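Putting the pieces together, the consumer for the threaded reader above could look like this (the return_labels=False call is taken from the snippet above; the wiring, sentinel check, and predictor object are illustrative):

```python
import queue

import numpy as np

batch_size = 8  # also used as the buffer's max size
frame_buffer = queue.Queue(maxsize=batch_size)
reader = VideoReader(video, frame_buffer)
reader.start()

all_predictions = []
done = False
while not done:
    batch = []
    while len(batch) < batch_size:
        img, frame_idx, orig_size, video_file = frame_buffer.get()
        if img is None:  # sentinel from the reader: no frames left
            done = True
            break
        batch.append(img)
    if batch:
        imgs = np.stack(batch, axis=0)  # (batch, height, width, channels)
        all_predictions.extend(predictor.predict(imgs, return_labels=False))

reader.join()
```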