Skip to content

Commit

Permalink
Skip watcher callbacks for FOVs that already have data generated (#394)
Browse files Browse the repository at this point in the history
* Skip FOVs that have already had data generated

* Fix pulse height path check

* Initial testing phase of watcher with multiple files extracted

* Make sure test cases create the dummy data correctly for existing_data

* Ensure the test actually creates dummy images for fov-2-scan-1

* Make sure imsave is actually imported

* Fix sizing of the test images generated

* Combine file names into one f-string

* More f-string changes

* Ensure that the case where not all channel tiffs are written is also accounted for

* Remove extraneous print statements

* Make sure to account for the fact that corrupted FOVs must be re-written regardless of existing data
  • Loading branch information
alex-l-kong authored Jul 27, 2023
1 parent 9b3155f commit 8ce956a
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 23 deletions.
14 changes: 9 additions & 5 deletions src/toffy/fov_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,14 @@ def _check_fov_status(self, path: str):

return None, None

def _generate_callback_data(self, point_name: str):
def _generate_callback_data(self, point_name: str, overwrite: bool):
"""Runs the `fov_func` and `inter_func` if applicable for a FOV
Args:
point_name (str):
The name of the FOV to run FOV (and intermediate if applicable) callbacks on
overwrite (bool):
Forces an overwrite of already existing data, needed if a FOV needs re-extraction
"""
print(f"Discovered {point_name}, beginning per-fov callbacks...")
logging.info(f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- Extracting {point_name}\n')
Expand All @@ -277,7 +279,7 @@ def _generate_callback_data(self, point_name: str):
f"Running {self.fov_func.__name__} on {point_name}\n"
)

self.fov_func(self.run_folder, point_name)
self.fov_func(self.run_folder, point_name, overwrite)
self.run_structure.processed(point_name)

if self.inter_func:
Expand Down Expand Up @@ -409,20 +411,22 @@ def _check_bin_updates(self):

# re-extract the .bin file
# NOTE: since no more FOVs are being written, last_fov_num_processed is irrelevant
self._fov_callback_driver(fov_bin_path)
self._fov_callback_driver(fov_bin_path, overwrite=True)

def _fov_callback_driver(self, file_trigger: str):
def _fov_callback_driver(self, file_trigger: str, overwrite: bool = False):
"""The FOV and intermediate-level callback motherbase for a single .bin file
Args:
file_trigger (str):
The file that gets caught by the watcher to throw into the pipeline
overwrite (bool):
Forces an overwrite of already existing data, needed if a FOV needs re-extraction
"""
# check if what's created is in the run structure
fov_ready, point_name = self._check_fov_status(file_trigger)

if fov_ready:
self._generate_callback_data(point_name)
self._generate_callback_data(point_name, overwrite=overwrite)

# needs to update if .bin file processed OR new moly point detected
is_moly = point_name in self.run_structure.moly_points
Expand Down
47 changes: 43 additions & 4 deletions src/toffy/watcher_callbacks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import os
import warnings
from dataclasses import dataclass, field
from typing import Iterable

Expand All @@ -10,7 +11,7 @@

import pandas as pd
import xarray as xr
from alpineer import misc_utils
from alpineer import io_utils, misc_utils
from mibi_bin_tools.bin_files import _write_out, extract_bin_files
from mibi_bin_tools.type_utils import any_true

Expand Down Expand Up @@ -121,6 +122,7 @@ def image_stitching(self, tiff_out_dir, **kwargs):
class FovCallbacks:
run_folder: str
point_name: str
overwrite: bool
__panel: pd.DataFrame = field(default=None, init=False)
__fov_data: xr.DataArray = field(default=None, init=False)

Expand Down Expand Up @@ -179,6 +181,24 @@ def extract_tiffs(self, tiff_out_dir: str, panel: pd.DataFrame, **kwargs):
if not os.path.exists(tiff_out_dir):
os.makedirs(tiff_out_dir)

extracted_img_dir = os.path.join(tiff_out_dir, self.point_name)
unextracted_chan_tiffs = []

# in the case all images have been extracted, simply return
if os.path.exists(extracted_img_dir) and not self.overwrite:
all_chan_tiffs = [f"{ct}.tiff" for ct in panel["Target"]]
extracted_chan_tiffs = io_utils.list_files(extracted_img_dir, substrs=".tiff")
unextracted_chan_tiffs = set(all_chan_tiffs).difference(extracted_chan_tiffs)

if len(unextracted_chan_tiffs) == 0:
warnings.warn(f"Images already extracted for FOV {self.point_name}")
return

# ensure we don't re-extract channels that have already been extracted
if unextracted_chan_tiffs and not self.overwrite:
unextracted_chans = io_utils.remove_file_extensions(unextracted_chan_tiffs)
panel = panel[panel["Target"].isin(unextracted_chans)]

if self.__fov_data is None:
self._generate_fov_data(panel, **kwargs)

Expand Down Expand Up @@ -216,6 +236,15 @@ def generate_qc(self, qc_out_dir: str, panel: pd.DataFrame = None, **kwargs):
raise ValueError("Must provide panel if fov data is not already generated...")
self._generate_fov_data(panel, **kwargs)

qc_metric_paths = [
os.path.join(qc_out_dir, f"{self.point_name}_nonzero_mean_stats.csv"),
os.path.join(qc_out_dir, f"{self.point_name}_total_intensity_stats.csv"),
os.path.join(qc_out_dir, f"{self.point_name}_percentile_99_9_stats.csv"),
]
if all([os.path.exists(qc_file) for qc_file in qc_metric_paths]) and not self.overwrite:
warnings.warn(f"All QC metrics already extracted for FOV {self.point_name}")
return

metric_data = compute_qc_metrics_direct(
image_data=self.__fov_data,
fov_name=self.point_name,
Expand All @@ -232,7 +261,7 @@ def generate_mph(self, mph_out_dir, **kwargs):
Args:
mph_out_dir (str): where to output mph csvs to
**kwargs (dict):
Additional arguments for `toffy.qc_comp.compute_mph_metrics`. Accepted kwargs are:
Additional arguments for `toffy.mph_comp.compute_mph_metrics`. Accepted kwargs are:
- mass
- mass_start
Expand All @@ -242,6 +271,11 @@ def generate_mph(self, mph_out_dir, **kwargs):
if not os.path.exists(mph_out_dir):
os.makedirs(mph_out_dir)

mph_pulse_file = os.path.join(mph_out_dir, f"{self.point_name}-mph_pulse.csv")
if os.path.exists(mph_pulse_file) and not self.overwrite:
warnings.warn(f"MPH pulse metrics already extracted for FOV {self.point_name}")
return

compute_mph_metrics(
bin_file_dir=self.run_folder,
csv_dir=mph_out_dir,
Expand All @@ -267,6 +301,11 @@ def generate_pulse_heights(self, pulse_out_dir: str, panel: pd.DataFrame = None,
if not os.path.exists(pulse_out_dir):
os.makedirs(pulse_out_dir)

pulse_height_file = os.path.join(pulse_out_dir, f"{self.point_name}-pulse_heights.csv")
if os.path.exists(pulse_height_file) and not self.overwrite:
warnings.warn(f"Pulse heights per mass already extracted for FOV {self.point_name}")
return

write_mph_per_mass(
base_dir=self.run_folder,
output_dir=pulse_out_dir,
Expand Down Expand Up @@ -306,9 +345,9 @@ def build_fov_callback(*args, **kwargs):
misc_utils.verify_in_list(required_arguments=argnames, passed_arguments=list(kwargs.keys()))

# construct actual callback
def fov_callback(run_folder: str, point_name: str):
def fov_callback(run_folder: str, point_name: str, overwrite: bool = False):
# construct FovCallback object for given FoV
callback_obj = FovCallbacks(run_folder, point_name)
callback_obj = FovCallbacks(run_folder, point_name, overwrite)

# for each member, retrieve the member function and run it
for arg in args:
Expand Down
64 changes: 52 additions & 12 deletions tests/fov_watcher_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import platform
import shutil
import subprocess
import tempfile
import time
import warnings
Expand All @@ -10,15 +8,20 @@
from pathlib import Path
from unittest.mock import patch

import numpy as np
import pandas as pd
import pytest
from alpineer import io_utils
from pytest_cases import parametrize_with_cases
from skimage.io import imsave

from toffy.fov_watcher import start_watcher
from toffy.json_utils import write_json_file
from toffy.settings import QC_COLUMNS, QC_SUFFIXES
from toffy.watcher_callbacks import build_callbacks

from .utils.test_utils import (
TEST_CHANNELS,
RunStructureCases,
RunStructureTestContext,
WatcherCases,
Expand Down Expand Up @@ -128,8 +131,9 @@ def test_run_structure(run_json, expected_files, recwarn):
@patch("toffy.watcher_callbacks.visualize_mph", side_effect=mock_visualize_mph)
@pytest.mark.parametrize("add_blank", [False, True])
@pytest.mark.parametrize("temp_bin", [False, True])
@pytest.mark.parametrize("watcher_start_lag", [4, 8, 12])
@parametrize_with_cases("run_cbs, int_cbs, fov_cbs, kwargs, validators", cases=WatcherCases)
@parametrize_with_cases(
"run_cbs,int_cbs,fov_cbs,kwargs,validators,watcher_start_lag,existing_data", cases=WatcherCases
)
def test_watcher(
mock_viz_qc,
mock_viz_mph,
Expand All @@ -138,9 +142,10 @@ def test_watcher(
fov_cbs,
kwargs,
validators,
watcher_start_lag,
existing_data,
add_blank,
temp_bin,
watcher_start_lag,
):
try:
with tempfile.TemporaryDirectory() as tmpdir:
Expand Down Expand Up @@ -172,24 +177,59 @@ def test_watcher(
json_object=COMBINED_RUN_JSON_SPOOF,
)

# if existing_data set to True, test case where a FOV has already been extracted
if existing_data[0]:
os.makedirs(os.path.join(tiff_out_dir, "fov-2-scan-1"))
channels_write = TEST_CHANNELS if existing_data[1] == "Full" else [TEST_CHANNELS[1]]
for channel in channels_write:
random_img = np.random.rand(32, 32)
imsave(
os.path.join(tiff_out_dir, "fov-2-scan-1", f"{channel}.tiff"), random_img
)

os.makedirs(qc_out_dir)
for qcs, qcc in zip(QC_SUFFIXES, QC_COLUMNS):
df_qc = pd.DataFrame(
np.random.rand(len(TEST_CHANNELS), 3), columns=["fov", "channel", qcc]
)
df_qc["fov"] = "fov-2-scan-1"
df_qc["channel"] = TEST_CHANNELS
df_qc.to_csv(os.path.join(qc_out_dir, f"fov-2-scan-1_{qcs}.csv"), index=False)

os.makedirs(mph_out_dir)
df_mph = pd.DataFrame(
np.random.rand(1, 4), columns=["fov", "MPH", "total_count", "time"]
)
df_mph["fov"] = "fov-2-scan-1"
df_mph.to_csv(os.path.join(mph_out_dir, "fov-2-scan-1-mph_pulse.csv"), index=False)

os.makedirs(pulse_out_dir)
df_ph = pd.DataFrame(np.random.rand(10, 3), columns=["mass", "fov", "pulse_height"])
df_ph["fov"] = "fov-2-scan-1"
df_ph.to_csv(
os.path.join(pulse_out_dir, "fov-2-scan-1-pulse_heights.csv"), index=False
)

# `_slow_copy_sample_tissue_data` mimics the instrument computer uploading data to the
# client access computer. `start_watcher` is made async here since these processes
# wouldn't block each other in normal use

with Pool(processes=4) as pool:
pool.apply_async(
_slow_copy_sample_tissue_data,
(run_data, SLOW_COPY_INTERVAL_S, add_blank, temp_bin),
)

time.sleep(watcher_start_lag)

# watcher completion is checked every second
# zero-size files are halted for 1 second or until they have non zero-size
watcher_warnings = []
if not add_blank:
with pytest.warns(
UserWarning, match="Re-extracting incompletely extracted FOV fov-1-scan-1"
):
watcher_warnings.append(
r"Re-extracting incompletely extracted FOV fov-1-scan-1"
)
if existing_data[0] and existing_data[1] == "Full":
watcher_warnings.append(r"already extracted for FOV fov-2-scan-1")

if len(watcher_warnings) > 0:
with pytest.warns(UserWarning, match="|".join(watcher_warnings)):
res_scan = pool.apply_async(
start_watcher,
(
Expand Down
58 changes: 56 additions & 2 deletions tests/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,48 @@
from toffy.json_utils import write_json_file
from toffy.settings import QC_COLUMNS, QC_SUFFIXES

# Channel names shared by the watcher tests: they act both as the panel's
# "Target" values and as the expected per-FOV channel TIFF file names
# (written as f"{channel}.tiff" when the tests pre-create existing data).
TEST_CHANNELS = [
    "Calprotectin",
    "Chymase",
    "SMA",
    "Vimentin",
    "LAG3",
    "CD4",
    "CD69",
    "FAP",
    "FOXP3",
    "PD1",
    "CD31",
    "Biotin",
    "Ecadherin",
    "CD56",
    "CD38",
    "TCF1 TCF7",
    "TBET",
    "CD45RB",
    "CD68",
    "CD11c",
    "CD8",
    "CD3e",
    "IDO1",
    "CD45RO",
    "TIM-3",
    "CD163",
    "CD20",
    "FN1",
    "Glut1",
    "HLADR",
    "CD14",
    "CD45",
    "Cytokeratin17",
    "COL1A1",
    "H3K27me3",
    "CD57",
    "H3K9ac",
    "Ki67",
    "HLA1 class ABC",
]


def make_run_file(tmp_dir, prefixes=[], include_nontiled=False):
"""Create a run subir and run json in the provided dir and return the path to this new dir."""
Expand Down Expand Up @@ -521,13 +563,25 @@ def case_default(self, intensity, replace):
["extract_tiffs", "generate_pulse_heights"],
kwargs,
validators,
1,
(False, None),
)

@parametrize(intensity=(False, True))
@parametrize(replace=(False, True))
def case_inter_callback(self, intensity, replace):
    """Variant of the default case that promotes the first two run callbacks
    to intermediate callbacks, leaving every other setting unchanged."""
    defaults = self.case_default(intensity, replace)

    # split the default run callbacks: first two become intermediate callbacks
    inter_cbs, run_cbs = defaults[0][:2], defaults[0][2:]

    return (run_cbs, inter_cbs) + defaults[2:]
return (rcs, ics, fcs, kwargs, validators, wsl, ed)

@parametrize(watcher_start_lag=(4, 8, 12))
def case_watcher_lag(self, watcher_start_lag):
    """Default case (intensity and replace enabled) with an explicit,
    parametrized watcher start lag substituted in."""
    base = list(self.case_default(True, True))
    base[5] = watcher_start_lag  # index 5 holds the watcher start lag
    return tuple(base)

@parametrize(existing_data=((True, "Full"), (True, "Partial"), (False, None)))
def case_existing_data(self, existing_data):
    """Default case (intensity and replace disabled) with a parametrized
    existing-data setting substituted for the default one."""
    base = self.case_default(False, False)

    # keep everything except the final element, which holds existing_data
    return base[:6] + (existing_data,)

0 comments on commit 8ce956a

Please sign in to comment.