diff --git a/docs/index.md b/docs/index.md index ea3af789..41922c1d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -134,14 +134,16 @@ labels.save("labels.slp") ```py import sleap_io as sio -labels = sio.load_file("labels.v001.slp") +# Load labels without trying to open the video files. +labels = sio.load_file("labels.v001.slp", open_videos=False) -# Fix paths using prefixes. +# Fix paths using prefix replacement. labels.replace_filenames(prefix_map={ "D:/data/sleap_projects": "/home/user/sleap_projects", "C:/Users/sleaper/Desktop/test": "/home/user/sleap_projects", }) +# Save labels with updated paths. labels.save("labels.v002.slp") ``` diff --git a/sleap_io/io/main.py b/sleap_io/io/main.py index 7fd702f7..c72017ea 100644 --- a/sleap_io/io/main.py +++ b/sleap_io/io/main.py @@ -7,16 +7,19 @@ from pathlib import Path -def load_slp(filename: str) -> Labels: +def load_slp(filename: str, open_videos: bool = True) -> Labels: """Load a SLEAP dataset. Args: filename: Path to a SLEAP labels file (`.slp`). + open_videos: If `True` (the default), attempt to open the video backend for + I/O. If `False`, the backend will not be opened (useful for reading metadata + when the video files are not available). Returns: The dataset as a `Labels` object. """ - return slp.read_labels(filename) + return slp.read_labels(filename, open_videos=open_videos) def save_slp( diff --git a/sleap_io/io/nwb.py b/sleap_io/io/nwb.py index 9314294a..b46dc23a 100644 --- a/sleap_io/io/nwb.py +++ b/sleap_io/io/nwb.py @@ -26,7 +26,75 @@ Instance, PredictedInstance, ) -from sleap_io.io.utils import convert_predictions_to_dataframe + + +def convert_predictions_to_dataframe(labels: Labels) -> pd.DataFrame: + """Convert predictions data to a Pandas dataframe. + + Args: + labels: A general label object. + + Returns: + pd.DataFrame: A pandas data frame with the structured data with + hierarchical columns. The column hierarchy is: + "video_path", + "skeleton_name", + "track_name", + "node_name", + And it is indexed by the frames. + + Raises: + ValueError: If no frames in the label objects contain predicted instances. + """ + # Form pairs of labeled_frames and predicted instances + labeled_frames = labels.labeled_frames + all_frame_instance_tuples = ( + (label_frame, instance) # type: ignore + for label_frame in labeled_frames + for instance in label_frame.predicted_instances + ) + + # Extract the data + data_list = list() + for labeled_frame, instance in all_frame_instance_tuples: + # Traverse the nodes of the instances's skeleton + skeleton = instance.skeleton + for node in skeleton.nodes: + row_dict = dict( + frame_idx=labeled_frame.frame_idx, + x=instance.points[node].x, + y=instance.points[node].y, + score=instance.points[node].score, # type: ignore[attr-defined] + node_name=node.name, + skeleton_name=skeleton.name, + track_name=instance.track.name if instance.track else "untracked", + video_path=labeled_frame.video.filename, + ) + data_list.append(row_dict) + + if not data_list: + raise ValueError("No predicted instances found in labels object") + + labels_df = pd.DataFrame(data_list) + + # Reformat the data with columns for dict-like hierarchical data access. + index = [ + "skeleton_name", + "track_name", + "node_name", + "video_path", + "frame_idx", + ] + + labels_tidy_df = ( + labels_df.set_index(index) + .unstack(level=[0, 1, 2, 3]) + .swaplevel(0, -1, axis=1) # video_path on top while x, y score on bottom + .sort_index(axis=1) # Better format for columns + .sort_index(axis=0) # Sorts by frames + ) + + return labels_tidy_df def get_timestamps(series: PoseEstimationSeries) -> np.ndarray: diff --git a/sleap_io/io/slp.py b/sleap_io/io/slp.py index 605545aa..777a3f98 100644 --- a/sleap_io/io/slp.py +++ b/sleap_io/io/slp.py @@ -21,10 +21,7 @@ Labels, ) from sleap_io.io.video import VideoBackend, ImageVideo, MediaVideo, HDF5Video -from sleap_io.io.utils import ( - read_hdf5_attrs, - read_hdf5_dataset, -) +from sleap_io.io.utils import read_hdf5_attrs, read_hdf5_dataset, is_file_accessible from enum import IntEnum from pathlib import Path import imageio.v3 as iio @@ -43,8 +40,28 @@ class InstanceType(IntEnum): PREDICTED = 1 +def sanitize_filename( + filename: str | Path | list[str] | list[Path], +) -> str | list[str]: + """Sanitize a filename to a canonical posix-compatible format. + + Args: + filename: A string or `Path` object or list of either to sanitize. + + Returns: + A sanitized filename as a string (or list of strings if a list was provided) + with forward slashes and posix-formatted. + """ + if isinstance(filename, list): + return [sanitize_filename(f) for f in filename] + return Path(filename).as_posix().replace("\\", "/") + + def make_video( - labels_path: str, video_json: dict, video_ind: int | None = None + labels_path: str, + video_json: dict, + video_ind: int | None = None, + open_backend: bool = True, ) -> Video: """Create a `Video` object from a JSON dictionary. @@ -53,6 +70,9 @@ def make_video( video_json: A dictionary containing the video metadata. video_ind: The index of the video in the labels file. This is used to try to recover the source video for embedded videos. This is skipped if `None`. + open_backend: If `True` (the default), attempt to open the video backend for + I/O. If `False`, the backend will not be opened (useful for reading metadata + when the video files are not available). """ backend_metadata = video_json["backend"] video_path = backend_metadata["filename"] @@ -65,23 +85,7 @@ def make_video( is_embedded = True # Basic path resolution. - video_path = Path(Path(video_path).as_posix().replace("\\", "/")) - - try: - if not video_path.exists(): - # Check for the same filename in the same directory as the labels file. - video_path_ = Path(labels_path).parent / video_path.name - if video_path_.exists(): - video_path = video_path_ - else: - # TODO (TP): Expand capabilities of path resolution to support more - # complex path finding strategies. - pass - except OSError: - pass - - # Convert video path to string. - video_path = video_path.as_posix() + video_path = Path(sanitize_filename(video_path)) if is_embedded: # Try to recover the source video. @@ -91,37 +95,63 @@ def make_video( f[f"video{video_ind}/source_video"].attrs["json"] ) source_video = make_video( - labels_path, source_video_json, video_ind=None + labels_path, + source_video_json, + video_ind=None, + open_backend=open_backend, ) - if "filenames" in backend_metadata: - # This is an ImageVideo. - # TODO: Path resolution. - video_path = backend_metadata["filenames"] - - try: - backend = VideoBackend.from_filename( - video_path, - dataset=backend_metadata.get("dataset", None), - grayscale=backend_metadata.get("grayscale", None), - input_format=backend_metadata.get("input_format", None), - ) - except ValueError: - backend = None + backend = None + if open_backend: + try: + if not is_file_accessible(video_path): + # Check for the same filename in the same directory as the labels file. + candidate_video_path = Path(labels_path).parent / video_path.name + if is_file_accessible(candidate_video_path): + video_path = candidate_video_path + else: + # TODO (TP): Expand capabilities of path resolution to support more + # complex path finding strategies. + pass + except (OSError, PermissionError, FileNotFoundError): + pass + + # Convert video path to string. + video_path = video_path.as_posix() + + if "filenames" in backend_metadata: + # This is an ImageVideo. + # TODO: Path resolution. + video_path = backend_metadata["filenames"] + video_path = [Path(sanitize_filename(p)) for p in video_path] + + try: + backend = VideoBackend.from_filename( + video_path, + dataset=backend_metadata.get("dataset", None), + grayscale=backend_metadata.get("grayscale", None), + input_format=backend_metadata.get("input_format", None), + ) + except Exception: + backend = None return Video( filename=video_path, backend=backend, backend_metadata=backend_metadata, source_video=source_video, + open_backend=open_backend, ) -def read_videos(labels_path: str) -> list[Video]: +def read_videos(labels_path: str, open_backend: bool = True) -> list[Video]: """Read `Video` dataset in a SLEAP labels file. Args: labels_path: A string path to the SLEAP labels file. + open_backend: If `True` (the default), attempt to open the video backend for + I/O. If `False`, the backend will not be opened (useful for reading metadata + when the video files are not available). Returns: A list of `Video` objects. @@ -131,7 +161,9 @@ def read_videos(labels_path: str) -> list[Video]: read_hdf5_dataset(labels_path, "videos_json") ): video_json = json.loads(video_data) - video = make_video(labels_path, video_json, video_ind=video_ind) + video = make_video( + labels_path, video_json, video_ind=video_ind, open_backend=open_backend + ) videos.append(video) return videos @@ -145,16 +177,17 @@ def video_to_dict(video: Video) -> dict: Returns: A dictionary containing the video metadata. """ + video_filename = sanitize_filename(video.filename) if video.backend is None: - return {"filename": video.filename, "backend": video.backend_metadata} + return {"filename": video_filename, "backend": video.backend_metadata} if type(video.backend) == MediaVideo: return { - "filename": video.filename, + "filename": video_filename, "backend": { "type": "MediaVideo", "shape": video.shape, - "filename": video.filename, + "filename": video_filename, "grayscale": video.grayscale, "bgr": True, "dataset": "", @@ -164,12 +197,12 @@ def video_to_dict(video: Video) -> dict: elif type(video.backend) == HDF5Video: return { - "filename": video.filename, + "filename": video_filename, "backend": { "type": "HDF5Video", "shape": video.shape, "filename": ( - "." if video.backend.has_embedded_images else video.filename + "." if video.backend.has_embedded_images else video_filename ), "dataset": video.backend.dataset, "input_format": video.backend.input_format, @@ -180,12 +213,12 @@ def video_to_dict(video: Video) -> dict: elif type(video.backend) == ImageVideo: return { - "filename": video.filename, + "filename": video_filename, "backend": { "type": "ImageVideo", "shape": video.shape, - "filename": video.backend.filename[0], - "filenames": video.backend.filename, + "filename": sanitize_filename(video.backend.filename[0]), + "filenames": sanitize_filename(video.backend.filename), "dataset": video.backend_metadata.get("dataset", None), "grayscale": video.grayscale, "input_format": video.backend_metadata.get("input_format", None), @@ -1003,17 +1036,20 @@ def write_lfs(labels_path: str, labels: Labels): ) -def read_labels(labels_path: str) -> Labels: +def read_labels(labels_path: str, open_videos: bool = True) -> Labels: """Read a SLEAP labels file. Args: labels_path: A string path to the SLEAP labels file. + open_videos: If `True` (the default), attempt to open the video backend for + I/O. If `False`, the backend will not be opened (useful for reading metadata + when the video files are not available). Returns: The processed `Labels` object. """ tracks = read_tracks(labels_path) - videos = read_videos(labels_path) + videos = read_videos(labels_path, open_backend=open_videos) skeletons = read_skeletons(labels_path) points = read_points(labels_path) pred_points = read_pred_points(labels_path) diff --git a/sleap_io/io/utils.py b/sleap_io/io/utils.py index 7b7b9764..a3394193 100644 --- a/sleap_io/io/utils.py +++ b/sleap_io/io/utils.py @@ -3,9 +3,8 @@ from __future__ import annotations import h5py # type: ignore[import] import numpy as np -import pandas as pd # type: ignore[import] -from typing import Any, Union, Optional, Generator -from sleap_io import Labels, LabeledFrame, PredictedInstance +from typing import Any, Union, Optional +from pathlib import Path def read_hdf5_dataset(filename: str, dataset: str) -> np.ndarray: @@ -175,72 +174,23 @@ def _overwrite_hdf5_attr( _overwrite_hdf5_attr(ds, attr_name, attr_value) -def convert_predictions_to_dataframe(labels: Labels) -> pd.DataFrame: - """Convert predictions data to a Pandas dataframe. +def is_file_accessible(filename: str | Path) -> bool: + """Check if a file is accessible. Args: - labels: A general label object. + filename: Path to a file. Returns: - pd.DataFrame: A pandas data frame with the structured data with - hierarchical columns. The column hierarchy is: - "video_path", - "skeleton_name", - "track_name", - "node_name", - And it is indexed by the frames. - - Raises: - ValueError: If no frames in the label objects contain predicted instances. + `True` if the file is accessible, `False` otherwise. + + Notes: + This checks if the file readable by the current user by reading one byte from + the file. """ - # Form pairs of labeled_frames and predicted instances - labeled_frames = labels.labeled_frames - all_frame_instance_tuples: Generator[ - tuple[LabeledFrame, PredictedInstance], None, None - ] = ( - (label_frame, instance) # type: ignore - for label_frame in labeled_frames - for instance in label_frame.predicted_instances - ) - - # Extract the data - data_list = list() - for labeled_frame, instance in all_frame_instance_tuples: - # Traverse the nodes of the instances's skeleton - skeleton = instance.skeleton - for node in skeleton.nodes: - row_dict = dict( - frame_idx=labeled_frame.frame_idx, - x=instance.points[node].x, - y=instance.points[node].y, - score=instance.points[node].score, # type: ignore[attr-defined] - node_name=node.name, - skeleton_name=skeleton.name, - track_name=instance.track.name if instance.track else "untracked", - video_path=labeled_frame.video.filename, - ) - data_list.append(row_dict) - - if not data_list: - raise ValueError("No predicted instances found in labels object") - - labels_df = pd.DataFrame(data_list) - - # Reformat the data with columns for dict-like hierarchical data access. - index = [ - "skeleton_name", - "track_name", - "node_name", - "video_path", - "frame_idx", - ] - - labels_tidy_df = ( - labels_df.set_index(index) - .unstack(level=[0, 1, 2, 3]) - .swaplevel(0, -1, axis=1) # video_path on top while x, y score on bottom - .sort_index(axis=1) # Better format for columns - .sort_index(axis=0) # Sorts by frames - ) - - return labels_tidy_df + filename = Path(filename) + try: + with open(filename, "rb") as f: + f.read(1) + return True + except (FileNotFoundError, PermissionError, OSError, ValueError): + return False diff --git a/sleap_io/model/video.py b/sleap_io/model/video.py index edd49489..d331a479 100644 --- a/sleap_io/model/video.py +++ b/sleap_io/model/video.py @@ -9,6 +9,7 @@ from typing import Tuple, Optional, Optional import numpy as np from sleap_io.io.video import VideoBackend, MediaVideo, HDF5Video, ImageVideo +from sleap_io.io.utils import is_file_accessible from pathlib import Path @@ -34,6 +35,11 @@ class Video: information) without having access to the video file itself. source_video: The source video object if this is a proxy video. This is present when the video contains an embedded subset of frames from another video. + open_backend: Whether to open the backend when the video is available. If `True` + (the default), the backend will be automatically opened if the video exists. + Set this to `False` when you want to manually open the backend, or when the + you know the video file does not exist and you want to avoid trying to open + the file. Notes: Instances of this class are hashed by identity, not by value. This means that @@ -47,12 +53,13 @@ class Video: backend: Optional[VideoBackend] = None backend_metadata: dict[str, any] = attrs.field(factory=dict) source_video: Optional[Video] = None + open_backend: bool = True EXTS = MediaVideo.EXTS + HDF5Video.EXTS + ImageVideo.EXTS def __attrs_post_init__(self): """Post init syntactic sugar.""" - if self.backend is None and self.exists(): + if self.open_backend and self.backend is None and self.exists(): self.open() @classmethod @@ -181,25 +188,34 @@ def __getitem__(self, inds: int | list[int] | slice) -> np.ndarray: See also: VideoBackend.get_frame, VideoBackend.get_frames """ if not self.is_open: - self.open() + if self.open_backend: + self.open() + else: + raise ValueError( + "Video backend is not open. Call video.open() or set " + "video.open_backend to True to do automatically on frame read." + ) return self.backend[inds] def exists(self, check_all: bool = False) -> bool: - """Check if the video file exists. + """Check if the video file exists and is accessible. Args: check_all: If `True`, check that all filenames in a list exist. If `False` (the default), check that the first filename exists. + + Returns: + `True` if the file exists and is accessible, `False` otherwise. """ if isinstance(self.filename, list): if check_all: for f in self.filename: - if not Path(f).exists(): + if not is_file_accessible(f): return False return True else: - return Path(self.filename[0]).exists() - return Path(self.filename).exists() + return is_file_accessible(self.filename[0]) + return is_file_accessible(self.filename) @property def is_open(self) -> bool: @@ -208,6 +224,7 @@ def is_open(self) -> bool: def open( self, + filename: Optional[str] = None, dataset: Optional[str] = None, grayscale: Optional[str] = None, keep_open: bool = True, @@ -215,6 +232,8 @@ def open( """Open the video backend for reading. Args: + filename: Filename to open. If not specified, will use the filename set on + the video object. dataset: Name of dataset in HDF5 file. grayscale: Whether to force grayscale. If None, autodetect on first frame load. @@ -231,6 +250,9 @@ def open( Values for the HDF5 dataset and grayscale will be remembered if not specified. """ + if filename is not None: + self.replace_filename(filename, open=False) + if not self.exists(): raise FileNotFoundError(f"Video file not found: {self.filename}") diff --git a/tests/io/test_slp.py b/tests/io/test_slp.py index cd37b9a1..47d879b2 100644 --- a/tests/io/test_slp.py +++ b/tests/io/test_slp.py @@ -38,8 +38,9 @@ import simplejson as json import pytest from pathlib import Path - +import shutil from sleap_io.io.video import ImageVideo, HDF5Video, MediaVideo +import sys def test_read_labels(slp_typical, slp_simple_skel, slp_minimal): @@ -354,3 +355,50 @@ def test_embed_two_rounds(tmpdir, slp_real_data): == "tests/data/videos/centered_pair_low_quality.mp4" ) assert type(labels3.video.backend) == MediaVideo + + +def test_lazy_video_read(slp_real_data): + labels = read_labels(slp_real_data) + assert type(labels.video.backend) == MediaVideo + assert labels.video.exists() + + labels = read_labels(slp_real_data, open_videos=False) + assert labels.video.backend is None + + +def test_video_path_resolution(slp_real_data, tmp_path): + labels = read_labels(slp_real_data) + assert ( + Path(labels.video.filename).as_posix() + == "tests/data/videos/centered_pair_low_quality.mp4" + ) + shutil.copyfile(labels.video.filename, tmp_path / "centered_pair_low_quality.mp4") + labels.video.replace_filename( + "fake/path/to/centered_pair_low_quality.mp4", open=False + ) + labels.save(tmp_path / "labels.slp") + + # Resolve when the same video filename is found in the labels directory. + labels = read_labels(tmp_path / "labels.slp") + assert ( + Path(labels.video.filename).as_posix() + == (tmp_path / "centered_pair_low_quality.mp4").as_posix() + ) + assert labels.video.exists() + + if sys.platform != "win32": # Windows does not support chmod. + # Make the video file inaccessible. + labels.video.replace_filename("new_fake/path/to/inaccessible.mp4", open=False) + labels.save(tmp_path / "labels2.slp") + shutil.copyfile( + tmp_path / "centered_pair_low_quality.mp4", tmp_path / "inaccessible.mp4" + ) + Path(tmp_path / "inaccessible.mp4").chmod(0o000) + + # Fail to resolve when the video file is inaccessible. + labels = read_labels(tmp_path / "labels2.slp") + assert not labels.video.exists() + assert ( + Path(labels.video.filename).as_posix() + == "new_fake/path/to/inaccessible.mp4" + ) diff --git a/tests/model/test_video.py b/tests/model/test_video.py index cc87f5a3..9c5e50e6 100644 --- a/tests/model/test_video.py +++ b/tests/model/test_video.py @@ -56,7 +56,7 @@ def test_video_exists(centered_pair_low_quality_video, centered_pair_frame_paths assert video.exists(check_all=True) is False -def test_video_open_close(centered_pair_low_quality_path): +def test_video_open_close(centered_pair_low_quality_path, centered_pair_frame_paths): video = Video(centered_pair_low_quality_path) assert video.is_open assert type(video.backend) == MediaVideo @@ -91,6 +91,10 @@ def test_video_open_close(centered_pair_low_quality_path): video.open(grayscale=True) assert video.shape == (1100, 384, 384, 1) + video.open(centered_pair_frame_paths) + assert video.shape == (3, 384, 384, 1) + assert type(video.backend) == ImageVideo + def test_video_replace_filename( centered_pair_low_quality_path, centered_pair_frame_paths @@ -142,3 +146,20 @@ def test_grayscale(centered_pair_low_quality_path): video.open() assert video.grayscale == True assert video.shape[-1] == 1 + + +def test_open_backend_preference(centered_pair_low_quality_path): + video = Video(centered_pair_low_quality_path) + assert video.is_open + assert type(video.backend) == MediaVideo + + video = Video(centered_pair_low_quality_path, open_backend=False) + assert video.is_open is False + assert video.backend is None + with pytest.raises(ValueError): + video[0] + + video.open_backend = True + img = video[0] + assert video.is_open + assert type(video.backend) == MediaVideo