-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Safer video loading from SLP #119
Changes from 5 commits
f314fe6
8c5bac3
d04fea5
b0af70f
8e179ca
e97cfff
c923ddc
0487ebc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,75 @@ | |
Instance, | ||
PredictedInstance, | ||
) | ||
from sleap_io.io.utils import convert_predictions_to_dataframe | ||
|
||
|
||
def convert_predictions_to_dataframe(labels: Labels) -> pd.DataFrame: | ||
"""Convert predictions data to a Pandas dataframe. | ||
|
||
Args: | ||
labels: A general label object. | ||
|
||
Returns: | ||
pd.DataFrame: A pandas data frame with the structured data with | ||
hierarchical columns. The column hierarchy is: | ||
"video_path", | ||
"skeleton_name", | ||
"track_name", | ||
"node_name", | ||
And it is indexed by the frames. | ||
|
||
Raises: | ||
ValueError: If no frames in the label objects contain predicted instances. | ||
""" | ||
# Form pairs of labeled_frames and predicted instances | ||
labeled_frames = labels.labeled_frames | ||
all_frame_instance_tuples = ( | ||
(label_frame, instance) # type: ignore | ||
for label_frame in labeled_frames | ||
for instance in label_frame.predicted_instances | ||
) | ||
|
||
# Extract the data | ||
data_list = list() | ||
for labeled_frame, instance in all_frame_instance_tuples: | ||
# Traverse the nodes of the instances's skeleton | ||
skeleton = instance.skeleton | ||
for node in skeleton.nodes: | ||
row_dict = dict( | ||
frame_idx=labeled_frame.frame_idx, | ||
x=instance.points[node].x, | ||
y=instance.points[node].y, | ||
score=instance.points[node].score, # type: ignore[attr-defined] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Avoid suppressing type checker warnings with Using |
||
node_name=node.name, | ||
skeleton_name=skeleton.name, | ||
track_name=instance.track.name if instance.track else "untracked", | ||
video_path=labeled_frame.video.filename, | ||
) | ||
data_list.append(row_dict) | ||
|
||
if not data_list: | ||
raise ValueError("No predicted instances found in labels object") | ||
|
||
labels_df = pd.DataFrame(data_list) | ||
|
||
# Reformat the data with columns for dict-like hierarchical data access. | ||
index = [ | ||
"skeleton_name", | ||
"track_name", | ||
"node_name", | ||
"video_path", | ||
"frame_idx", | ||
] | ||
|
||
labels_tidy_df = ( | ||
labels_df.set_index(index) | ||
.unstack(level=[0, 1, 2, 3]) | ||
.swaplevel(0, -1, axis=1) # video_path on top while x, y score on bottom | ||
.sort_index(axis=1) # Better format for columns | ||
.sort_index(axis=0) # Sorts by frames | ||
) | ||
|
||
return labels_tidy_df | ||
Comment on lines
+31
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Include unit tests for To ensure that the new
This will help detect any potential issues early and ensure robustness. |
||
|
||
|
||
def get_timestamps(series: PoseEstimationSeries) -> np.ndarray: | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,10 +21,7 @@ | |||||
Labels, | ||||||
) | ||||||
from sleap_io.io.video import VideoBackend, ImageVideo, MediaVideo, HDF5Video | ||||||
from sleap_io.io.utils import ( | ||||||
read_hdf5_attrs, | ||||||
read_hdf5_dataset, | ||||||
) | ||||||
from sleap_io.io.utils import read_hdf5_attrs, read_hdf5_dataset, is_file_accessible | ||||||
from enum import IntEnum | ||||||
from pathlib import Path | ||||||
import imageio.v3 as iio | ||||||
|
@@ -43,8 +40,28 @@ | |||||
PREDICTED = 1 | ||||||
|
||||||
|
||||||
def sanitize_filename( | ||||||
filename: str | Path | list[str] | list[Path], | ||||||
) -> str | list[str]: | ||||||
"""Sanitize a filename to a canonical posix-compatible format. | ||||||
|
||||||
Args: | ||||||
filename: A string or `Path` object or list of either to sanitize. | ||||||
|
||||||
Returns: | ||||||
A sanitized filename as a string (or list of strings if a list was provided) | ||||||
with forward slashes and posix-formatted. | ||||||
""" | ||||||
if isinstance(filename, list): | ||||||
return [sanitize_filename(f) for f in filename] | ||||||
return Path(filename).as_posix().replace("\\", "/") | ||||||
|
||||||
|
||||||
def make_video( | ||||||
labels_path: str, video_json: dict, video_ind: int | None = None | ||||||
labels_path: str, | ||||||
video_json: dict, | ||||||
video_ind: int | None = None, | ||||||
open_backend: bool = True, | ||||||
) -> Video: | ||||||
"""Create a `Video` object from a JSON dictionary. | ||||||
|
||||||
|
@@ -53,6 +70,9 @@ | |||||
video_json: A dictionary containing the video metadata. | ||||||
video_ind: The index of the video in the labels file. This is used to try to | ||||||
recover the source video for embedded videos. This is skipped if `None`. | ||||||
open_backend: If `True` (the default), attempt to open the video backend for | ||||||
I/O. If `False`, the backend will not be opened (useful for reading metadata | ||||||
when the video files are not available). | ||||||
""" | ||||||
backend_metadata = video_json["backend"] | ||||||
video_path = backend_metadata["filename"] | ||||||
|
@@ -65,23 +85,7 @@ | |||||
is_embedded = True | ||||||
|
||||||
# Basic path resolution. | ||||||
video_path = Path(Path(video_path).as_posix().replace("\\", "/")) | ||||||
|
||||||
try: | ||||||
if not video_path.exists(): | ||||||
# Check for the same filename in the same directory as the labels file. | ||||||
video_path_ = Path(labels_path).parent / video_path.name | ||||||
if video_path_.exists(): | ||||||
video_path = video_path_ | ||||||
else: | ||||||
# TODO (TP): Expand capabilities of path resolution to support more | ||||||
# complex path finding strategies. | ||||||
pass | ||||||
except OSError: | ||||||
pass | ||||||
|
||||||
# Convert video path to string. | ||||||
video_path = video_path.as_posix() | ||||||
video_path = Path(sanitize_filename(video_path)) | ||||||
|
||||||
if is_embedded: | ||||||
# Try to recover the source video. | ||||||
|
@@ -91,37 +95,63 @@ | |||||
f[f"video{video_ind}/source_video"].attrs["json"] | ||||||
) | ||||||
source_video = make_video( | ||||||
labels_path, source_video_json, video_ind=None | ||||||
labels_path, | ||||||
source_video_json, | ||||||
video_ind=None, | ||||||
open_backend=open_backend, | ||||||
) | ||||||
|
||||||
if "filenames" in backend_metadata: | ||||||
# This is an ImageVideo. | ||||||
# TODO: Path resolution. | ||||||
video_path = backend_metadata["filenames"] | ||||||
|
||||||
try: | ||||||
backend = VideoBackend.from_filename( | ||||||
video_path, | ||||||
dataset=backend_metadata.get("dataset", None), | ||||||
grayscale=backend_metadata.get("grayscale", None), | ||||||
input_format=backend_metadata.get("input_format", None), | ||||||
) | ||||||
except ValueError: | ||||||
backend = None | ||||||
backend = None | ||||||
if open_backend: | ||||||
try: | ||||||
if not is_file_accessible(video_path): | ||||||
# Check for the same filename in the same directory as the labels file. | ||||||
candidate_video_path = Path(labels_path).parent / video_path.name | ||||||
if is_file_accessible(candidate_video_path): | ||||||
video_path = candidate_video_path | ||||||
else: | ||||||
# TODO (TP): Expand capabilities of path resolution to support more | ||||||
# complex path finding strategies. | ||||||
pass | ||||||
except (OSError, PermissionError, FileNotFoundError): | ||||||
pass | ||||||
|
||||||
# Convert video path to string. | ||||||
video_path = video_path.as_posix() | ||||||
|
||||||
if "filenames" in backend_metadata: | ||||||
# This is an ImageVideo. | ||||||
# TODO: Path resolution. | ||||||
video_path = backend_metadata["filenames"] | ||||||
video_path = [Path(sanitize_filename(p)) for p in video_path] | ||||||
|
||||||
try: | ||||||
backend = VideoBackend.from_filename( | ||||||
video_path, | ||||||
dataset=backend_metadata.get("dataset", None), | ||||||
grayscale=backend_metadata.get("grayscale", None), | ||||||
input_format=backend_metadata.get("input_format", None), | ||||||
) | ||||||
except Exception: | ||||||
backend = None | ||||||
|
||||||
return Video( | ||||||
filename=video_path, | ||||||
backend=backend, | ||||||
backend_metadata=backend_metadata, | ||||||
source_video=source_video, | ||||||
open_backend=open_backend, | ||||||
) | ||||||
|
||||||
|
||||||
def read_videos(labels_path: str) -> list[Video]: | ||||||
def read_videos(labels_path: str, open_backend: bool = True) -> list[Video]: | ||||||
"""Read `Video` dataset in a SLEAP labels file. | ||||||
|
||||||
Args: | ||||||
labels_path: A string path to the SLEAP labels file. | ||||||
open_backend: If `True` (the default), attempt to open the video backend for | ||||||
I/O. If `False`, the backend will not be opened (useful for reading metadata | ||||||
when the video files are not available). | ||||||
|
||||||
Returns: | ||||||
A list of `Video` objects. | ||||||
|
@@ -131,7 +161,9 @@ | |||||
read_hdf5_dataset(labels_path, "videos_json") | ||||||
): | ||||||
video_json = json.loads(video_data) | ||||||
video = make_video(labels_path, video_json, video_ind=video_ind) | ||||||
video = make_video( | ||||||
labels_path, video_json, video_ind=video_ind, open_backend=open_backend | ||||||
) | ||||||
videos.append(video) | ||||||
return videos | ||||||
|
||||||
|
@@ -145,16 +177,17 @@ | |||||
Returns: | ||||||
A dictionary containing the video metadata. | ||||||
""" | ||||||
video_filename = sanitize_filename(video.filename) | ||||||
if video.backend is None: | ||||||
return {"filename": video.filename, "backend": video.backend_metadata} | ||||||
return {"filename": video_filename, "backend": video.backend_metadata} | ||||||
|
||||||
if type(video.backend) == MediaVideo: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Use Instead of using Apply this change: - if type(video.backend) == MediaVideo:
+ if isinstance(video.backend, MediaVideo): Make similar changes for other type checks in this function (e.g., for 📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff
|
||||||
return { | ||||||
"filename": video.filename, | ||||||
"filename": video_filename, | ||||||
"backend": { | ||||||
"type": "MediaVideo", | ||||||
"shape": video.shape, | ||||||
"filename": video.filename, | ||||||
"filename": video_filename, | ||||||
"grayscale": video.grayscale, | ||||||
"bgr": True, | ||||||
"dataset": "", | ||||||
|
@@ -164,12 +197,12 @@ | |||||
|
||||||
elif type(video.backend) == HDF5Video: | ||||||
return { | ||||||
"filename": video.filename, | ||||||
"filename": video_filename, | ||||||
"backend": { | ||||||
"type": "HDF5Video", | ||||||
"shape": video.shape, | ||||||
"filename": ( | ||||||
"." if video.backend.has_embedded_images else video.filename | ||||||
"." if video.backend.has_embedded_images else video_filename | ||||||
), | ||||||
"dataset": video.backend.dataset, | ||||||
"input_format": video.backend.input_format, | ||||||
|
@@ -180,12 +213,12 @@ | |||||
|
||||||
elif type(video.backend) == ImageVideo: | ||||||
return { | ||||||
"filename": video.filename, | ||||||
"filename": video_filename, | ||||||
"backend": { | ||||||
"type": "ImageVideo", | ||||||
"shape": video.shape, | ||||||
"filename": video.backend.filename[0], | ||||||
"filenames": video.backend.filename, | ||||||
"filename": sanitize_filename(video.backend.filename[0]), | ||||||
"filenames": sanitize_filename(video.backend.filename), | ||||||
"dataset": video.backend_metadata.get("dataset", None), | ||||||
"grayscale": video.grayscale, | ||||||
"input_format": video.backend_metadata.get("input_format", None), | ||||||
|
@@ -1003,17 +1036,20 @@ | |||||
) | ||||||
|
||||||
|
||||||
def read_labels(labels_path: str) -> Labels: | ||||||
def read_labels(labels_path: str, open_videos: bool = True) -> Labels: | ||||||
"""Read a SLEAP labels file. | ||||||
|
||||||
Args: | ||||||
labels_path: A string path to the SLEAP labels file. | ||||||
open_videos: If `True` (the default), attempt to open the video backend for | ||||||
I/O. If `False`, the backend will not be opened (useful for reading metadata | ||||||
when the video files are not available). | ||||||
|
||||||
Returns: | ||||||
The processed `Labels` object. | ||||||
""" | ||||||
tracks = read_tracks(labels_path) | ||||||
videos = read_videos(labels_path) | ||||||
videos = read_videos(labels_path, open_backend=open_videos) | ||||||
skeletons = read_skeletons(labels_path) | ||||||
points = read_points(labels_path) | ||||||
pred_points = read_pred_points(labels_path) | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Resolve type checking issues instead of using
# type: ignore
The use of
# type: ignore
suggests that the type checker is encountering issues with the generator expression. To improve code quality and maintainability, consider updating the type annotations or refactoring the code to address the underlying type errors instead of suppressing them.