Ensure incompletely extracted bin files get re-extracted #392

Merged · 25 commits · Jul 25, 2023

Commits
8221f4c
Ensure incompletely extracted bin files get re-extracted
alex-l-kong Jul 17, 2023
27d0d76
Fix path spec for re-extraction
alex-l-kong Jul 20, 2023
d8a5d68
Add some logging info to the re-extraction
alex-l-kong Jul 21, 2023
ba05308
Merge branch 'main' into incomplete_bins
alex-l-kong Jul 21, 2023
6f8cb53
Only re-extract FOVs where the .bin creation time is later than the .…
alex-l-kong Jul 21, 2023
d56e256
getmtime -> getctime
alex-l-kong Jul 21, 2023
c0e8828
When checking for FOV updates at the end of a run, skip Moly points
alex-l-kong Jul 21, 2023
32b797b
Clarify comment
alex-l-kong Jul 21, 2023
cf7b55e
Test created time updates of .bin files
alex-l-kong Jul 21, 2023
794c7df
Make sure to test for renaming every third file regardless of temp_bi…
alex-l-kong Jul 21, 2023
edb2a76
Test the Windows-specific creation time updates on CACs
alex-l-kong Jul 21, 2023
1673228
Test re-extraction process
alex-l-kong Jul 21, 2023
e000646
Use st_ctime for Linux compatibility
alex-l-kong Jul 21, 2023
bccd270
Fix the creation time extraction for Linux (again)...
alex-l-kong Jul 21, 2023
2ff2d89
Add print statements to see what's going on with Linux
alex-l-kong Jul 21, 2023
6467cd9
Need to print floats instead
alex-l-kong Jul 21, 2023
5ec6bb2
Change to st_ctime
alex-l-kong Jul 21, 2023
5e48e38
Just Path, not pathlib
alex-l-kong Jul 21, 2023
4d9bfe1
This should (hopefully) be cross-platform compatible
alex-l-kong Jul 21, 2023
58fc665
Merge branch 'main' into incomplete_bins
alex-l-kong Jul 24, 2023
d3b741d
Ensure the warning of re-extraction is explicitly tested against
alex-l-kong Jul 24, 2023
a03c124
Documentation
alex-l-kong Jul 24, 2023
8c23c52
Remove extraneous comments and variables
alex-l-kong Jul 25, 2023
86cd75e
Fix poetry.lock and pyproject.toml
alex-l-kong Jul 25, 2023
d29b855
Remove unnecessary imports
alex-l-kong Jul 25, 2023
82 changes: 80 additions & 2 deletions src/toffy/fov_watcher.py
@@ -1,11 +1,12 @@
import logging
import os
import platform
import time
import warnings
from datetime import datetime
from multiprocessing import Lock
from pathlib import Path
from typing import Callable, Tuple, Union
from typing import Callable, Optional, Tuple, Union

import natsort as ns
import numpy as np
@@ -234,6 +235,15 @@ def __init__(
)

def _check_fov_status(self, path: str):
"""Verifies the status of the file written at `path`

Args:
path (str):
The path to check the status of
Returns:
Tuple[Optional[str], Optional[str]]:
The status of `path`, as well as the corresponding FOV name
"""
try:
fov_ready, point_name = self.run_structure.check_run_condition(path)
return fov_ready, point_name
@@ -254,6 +264,12 @@ def _check_fov_status(self, path: str):
return None, None

def _generate_callback_data(self, point_name: str):
"""Runs the `fov_func` and `inter_func` if applicable for a FOV

Args:
point_name (str):
The name of the FOV to run FOV (and intermediate if applicable) callbacks on
"""
print(f"Discovered {point_name}, beginning per-fov callbacks...")
logging.info(f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- Extracting {point_name}\n')
logging.info(
@@ -280,6 +296,12 @@ def _generate_callback_data(self, point_name: str):
self.check_complete()

def _process_missed_fovs(self, path: str):
"""Given a `path`, check if there are any missing FOVs to process before it

Args:
path (str):
The path before which to check for missing FOVs
"""
# verify the path provided is correct .bin type, if not skip
filename = Path(path).parts[-1]
name_ext = filename.split(".")
@@ -316,6 +338,14 @@ def _process_missed_fovs(self, path: str):
self._fov_callback_driver(os.path.join(self.run_folder, fov_name + ".json"))

def _check_last_fov(self, path: str):
"""Checks if the last FOV's data has been written.

Needed because there won't be any more file triggers after this happens.

Args:
path (str):
The path that triggers this call. Used only for formatting purposes.
"""
# define the name of the last FOV
last_fov = f"fov-{self.run_structure.highest_fov}-scan-1"
last_fov_bin = f"{last_fov}.bin"
@@ -350,7 +380,44 @@ def _check_last_fov(self, path: str):
# explicitly call check_complete to start run callbacks, since all FOVs are done
self.check_complete()

def _check_bin_updates(self):
"""Checks for, and re-runs if necessary, any incompletely extracted FOVs."""
for fov in self.run_structure.fov_progress:
# skip moly points
if fov in self.run_structure.moly_points:
continue

fov_bin_path = os.path.join(self.run_folder, fov + ".bin")
fov_json_path = os.path.join(self.run_folder, fov + ".json")

# if .bin file ctime > .json file ctime, incomplete extraction, need to re-extract
fov_bin_create = Path(fov_bin_path).stat().st_ctime
fov_json_create = Path(fov_json_path).stat().st_ctime

if fov_bin_create > fov_json_create:
warnings.warn(f"Re-extracting incompletely extracted FOV {fov}")
logging.info(
f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- Re-extracting {fov}\n'
)
logging.info(
f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- '
f"Running {self.fov_func.__name__} on {fov}\n"
)

# since reprocessing needed, remove from self.processed_fovs
self.run_structure.processed_fovs.remove(fov)

# re-extract the .bin file
# NOTE: since no more FOVs are being written, last_fov_num_processed is irrelevant
self._fov_callback_driver(fov_bin_path)

def _fov_callback_driver(self, file_trigger: str):
"""The FOV and intermediate-level callback motherbase for a single .bin file

Args:
file_trigger (str):
The file that gets caught by the watcher to throw into the pipeline
"""
# check if what's created is in the run structure
fov_ready, point_name = self._check_fov_status(file_trigger)

@@ -366,6 +433,14 @@ def _fov_callback_driver(self, file_trigger: str):
def _run_callbacks(
self, event: Union[DirCreatedEvent, FileCreatedEvent, FileMovedEvent], check_last_fov: bool
):
"""The pipeline runner, invoked when a new event is seen

Args:
event (Union[DirCreatedEvent, FileCreatedEvent, FileMovedEvent]):
The type of event seen. File/directory creation and file renaming are supported.
check_last_fov (bool):
Whether to invoke `_check_last_fov` on the event
"""
if type(event) in [DirCreatedEvent, FileCreatedEvent]:
file_trigger = event.src_path
else:
@@ -419,11 +494,14 @@ def on_moved(self, event: FileMovedEvent, check_last_fov: bool = True):
def check_complete(self):
"""Checks run structure fov_progress status

If run is complete, all calbacks in `per_run` will be run over the whole run.
If run is complete, all callbacks in `per_run` will be run over the whole run.

NOTE: bin files that had new data written will first need to be re-extracted.
"""

if all(self.run_structure.check_fov_progress().values()) and not self.all_fovs_complete:
self.all_fovs_complete = True
self._check_bin_updates()
logging.info(f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- All FOVs finished\n')
logging.info(
f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- '
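
The heart of _check_bin_updates above is a creation-time comparison between each FOV's .bin file and its .json. Below is a standalone sketch of that decision, assuming both files sit directly inside the run folder; the helper name and the list return value are illustrative, not part of the watcher API.

import os
from pathlib import Path
from typing import List


def fovs_needing_reextraction(run_folder: str, fovs: List[str], moly_points: List[str]) -> List[str]:
    """Return the FOVs whose .bin file changed after its .json was written."""
    stale = []
    for fov in fovs:
        # Moly points have no image data to extract, so skip them
        if fov in moly_points:
            continue
        bin_ctime = Path(os.path.join(run_folder, fov + ".bin")).stat().st_ctime
        json_ctime = Path(os.path.join(run_folder, fov + ".json")).stat().st_ctime
        # a .bin newer than its .json means data arrived after extraction began
        if bin_ctime > json_ctime:
            stale.append(fov)
    return stale

In the watcher itself, each stale FOV is additionally removed from processed_fovs and pushed back through _fov_callback_driver rather than merely collected.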
1 change: 0 additions & 1 deletion src/toffy/normalize.py
@@ -230,7 +230,6 @@ def combine_run_metrics(run_dir, substring, warn_overwrite=True):

if substring + "_combined.csv" in files:
if warn_overwrite:
print("Warning of overwriting!")
warnings.warn(
"Removing previously generated combined {} file in {}".format(substring, run_dir)
)
62 changes: 48 additions & 14 deletions tests/fov_watcher_test.py
@@ -1,8 +1,11 @@
import os
import platform
import shutil
import subprocess
import tempfile
import time
import warnings
from datetime import datetime
from multiprocessing.pool import ThreadPool as Pool
from pathlib import Path
from unittest.mock import patch
@@ -67,6 +70,18 @@ def _slow_copy_sample_tissue_data(
else:
shutil.copy(tissue_path, dest)

# get all .bin files
bin_files = [bfile for bfile in sorted(os.listdir(COMBINED_DATA_PATH)) if ".bin" in bfile]

# simulate updating the creation time for some .bin files, this tests _check_bin_updates
for i, bfile in enumerate(bin_files):
if i % 2 == 0:
shutil.copy(
os.path.join(COMBINED_DATA_PATH, bfile), os.path.join(dest, bfile + ".temp")
)
os.remove(os.path.join(dest, bfile))
os.rename(os.path.join(dest, bfile + ".temp"), os.path.join(dest, bfile))


COMBINED_RUN_JSON_SPOOF = {
"fovs": [
@@ -171,20 +186,39 @@ def test_watcher(

# watcher completion is checked every second
# zero-size files are halted for 1 second or until they have non zero-size
res_scan = pool.apply_async(
start_watcher,
(
run_data,
log_out,
fov_callback,
run_callback,
intermediate_callback,
1,
SLOW_COPY_INTERVAL_S,
),
)

res_scan.get()
if not add_blank:
with pytest.warns(
UserWarning, match="Re-extracting incompletely extracted FOV fov-1-scan-1"
):
res_scan = pool.apply_async(
start_watcher,
(
run_data,
log_out,
fov_callback,
run_callback,
intermediate_callback,
1,
SLOW_COPY_INTERVAL_S,
),
)

res_scan.get()
else:
res_scan = pool.apply_async(
start_watcher,
(
run_data,
log_out,
fov_callback,
run_callback,
intermediate_callback,
1,
SLOW_COPY_INTERVAL_S,
),
)

res_scan.get()

with open(os.path.join(log_out, "test_run_log.txt")) as f:
logtxt = f.read()
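
The test above asserts the new warning explicitly via pytest.warns. Below is a stripped-down sketch of that assertion pattern, with a stand-in function in place of the real watcher.

import warnings

import pytest


def reextract(fov_name: str) -> None:
    # stand-in for the watcher's re-extraction path
    warnings.warn(f"Re-extracting incompletely extracted FOV {fov_name}")


def test_reextraction_warning():
    # match is a regex applied against the warning message
    with pytest.warns(UserWarning, match="Re-extracting incompletely extracted FOV fov-1-scan-1"):
        reextract("fov-1-scan-1")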