diff --git a/src/toffy/fov_watcher.py b/src/toffy/fov_watcher.py index 8c8652b2..7e33dd16 100644 --- a/src/toffy/fov_watcher.py +++ b/src/toffy/fov_watcher.py @@ -45,7 +45,9 @@ def __init__(self, run_folder: str, timeout: int = 10 * 60): # find run .json and get parameters run_name = Path(run_folder).parts[-1] - run_metadata = read_json_file(os.path.join(run_folder, f"{run_name}.json")) + run_metadata = read_json_file( + os.path.join(run_folder, f"{run_name}.json"), encoding="utf-8" + ) # parse run_metadata and populate expected structure for fov in run_metadata.get("fovs", ()): @@ -521,6 +523,7 @@ def start_watcher( fov_callback: Callable[[str, str], None], run_callback: Callable[[None], None], intermediate_callback: Callable[[str, str], None] = None, + run_folder_timeout: int = 5400, completion_check_time: int = 30, zero_size_timeout: int = 7800, ): @@ -540,12 +543,38 @@ def start_watcher( intermediate_callback (Callable[[None], None]): function defined as run callback overriden as fov callback. assemble this using `watcher_callbacks.build_callbacks` + run_folder_timeout (int): + how long to wait for the run folder to appear before timing out, in seconds. + note that the watcher cannot begin until this run folder appears. completion_check_time (int): how long to wait before checking watcher completion, in seconds. note, this doesn't effect the watcher itself, just when this wrapper function exits. zero_size_timeout (int): number of seconds to wait for non-zero file size """ + # if the run folder specified isn't already there, ask the user to explicitly confirm the name + if not os.path.exists(run_folder): + warnings.warn( + f"Waiting for {run_folder}. Please first double check that your run data " + "doesn't already exist under a slightly different name in D:\\Data. " + "Sometimes, the CACs change capitalization or add extra characters to the run folder. " + "If this happens, stop the watcher and update the run_name variable in the notebook " + "before trying again." + ) + + # allow the watcher to poll the run folder until it appears or times out + run_folder_wait_time = 0 + while not os.path.exists(run_folder) and run_folder_wait_time < run_folder_timeout: + time.sleep(run_folder_timeout / 10) + run_folder_wait_time += run_folder_timeout / 10 + + if run_folder_wait_time == run_folder_timeout: + raise FileNotFoundError( + f"Timed out waiting for {run_folder}. Make sure the run_name variable in the notebook " + "matches up with the run folder name in D:\\Data, or try again a few minutes later " + "if the run folder still hasn't shown up." + ) + observer = Observer() event_handler = FOV_EventHandler( run_folder, log_folder, fov_callback, run_callback, intermediate_callback, zero_size_timeout diff --git a/tests/fov_watcher_test.py b/tests/fov_watcher_test.py index 0ae12af3..fd551e3e 100644 --- a/tests/fov_watcher_test.py +++ b/tests/fov_watcher_test.py @@ -4,6 +4,7 @@ import time import warnings from datetime import datetime +from multiprocessing import TimeoutError from multiprocessing.pool import ThreadPool as Pool from pathlib import Path from unittest.mock import patch @@ -127,6 +128,100 @@ def test_run_structure(run_json, expected_files, recwarn): assert not exist and name == "" +def _slow_create_run_folder(run_folder_path: str, lag_time: int): + time.sleep(lag_time) + os.makedirs(run_folder_path) + write_json_file( + json_path=os.path.join(run_folder_path, "test_run.json"), + json_object=COMBINED_RUN_JSON_SPOOF, + encoding="utf-8", + ) + + +@patch("toffy.watcher_callbacks.visualize_qc_metrics", side_effect=mock_visualize_qc_metrics) +@patch("toffy.watcher_callbacks.visualize_mph", side_effect=mock_visualize_mph) +@pytest.mark.parametrize("run_folder_lag", [0, 5, 15]) +@parametrize_with_cases( + "run_cbs,int_cbs,fov_cbs,kwargs,validators,watcher_start_lag,existing_data", + cases=WatcherCases.case_default, +) +def test_watcher_run_timeout( + mock_viz_qc, + mock_viz_mph, + run_cbs, + int_cbs, + fov_cbs, + kwargs, + validators, + watcher_start_lag, + existing_data, + run_folder_lag, +): + with tempfile.TemporaryDirectory() as tmpdir: + tiff_out_dir = os.path.join(tmpdir, "cb_0", RUN_DIR_NAME) + qc_out_dir = os.path.join(tmpdir, "cb_1", RUN_DIR_NAME) + mph_out_dir = os.path.join(tmpdir, "cb_2", RUN_DIR_NAME) + plot_dir = os.path.join(tmpdir, "cb_2_plots", RUN_DIR_NAME) + pulse_out_dir = os.path.join(tmpdir, "cb_3", RUN_DIR_NAME) + stitched_dir = os.path.join(tmpdir, "cb_0", RUN_DIR_NAME, f"{RUN_DIR_NAME}_stitched") + + # add directories to kwargs + kwargs["tiff_out_dir"] = tiff_out_dir + kwargs["qc_out_dir"] = qc_out_dir + kwargs["mph_out_dir"] = mph_out_dir + kwargs["pulse_out_dir"] = pulse_out_dir + kwargs["plot_dir"] = plot_dir + + # ensure warn_overwrite set to False if intermediate callbacks set, otherwise True + kwargs["warn_overwrite"] = True if int_cbs else False + + run_folder = os.path.join(tmpdir, "test_run") + log_out = os.path.join(tmpdir, "log_output") + fov_callback, run_callback, intermediate_callback = build_callbacks( + run_cbs, int_cbs, fov_cbs, **kwargs + ) + + with Pool(processes=4) as pool: + pool.apply_async(_slow_create_run_folder, (run_folder, run_folder_lag)) + + if run_folder_lag > 10: + with pytest.raises(FileNotFoundError, match=f"Timed out waiting for {run_folder}"): + res_scan = pool.apply_async( + start_watcher, + ( + run_folder, + log_out, + fov_callback, + run_callback, + intermediate_callback, + 10, + 1, + SLOW_COPY_INTERVAL_S, + ), + ) + + res_scan.get() + else: + res_scan = pool.apply_async( + start_watcher, + ( + run_folder, + log_out, + fov_callback, + run_callback, + intermediate_callback, + 10, + 1, + SLOW_COPY_INTERVAL_S, + ), + ) + + try: + res_scan.get(timeout=7) + except TimeoutError: + return + + @patch("toffy.watcher_callbacks.visualize_qc_metrics", side_effect=mock_visualize_qc_metrics) @patch("toffy.watcher_callbacks.visualize_mph", side_effect=mock_visualize_mph) @pytest.mark.parametrize("add_blank", [False, True]) @@ -175,6 +270,7 @@ def test_watcher( write_json_file( json_path=os.path.join(run_data, "test_run.json"), json_object=COMBINED_RUN_JSON_SPOOF, + encoding="utf-8", ) # if existing_data set to True, test case where a FOV has already been extracted @@ -238,6 +334,7 @@ def test_watcher( fov_callback, run_callback, intermediate_callback, + 2700, 1, SLOW_COPY_INTERVAL_S, ), @@ -253,6 +350,7 @@ def test_watcher( fov_callback, run_callback, intermediate_callback, + 2700, 1, SLOW_COPY_INTERVAL_S, ),