Skip to content

Commit

Permalink
Safer video loading from SLP
Browse files Browse the repository at this point in the history
  • Loading branch information
talmo committed Sep 28, 2024
1 parent 7e91d04 commit f314fe6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 38 deletions.
7 changes: 5 additions & 2 deletions sleap_io/io/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@
from pathlib import Path


def load_slp(filename: str) -> Labels:
def load_slp(filename: str, open_videos: bool = True) -> Labels:
"""Load a SLEAP dataset.
Args:
filename: Path to a SLEAP labels file (`.slp`).
open_videos: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).
Returns:
The dataset as a `Labels` object.
"""
return slp.read_labels(filename)
return slp.read_labels(filename, open_videos=open_videos)


def save_slp(
Expand Down
91 changes: 55 additions & 36 deletions sleap_io/io/slp.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ class InstanceType(IntEnum):


def make_video(
labels_path: str, video_json: dict, video_ind: int | None = None
labels_path: str,
video_json: dict,
video_ind: int | None = None,
open_backend: bool = True,
) -> Video:
"""Create a `Video` object from a JSON dictionary.
Expand All @@ -53,6 +56,9 @@ def make_video(
video_json: A dictionary containing the video metadata.
video_ind: The index of the video in the labels file. This is used to try to
recover the source video for embedded videos. This is skipped if `None`.
open_backend: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).
"""
backend_metadata = video_json["backend"]
video_path = backend_metadata["filename"]
Expand All @@ -67,22 +73,6 @@ def make_video(
# Basic path resolution.
video_path = Path(Path(video_path).as_posix().replace("\\", "/"))

try:
if not video_path.exists():
# Check for the same filename in the same directory as the labels file.
video_path_ = Path(labels_path).parent / video_path.name
if video_path_.exists():
video_path = video_path_
else:
# TODO (TP): Expand capabilities of path resolution to support more
# complex path finding strategies.
pass
except OSError:
pass

# Convert video path to string.
video_path = video_path.as_posix()

if is_embedded:
# Try to recover the source video.
with h5py.File(labels_path, "r") as f:
Expand All @@ -91,23 +81,44 @@ def make_video(
f[f"video{video_ind}/source_video"].attrs["json"]
)
source_video = make_video(
labels_path, source_video_json, video_ind=None
labels_path,
source_video_json,
video_ind=None,
open_backend=open_backend,
)

if "filenames" in backend_metadata:
# This is an ImageVideo.
# TODO: Path resolution.
video_path = backend_metadata["filenames"]

try:
backend = VideoBackend.from_filename(
video_path,
dataset=backend_metadata.get("dataset", None),
grayscale=backend_metadata.get("grayscale", None),
input_format=backend_metadata.get("input_format", None),
)
except ValueError:
backend = None
backend = None
if open_backend:
try:
if not video_path.exists():
# Check for the same filename in the same directory as the labels file.
video_path_ = Path(labels_path).parent / video_path.name
if video_path_.exists() and video_path.stat():
video_path = video_path_
else:
# TODO (TP): Expand capabilities of path resolution to support more
# complex path finding strategies.
pass
except (OSError, PermissionError, FileNotFoundError):
pass

# Convert video path to string.
video_path = video_path.as_posix()

if "filenames" in backend_metadata:
# This is an ImageVideo.
# TODO: Path resolution.
video_path = backend_metadata["filenames"]

try:
backend = VideoBackend.from_filename(
video_path,
dataset=backend_metadata.get("dataset", None),
grayscale=backend_metadata.get("grayscale", None),
input_format=backend_metadata.get("input_format", None),
)
except:
backend = None

return Video(
filename=video_path,
Expand All @@ -117,11 +128,14 @@ def make_video(
)


def read_videos(labels_path: str) -> list[Video]:
def read_videos(labels_path: str, open_backend: bool = False) -> list[Video]:
"""Read `Video` dataset in a SLEAP labels file.
Args:
labels_path: A string path to the SLEAP labels file.
open_backend: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).
Returns:
A list of `Video` objects.
Expand All @@ -131,7 +145,9 @@ def read_videos(labels_path: str) -> list[Video]:
read_hdf5_dataset(labels_path, "videos_json")
):
video_json = json.loads(video_data)
video = make_video(labels_path, video_json, video_ind=video_ind)
video = make_video(
labels_path, video_json, video_ind=video_ind, open_backend=open_backend
)
videos.append(video)
return videos

Expand Down Expand Up @@ -1003,17 +1019,20 @@ def write_lfs(labels_path: str, labels: Labels):
)


def read_labels(labels_path: str) -> Labels:
def read_labels(labels_path: str, open_videos: bool = True) -> Labels:
"""Read a SLEAP labels file.
Args:
labels_path: A string path to the SLEAP labels file.
open_videos: If `True` (the default), attempt to open the video backend for
I/O. If `False`, the backend will not be opened (useful for reading metadata
when the video files are not available).
Returns:
The processed `Labels` object.
"""
tracks = read_tracks(labels_path)
videos = read_videos(labels_path)
videos = read_videos(labels_path, open_backend=open_videos)
skeletons = read_skeletons(labels_path)
points = read_points(labels_path)
pred_points = read_pred_points(labels_path)
Expand Down

0 comments on commit f314fe6

Please sign in to comment.