From 23981df69fd660680309dc7dae8b42c3d5a324d6 Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Fri, 28 Jul 2023 12:11:02 -0400
Subject: [PATCH] Consolidate logic to wait for dask futures.

---
 src/hipscat_import/catalog/resume_plan.py     | 53 +++++++-----------
 src/hipscat_import/catalog/run_import.py      | 47 +---------------
 src/hipscat_import/pipeline_resume_plan.py    | 55 ++++++++++++++-----
 .../catalog/test_resume_plan.py               | 18 +++---
 .../hipscat_import/catalog/test_run_import.py | 10 ++--
 5 files changed, 78 insertions(+), 105 deletions(-)

diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py
index 205ee83a..975b875d 100644
--- a/src/hipscat_import/catalog/resume_plan.py
+++ b/src/hipscat_import/catalog/resume_plan.py
@@ -25,18 +25,15 @@ class ResumePlan(PipelineResumePlan):
     split_keys: List[Tuple[str, str]] = field(default_factory=list)
     """set of files (and job keys) that have yet to be split"""
 
-    MAPPING_LOG_FILE = "mapping_log.txt"
-    SPLITTING_LOG_FILE = "splitting_log.txt"
-    REDUCING_LOG_FILE = "reducing_log.txt"
+    MAPPING_STAGE = "mapping"
+    SPLITTING_STAGE = "splitting"
+    REDUCING_STAGE = "reducing"
+
     ORIGINAL_INPUT_PATHS = "input_paths.txt"
 
     HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
     HISTOGRAMS_DIR = "histograms"
 
-    MAPPING_DONE_FILE = "mapping_done"
-    SPLITTING_DONE_FILE = "splitting_done"
-    REDUCING_DONE_FILE = "reducing_done"
-
     def __post_init__(self):
         """Initialize the plan."""
         self.gather_plan()
@@ -76,14 +73,14 @@ def gather_plan(self):
             ## Gather keys for execution.
             step_progress.update(1)
             if not mapping_done:
-                mapped_keys = set(self.read_log_keys(self.MAPPING_LOG_FILE))
+                mapped_keys = set(self.read_log_keys(self.MAPPING_STAGE))
                 self.map_files = [
                     (f"map_{i}", file_path)
                     for i, file_path in enumerate(self.input_paths)
                     if f"map_{i}" not in mapped_keys
                 ]
             if not splitting_done:
-                split_keys = set(self.read_log_keys(self.SPLITTING_LOG_FILE))
+                split_keys = set(self.read_log_keys(self.SPLITTING_STAGE))
                 self.split_keys = [
                     (f"split_{i}", file_path)
                     for i, file_path in enumerate(self.input_paths)
@@ -140,29 +137,21 @@ def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
         with open(file_name, "wb+") as file_handle:
             file_handle.write(histogram.data)
 
-    def mark_mapping_done(self, mapping_key: str):
-        """Add mapping key to done list."""
-        self.write_log_key(self.MAPPING_LOG_FILE, mapping_key)
+    def wait_for_mapping(self, futures):
+        """Wait for mapping futures to complete."""
+        self.wait_for_futures(futures, self.MAPPING_STAGE)
 
     def is_mapping_done(self) -> bool:
         """Are there files left to map?"""
-        return self.done_file_exists(self.MAPPING_DONE_FILE)
-
-    def set_mapping_done(self):
-        """All files are done mapping."""
-        self.touch_done_file(self.MAPPING_DONE_FILE)
+        return self.done_file_exists(self.MAPPING_STAGE)
 
-    def mark_splitting_done(self, splitting_key: str):
-        """Add splitting key to done list."""
-        self.write_log_key(self.SPLITTING_LOG_FILE, splitting_key)
+    def wait_for_splitting(self, futures):
+        """Wait for splitting futures to complete."""
+        self.wait_for_futures(futures, self.SPLITTING_STAGE)
 
     def is_splitting_done(self) -> bool:
         """Are there files left to split?"""
-        return self.done_file_exists(self.SPLITTING_DONE_FILE)
-
-    def set_splitting_done(self):
-        """All files are done splitting."""
-        self.touch_done_file(self.SPLITTING_DONE_FILE)
+        return self.done_file_exists(self.SPLITTING_STAGE)
 
     def get_reduce_items(self, destination_pixel_map):
         """Fetch a triple for each partition to reduce.
@@ -174,7 +163,7 @@ def get_reduce_items(self, destination_pixel_map):
         - reduce key (string of destination order+pixel)
         """
 
-        reduced_keys = set(self.read_log_keys(self.REDUCING_LOG_FILE))
+        reduced_keys = set(self.read_log_keys(self.REDUCING_STAGE))
         reduce_items = [
             (hp_pixel, source_pixels, f"{hp_pixel.order}_{hp_pixel.pixel}")
             for hp_pixel, source_pixels in destination_pixel_map.items()
@@ -182,14 +171,10 @@ def get_reduce_items(self, destination_pixel_map):
         ]
         return reduce_items
 
-    def mark_reducing_done(self, reducing_key: str):
-        """Add reducing key to done list."""
-        self.write_log_key(self.REDUCING_LOG_FILE, reducing_key)
-
     def is_reducing_done(self) -> bool:
         """Are there partitions left to reduce?"""
-        return self.done_file_exists(self.REDUCING_DONE_FILE)
+        return self.done_file_exists(self.REDUCING_STAGE)
 
-    def set_reducing_done(self):
-        """All partitions are done reducing."""
-        self.touch_done_file(self.REDUCING_DONE_FILE)
+    def wait_for_reducing(self, futures):
+        """Wait for reducing futures to complete."""
+        self.wait_for_futures(futures, self.REDUCING_STAGE)
diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py
index 870d5aa8..e0391e60 100644
--- a/src/hipscat_import/catalog/run_import.py
+++ b/src/hipscat_import/catalog/run_import.py
@@ -7,7 +7,6 @@
 
 import hipscat.io.write_metadata as io
 import numpy as np
-from dask.distributed import as_completed
 from hipscat import pixel_math
 from tqdm import tqdm
 
@@ -37,21 +36,7 @@ def _map_pixels(args, client):
                 dec_column=args.dec_column,
             )
         )
-
-    some_error = False
-    for future in tqdm(
-        as_completed(futures),
-        desc="Mapping ",
-        total=len(futures),
-        disable=(not args.progress_bar),
-    ):
-        if future.status == "error":  # pragma: no cover
-            some_error = True
-        else:
-            args.resume_plan.mark_mapping_done(future.key)
-    if some_error:  # pragma: no cover
-        raise RuntimeError("Some mapping stages failed. See logs for details.")
-    args.resume_plan.set_mapping_done()
+    args.resume_plan.wait_for_mapping(futures)
 
 
 def _split_pixels(args, alignment_future, client):
@@ -78,20 +63,7 @@ def _split_pixels(args, alignment_future, client):
             )
         )
 
-    some_error = False
-    for future in tqdm(
-        as_completed(futures),
-        desc="Splitting",
-        total=len(futures),
-        disable=(not args.progress_bar),
-    ):
-        if future.status == "error":  # pragma: no cover
-            some_error = True
-        else:
-            args.resume_plan.mark_splitting_done(future.key)
-    if some_error:  # pragma: no cover
-        raise RuntimeError("Some splitting stages failed. See logs for details.")
-    args.resume_plan.set_splitting_done()
+    args.resume_plan.wait_for_splitting(futures)
 
 
 def _reduce_pixels(args, destination_pixel_map, client):
@@ -123,20 +95,7 @@ def _reduce_pixels(args, destination_pixel_map, client):
             )
         )
 
-    some_error = False
-    for future in tqdm(
-        as_completed(futures),
-        desc="Reducing ",
-        total=len(futures),
-        disable=(not args.progress_bar),
-    ):
-        if future.status == "error":  # pragma: no cover
-            some_error = True
-        else:
-            args.resume_plan.mark_reducing_done(future.key)
-    if some_error:  # pragma: no cover
-        raise RuntimeError("Some reducing stages failed. See logs for details.")
-    args.resume_plan.set_reducing_done()
+    args.resume_plan.wait_for_reducing(futures)
 
 
 def run(args, client):
diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py
index 1d3b84ae..747e9c40 100644
--- a/src/hipscat_import/pipeline_resume_plan.py
+++ b/src/hipscat_import/pipeline_resume_plan.py
@@ -7,7 +7,9 @@
 from pathlib import Path
 
 import pandas as pd
+from dask.distributed import as_completed
 from hipscat.io import FilePointer, file_io
+from tqdm import tqdm
 
 
 @dataclass
@@ -37,37 +39,39 @@ def safe_to_resume(self):
                 )
         file_io.make_directory(self.tmp_path, exist_ok=True)
 
-    def done_file_exists(self, file_name):
+    def done_file_exists(self, stage_name):
         """Is there a file at a given path? For a done file, the existence of the file
         is the only signal needed to indicate a pipeline segment is complete.
 
         Args:
-            file_name(str): done file name, relative to tmp_path, including extension.
+            stage_name(str): name of the stage (e.g. mapping, reducing)
 
         Returns:
             boolean, True if the done file exists at tmp_path. False otherwise.
         """
-        return file_io.does_file_or_directory_exist(file_io.append_paths_to_pointer(self.tmp_path, file_name))
+        return file_io.does_file_or_directory_exist(
+            file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_done")
+        )
 
-    def touch_done_file(self, file_name):
+    def touch_done_file(self, stage_name):
         """Touch (create) a done file at the given path. For a done file, the existence
         of the file is the only signal needed to indicate a pipeline segment is complete.
 
         Args:
-            file_name(str): done file name, relative to tmp_path, including extension.
+            stage_name(str): name of the stage (e.g. mapping, reducing)
         """
-        Path(file_io.append_paths_to_pointer(self.tmp_path, file_name)).touch()
+        Path(file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_done")).touch()
 
-    def read_log_keys(self, file_name):
+    def read_log_keys(self, stage_name):
         """Read a resume log file, containing timestamp and keys.
 
         Args:
-            file_name(str): log file name, relative to tmp_path, including extension.
+            stage_name(str): name of the stage (e.g. mapping, reducing)
 
         Return:
             List[str] - all keys found in the log file
         """
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, file_name)
+        file_path = file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_log.txt")
         if file_io.does_file_or_directory_exist(file_path):
             mapping_log = pd.read_csv(
                 file_path,
@@ -78,17 +82,42 @@ def read_log_keys(self, file_name):
                 delimiter="\t",
                 header=None,
                 names=["time", "key"],
             )
             return mapping_log["key"].tolist()
         return []
 
-    def write_log_key(self, file_name, key):
+    def write_log_key(self, stage_name, key):
         """Append a tab-delimited line to the file with the current
         timestamp and provided key
-
+
         Args:
-            file_name(str): log file name, relative to tmp_path, including extension.
+            stage_name(str): name of the stage (e.g. mapping, reducing)
             key(str): single key to write
         """
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, file_name)
+        file_path = file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_log.txt")
         with open(file_path, "a", encoding="utf-8") as mapping_log:
             mapping_log.write(f'{datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}\t{key}\n')
 
     def clean_resume_files(self):
         """Remove all intermediate files created in execution."""
         file_io.remove_directory(self.tmp_path, ignore_errors=True)
+
+    def wait_for_futures(self, futures, stage_name):
+        """Wait for collected futures to complete.
+
+        As each future completes, read the task key and write to the log file.
+        If all tasks complete successfully, touch the done file. Otherwise, raise an error.
+
+        Args:
+            futures(List[future]): collected futures
+            stage_name(str): name of the stage (e.g. mapping, reducing)
+        """
+        some_error = False
+        for future in tqdm(
+            as_completed(futures),
+            desc=stage_name,
+            total=len(futures),
+            disable=(not self.progress_bar),
+        ):
+            if future.status == "error":  # pragma: no cover
+                some_error = True
+            else:
+                self.write_log_key(stage_name, future.key)
+        if some_error:  # pragma: no cover
+            raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
+        self.touch_done_file(stage_name)
diff --git a/tests/hipscat_import/catalog/test_resume_plan.py b/tests/hipscat_import/catalog/test_resume_plan.py
index 35b6c524..905813f6 100644
--- a/tests/hipscat_import/catalog/test_resume_plan.py
+++ b/tests/hipscat_import/catalog/test_resume_plan.py
@@ -11,7 +11,7 @@ def test_mapping_done(tmp_path):
     """Verify expected behavior of mapping done file"""
     plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)
     assert not plan.is_mapping_done()
-    plan.set_mapping_done()
+    plan.touch_done_file(ResumePlan.MAPPING_STAGE)
     assert plan.is_mapping_done()
 
     plan.clean_resume_files()
@@ -22,7 +22,7 @@ def test_reducing_done(tmp_path):
     """Verify expected behavior of reducing done file"""
     plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)
     assert not plan.is_reducing_done()
-    plan.set_reducing_done()
+    plan.touch_done_file(ResumePlan.REDUCING_STAGE)
     assert plan.is_reducing_done()
 
     plan.clean_resume_files()
@@ -34,19 +34,19 @@ def test_done_checks(tmp_path):
     mapping > splitting > reducing
     """
     plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, resume=True)
-    plan.set_reducing_done()
+    plan.touch_done_file(ResumePlan.REDUCING_STAGE)
 
     with pytest.raises(ValueError, match="before reducing"):
         plan.gather_plan()
 
-    plan.set_splitting_done()
+    plan.touch_done_file(ResumePlan.SPLITTING_STAGE)
 
     with pytest.raises(ValueError, match="before reducing"):
         plan.gather_plan()
 
     plan.clean_resume_files()
     plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, resume=True)
-    plan.set_splitting_done()
+    plan.touch_done_file(ResumePlan.SPLITTING_STAGE)
 
     with pytest.raises(ValueError, match="before splitting"):
         plan.gather_plan()
@@ -116,14 +116,14 @@ def test_read_write_map_files(tmp_path, small_sky_single_file, formats_headers_c
     map_files = plan.map_files
     assert len(map_files) == 2
 
-    plan.mark_mapping_done("map_0")
+    plan.write_log_key(ResumePlan.MAPPING_STAGE, "map_0")
 
     plan.gather_plan()
     map_files = plan.map_files
     assert len(map_files) == 1
     assert map_files[0][1] == formats_headers_csv
 
-    plan.mark_mapping_done("map_1")
+    plan.write_log_key(ResumePlan.MAPPING_STAGE, "map_1")
 
     ## Nothing left to map
     plan.gather_plan()
@@ -145,14 +145,14 @@ def test_read_write_splitting_keys(tmp_path, small_sky_single_file, formats_head
     split_keys = plan.split_keys
     assert len(split_keys) == 2
 
-    plan.mark_splitting_done("split_0")
+    plan.write_log_key(ResumePlan.SPLITTING_STAGE, "split_0")
 
     plan.gather_plan()
     split_keys = plan.split_keys
     assert len(split_keys) == 1
     assert split_keys[0][0] == "split_1"
 
-    plan.mark_splitting_done("split_1")
+    plan.write_log_key(ResumePlan.SPLITTING_STAGE, "split_1")
     plan.gather_plan()
     split_keys = plan.split_keys
     assert len(split_keys) == 0
diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py
index c82a5fa4..030c3193 100644
--- a/tests/hipscat_import/catalog/test_run_import.py
+++ b/tests/hipscat_import/catalog/test_run_import.py
@@ -46,8 +46,8 @@ def test_resume_dask_runner(
     histogram[11] = 131
     empty = hist.empty_histogram(0)
     for file_index in range(0, 5):
-        plan.mark_mapping_done(f"map_{file_index}")
-        plan.mark_splitting_done(f"split_{file_index}")
+        plan.write_log_key(ResumePlan.MAPPING_STAGE,f"map_{file_index}")
+        plan.write_log_key(ResumePlan.SPLITTING_STAGE,f"split_{file_index}")
         ResumePlan.write_partial_histogram(
             tmp_path=temp_path,
             mapping_key=f"map_{file_index}",
@@ -112,9 +112,9 @@ def test_resume_dask_runner(
         temp_path,
     )
     plan = args.resume_plan
-    plan.set_mapping_done()
-    plan.set_splitting_done()
-    plan.set_reducing_done()
+    plan.touch_done_file(ResumePlan.MAPPING_STAGE)
+    plan.touch_done_file(ResumePlan.SPLITTING_STAGE)
+    plan.touch_done_file(ResumePlan.REDUCING_STAGE)
 
     args = ImportArguments(
         output_catalog_name="resume",
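
Usage note: a minimal sketch of how a pipeline stage can drive the consolidated wait logic after
this patch. The names `run_mapping_stage` and `do_map_work` are hypothetical placeholders for
illustration only (the real pipeline submits the functions in hipscat_import.catalog.map_reduce);
only ResumePlan, map_files, wait_for_mapping, write_log_key, and touch_done_file come from the
code above.

    from dask.distributed import Client

    from hipscat_import.catalog.resume_plan import ResumePlan


    def do_map_work(input_file):
        """Hypothetical stand-in for the real mapping task."""
        return input_file


    def run_mapping_stage(plan: ResumePlan, client: Client):
        """Submit one task per un-mapped input file, then wait on all of them."""
        # plan.map_files already excludes keys recorded in the "mapping" log,
        # so a resumed run only submits the remaining files. The key= argument
        # names the dask task, so future.key matches the resume-log key.
        futures = [
            client.submit(do_map_work, input_file=file_path, key=key)
            for key, file_path in plan.map_files
        ]
        # A single call now replaces the per-stage as_completed/tqdm loops:
        # it logs each successful future.key, raises if any future errored,
        # and touches the "mapping_done" file so is_mapping_done() returns True.
        plan.wait_for_mapping(futures)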