Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safer video loading from SLP #119

Merged
merged 8 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions sleap_io/io/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@
from pathlib import Path


def load_slp(filename: str) -> Labels:
def load_slp(filename: str, open_videos: bool = True) -> Labels:
"""Load a SLEAP dataset.

Args:
filename: Path to a SLEAP labels file (`.slp`).
open_videos: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).

Returns:
The dataset as a `Labels` object.
"""
return slp.read_labels(filename)
return slp.read_labels(filename, open_videos=open_videos)


def save_slp(
Expand Down
129 changes: 84 additions & 45 deletions sleap_io/io/slp.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,28 @@ class InstanceType(IntEnum):
PREDICTED = 1


def sanitize_filename(
filename: str | Path | list[str] | list[Path],
) -> str | list[str]:
"""Sanitize a filename to a canonical posix-compatible format.

Args:
filename: A string or `Path` object or list of either to sanitize.

Returns:
A sanitized filename as a string (or list of strings if a list was provided)
with forward slashes and posix-formatted.
"""
if isinstance(filename, list):
return [sanitize_filename(f) for f in filename]
return Path(filename).as_posix().replace("\\", "/")


def make_video(
labels_path: str, video_json: dict, video_ind: int | None = None
labels_path: str,
video_json: dict,
video_ind: int | None = None,
open_backend: bool = True,
) -> Video:
"""Create a `Video` object from a JSON dictionary.

Expand All @@ -53,6 +73,9 @@ def make_video(
video_json: A dictionary containing the video metadata.
video_ind: The index of the video in the labels file. This is used to try to
recover the source video for embedded videos. This is skipped if `None`.
open_backend: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).
"""
backend_metadata = video_json["backend"]
video_path = backend_metadata["filename"]
Expand All @@ -65,23 +88,7 @@ def make_video(
is_embedded = True

# Basic path resolution.
video_path = Path(Path(video_path).as_posix().replace("\\", "/"))

try:
if not video_path.exists():
# Check for the same filename in the same directory as the labels file.
video_path_ = Path(labels_path).parent / video_path.name
if video_path_.exists():
video_path = video_path_
else:
# TODO (TP): Expand capabilities of path resolution to support more
# complex path finding strategies.
pass
except OSError:
pass

# Convert video path to string.
video_path = video_path.as_posix()
video_path = Path(sanitize_filename(video_path))

if is_embedded:
# Try to recover the source video.
Expand All @@ -91,37 +98,63 @@ def make_video(
f[f"video{video_ind}/source_video"].attrs["json"]
)
source_video = make_video(
labels_path, source_video_json, video_ind=None
labels_path,
source_video_json,
video_ind=None,
open_backend=open_backend,
)

if "filenames" in backend_metadata:
# This is an ImageVideo.
# TODO: Path resolution.
video_path = backend_metadata["filenames"]

try:
backend = VideoBackend.from_filename(
video_path,
dataset=backend_metadata.get("dataset", None),
grayscale=backend_metadata.get("grayscale", None),
input_format=backend_metadata.get("input_format", None),
)
except ValueError:
backend = None
backend = None
if open_backend:
try:
if not video_path.exists():
# Check for the same filename in the same directory as the labels file.
video_path_ = Path(labels_path).parent / video_path.name
if video_path_.exists() and video_path.stat():
video_path = video_path_
else:
# TODO (TP): Expand capabilities of path resolution to support more
# complex path finding strategies.
pass
except (OSError, PermissionError, FileNotFoundError):
pass

# Convert video path to string.
video_path = video_path.as_posix()

if "filenames" in backend_metadata:
# This is an ImageVideo.
# TODO: Path resolution.
video_path = backend_metadata["filenames"]
video_path = [Path(sanitize_filename(p)) for p in video_path]

try:
backend = VideoBackend.from_filename(
video_path,
dataset=backend_metadata.get("dataset", None),
grayscale=backend_metadata.get("grayscale", None),
input_format=backend_metadata.get("input_format", None),
)
except Exception:
backend = None

return Video(
filename=video_path,
backend=backend,
backend_metadata=backend_metadata,
source_video=source_video,
open_backend=open_backend,
)


def read_videos(labels_path: str) -> list[Video]:
def read_videos(labels_path: str, open_backend: bool = True) -> list[Video]:
"""Read `Video` dataset in a SLEAP labels file.

Args:
labels_path: A string path to the SLEAP labels file.
open_backend: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).

Returns:
A list of `Video` objects.
Expand All @@ -131,7 +164,9 @@ def read_videos(labels_path: str) -> list[Video]:
read_hdf5_dataset(labels_path, "videos_json")
):
video_json = json.loads(video_data)
video = make_video(labels_path, video_json, video_ind=video_ind)
video = make_video(
labels_path, video_json, video_ind=video_ind, open_backend=open_backend
)
videos.append(video)
return videos

Expand All @@ -145,16 +180,17 @@ def video_to_dict(video: Video) -> dict:
Returns:
A dictionary containing the video metadata.
"""
video_filename = sanitize_filename(video.filename)
if video.backend is None:
return {"filename": video.filename, "backend": video.backend_metadata}
return {"filename": video_filename, "backend": video.backend_metadata}

if type(video.backend) == MediaVideo:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use isinstance() for type checking

Instead of using type(video.backend) == MediaVideo, it's more pythonic and flexible to use isinstance() for type checking. This allows for subclass comparisons and is the recommended approach in Python.

Apply this change:

-    if type(video.backend) == MediaVideo:
+    if isinstance(video.backend, MediaVideo):

Make similar changes for other type checks in this function (e.g., for HDF5Video and ImageVideo).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if type(video.backend) == MediaVideo:
if isinstance(video.backend, MediaVideo):
🧰 Tools
🪛 Ruff

184-184: Use is and is not for type comparisons, or isinstance() for isinstance checks

(E721)

return {
"filename": video.filename,
"filename": video_filename,
"backend": {
"type": "MediaVideo",
"shape": video.shape,
"filename": video.filename,
"filename": video_filename,
"grayscale": video.grayscale,
"bgr": True,
"dataset": "",
Expand All @@ -164,12 +200,12 @@ def video_to_dict(video: Video) -> dict:

elif type(video.backend) == HDF5Video:
return {
"filename": video.filename,
"filename": video_filename,
"backend": {
"type": "HDF5Video",
"shape": video.shape,
"filename": (
"." if video.backend.has_embedded_images else video.filename
"." if video.backend.has_embedded_images else video_filename
),
"dataset": video.backend.dataset,
"input_format": video.backend.input_format,
Expand All @@ -180,12 +216,12 @@ def video_to_dict(video: Video) -> dict:

elif type(video.backend) == ImageVideo:
return {
"filename": video.filename,
"filename": video_filename,
"backend": {
"type": "ImageVideo",
"shape": video.shape,
"filename": video.backend.filename[0],
"filenames": video.backend.filename,
"filename": sanitize_filename(video.backend.filename[0]),
"filenames": sanitize_filename(video.backend.filename),
"dataset": video.backend_metadata.get("dataset", None),
"grayscale": video.grayscale,
"input_format": video.backend_metadata.get("input_format", None),
Expand Down Expand Up @@ -1003,17 +1039,20 @@ def write_lfs(labels_path: str, labels: Labels):
)


def read_labels(labels_path: str) -> Labels:
def read_labels(labels_path: str, open_videos: bool = True) -> Labels:
"""Read a SLEAP labels file.

Args:
labels_path: A string path to the SLEAP labels file.
open_videos: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).

Returns:
The processed `Labels` object.
"""
tracks = read_tracks(labels_path)
videos = read_videos(labels_path)
videos = read_videos(labels_path, open_backend=open_videos)
skeletons = read_skeletons(labels_path)
points = read_points(labels_path)
pred_points = read_pred_points(labels_path)
Expand Down
22 changes: 20 additions & 2 deletions sleap_io/model/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ class Video:
information) without having access to the video file itself.
source_video: The source video object if this is a proxy video. This is present
when the video contains an embedded subset of frames from another video.
open_backend: Whether to open the backend when the video is available. If `True`
(the default), the backend will be automatically opened if the video exists.
Set this to `False` when you want to manually open the backend, or when the
you know the video file does not exist and you want to avoid trying to open
the file.

Notes:
Instances of this class are hashed by identity, not by value. This means that
Expand All @@ -47,12 +52,13 @@ class Video:
backend: Optional[VideoBackend] = None
backend_metadata: dict[str, any] = attrs.field(factory=dict)
source_video: Optional[Video] = None
open_backend: bool = True

EXTS = MediaVideo.EXTS + HDF5Video.EXTS + ImageVideo.EXTS

def __attrs_post_init__(self):
"""Post init syntactic sugar."""
if self.backend is None and self.exists():
if self.open_backend and self.backend is None and self.exists():
self.open()

@classmethod
Expand Down Expand Up @@ -181,7 +187,13 @@ def __getitem__(self, inds: int | list[int] | slice) -> np.ndarray:
See also: VideoBackend.get_frame, VideoBackend.get_frames
"""
if not self.is_open:
self.open()
if self.open_backend:
self.open()
else:
raise ValueError(
"Video backend is not open. Call video.open() or set "
"video.open_backend to True to do automatically on frame read."
)
return self.backend[inds]

def exists(self, check_all: bool = False) -> bool:
Expand All @@ -208,13 +220,16 @@ def is_open(self) -> bool:

def open(
self,
filename: Optional[str] = None,
dataset: Optional[str] = None,
grayscale: Optional[str] = None,
keep_open: bool = True,
):
"""Open the video backend for reading.

Args:
filename: Filename to open. If not specified, will use the filename set on
the video object.
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
Expand All @@ -231,6 +246,9 @@ def open(
Values for the HDF5 dataset and grayscale will be remembered if not
specified.
"""
if filename is not None:
self.replace_filename(filename, open=False)

if not self.exists():
raise FileNotFoundError(f"Video file not found: {self.filename}")

Expand Down
9 changes: 9 additions & 0 deletions tests/io/test_slp.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,12 @@ def test_embed_two_rounds(tmpdir, slp_real_data):
== "tests/data/videos/centered_pair_low_quality.mp4"
)
assert type(labels3.video.backend) == MediaVideo


def test_lazy_video_read(slp_real_data):
labels = read_labels(slp_real_data)
assert type(labels.video.backend) == MediaVideo
assert labels.video.exists()

labels = read_labels(slp_real_data, open_videos=False)
assert labels.video.backend is None
Comment on lines +360 to +366
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance test coverage and improve type checking

While the test function effectively verifies the behavior of the open_videos parameter, consider the following improvements:

  1. Use isinstance() for type checking instead of direct type comparison. This adheres to Python best practices and is more flexible.
  2. Add assertions to verify that labels are still correctly loaded when open_videos=False. This ensures that the lazy loading doesn't affect the integrity of the label data.
  3. Consider using assertIsNone() for clarity in the second assertion.

Here's a suggested improvement:

def test_lazy_video_read(slp_real_data):
    # Test default behavior
    labels = read_labels(slp_real_data)
    assert isinstance(labels.video.backend, MediaVideo)
    assert labels.video.exists()
    
    # Test lazy loading behavior
    lazy_labels = read_labels(slp_real_data, open_videos=False)
    assert lazy_labels.video.backend is None
    
    # Verify that labels are still correctly loaded
    assert len(lazy_labels) == len(labels)
    assert lazy_labels.skeleton == labels.skeleton
    # Add more assertions to verify the integrity of lazy-loaded labels

This refactored version improves type checking, clarifies the assertions, and adds a placeholder for additional verification of lazy-loaded labels.

🧰 Tools
🪛 Ruff

361-361: Use is and is not for type comparisons, or isinstance() for isinstance checks

(E721)

Loading