Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate logic to wait for dask futures. #109

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 19 additions & 34 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,15 @@ class ResumePlan(PipelineResumePlan):
split_keys: List[Tuple[str, str]] = field(default_factory=list)
"""set of files (and job keys) that have yet to be split"""

MAPPING_LOG_FILE = "mapping_log.txt"
SPLITTING_LOG_FILE = "splitting_log.txt"
REDUCING_LOG_FILE = "reducing_log.txt"
MAPPING_STAGE = "mapping"
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

ORIGINAL_INPUT_PATHS = "input_paths.txt"

HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
HISTOGRAMS_DIR = "histograms"

MAPPING_DONE_FILE = "mapping_done"
SPLITTING_DONE_FILE = "splitting_done"
REDUCING_DONE_FILE = "reducing_done"

def __post_init__(self):
"""Initialize the plan."""
self.gather_plan()
Expand Down Expand Up @@ -76,14 +73,14 @@ def gather_plan(self):
## Gather keys for execution.
step_progress.update(1)
if not mapping_done:
mapped_keys = set(self.read_log_keys(self.MAPPING_LOG_FILE))
mapped_keys = set(self.read_log_keys(self.MAPPING_STAGE))
self.map_files = [
(f"map_{i}", file_path)
for i, file_path in enumerate(self.input_paths)
if f"map_{i}" not in mapped_keys
]
if not splitting_done:
split_keys = set(self.read_log_keys(self.SPLITTING_LOG_FILE))
split_keys = set(self.read_log_keys(self.SPLITTING_STAGE))
self.split_keys = [
(f"split_{i}", file_path)
for i, file_path in enumerate(self.input_paths)
Expand Down Expand Up @@ -140,29 +137,21 @@ def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)

def mark_mapping_done(self, mapping_key: str):
"""Add mapping key to done list."""
self.write_log_key(self.MAPPING_LOG_FILE, mapping_key)
def wait_for_mapping(self, futures):
    """Block until every mapping future has finished.

    Completed keys are logged for resume, and the mapping done file is
    touched once all futures succeed.

    Args:
        futures(List[future]): collected mapping-stage futures
    """
    stage = self.MAPPING_STAGE
    self.wait_for_futures(futures, stage)

def is_mapping_done(self) -> bool:
    """Are there files left to map?

    Returns:
        True if the mapping-stage done file exists (all input files mapped).
    """
    # NOTE(review): span contained interleaved pre/post diff lines (stale
    # MAPPING_DONE_FILE constant and the removed set_mapping_done method);
    # this is the merged post-change definition using the stage name.
    return self.done_file_exists(self.MAPPING_STAGE)

def mark_splitting_done(self, splitting_key: str):
"""Add splitting key to done list."""
self.write_log_key(self.SPLITTING_LOG_FILE, splitting_key)
def wait_for_splitting(self, futures):
    """Block until every splitting future has finished.

    Completed keys are logged for resume, and the splitting done file is
    touched once all futures succeed.

    Args:
        futures(List[future]): collected splitting-stage futures
    """
    stage = self.SPLITTING_STAGE
    self.wait_for_futures(futures, stage)

def is_splitting_done(self) -> bool:
    """Are there files left to split?

    Returns:
        True if the splitting-stage done file exists (all input files split).
    """
    # NOTE(review): span contained interleaved pre/post diff lines (stale
    # SPLITTING_DONE_FILE constant and the removed set_splitting_done method);
    # this is the merged post-change definition using the stage name.
    return self.done_file_exists(self.SPLITTING_STAGE)

def get_reduce_items(self, destination_pixel_map):
"""Fetch a triple for each partition to reduce.
Expand All @@ -174,22 +163,18 @@ def get_reduce_items(self, destination_pixel_map):
- reduce key (string of destination order+pixel)

"""
reduced_keys = set(self.read_log_keys(self.REDUCING_LOG_FILE))
reduced_keys = set(self.read_log_keys(self.REDUCING_STAGE))
reduce_items = [
(hp_pixel, source_pixels, f"{hp_pixel.order}_{hp_pixel.pixel}")
for hp_pixel, source_pixels in destination_pixel_map.items()
if f"{hp_pixel.order}_{hp_pixel.pixel}" not in reduced_keys
]
return reduce_items

def mark_reducing_done(self, reducing_key: str):
"""Add reducing key to done list."""
self.write_log_key(self.REDUCING_LOG_FILE, reducing_key)

def is_reducing_done(self) -> bool:
    """Are there partitions left to reduce?

    Returns:
        True if the reducing-stage done file exists (all partitions reduced).
    """
    # NOTE(review): span contained both the stale REDUCING_DONE_FILE return
    # and the merged one; this is the post-change definition.
    return self.done_file_exists(self.REDUCING_STAGE)

def set_reducing_done(self):
"""All partitions are done reducing."""
self.touch_done_file(self.REDUCING_DONE_FILE)
def wait_for_reducing(self, futures):
    """Block until every reducing future has finished.

    Completed keys are logged for resume, and the reducing done file is
    touched once all futures succeed.

    Args:
        futures(List[future]): collected reducing-stage futures
    """
    stage = self.REDUCING_STAGE
    self.wait_for_futures(futures, stage)
47 changes: 3 additions & 44 deletions src/hipscat_import/catalog/run_import.py
delucchi-cmu marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import hipscat.io.write_metadata as io
import numpy as np
from dask.distributed import as_completed
from hipscat import pixel_math
from tqdm import tqdm

Expand Down Expand Up @@ -37,21 +36,7 @@ def _map_pixels(args, client):
dec_column=args.dec_column,
)
)

some_error = False
for future in tqdm(
as_completed(futures),
desc="Mapping ",
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
some_error = True
else:
args.resume_plan.mark_mapping_done(future.key)
if some_error: # pragma: no cover
raise RuntimeError("Some mapping stages failed. See logs for details.")
args.resume_plan.set_mapping_done()
args.resume_plan.wait_for_mapping(futures)


def _split_pixels(args, alignment_future, client):
Expand All @@ -78,20 +63,7 @@ def _split_pixels(args, alignment_future, client):
)
)

some_error = False
for future in tqdm(
as_completed(futures),
desc="Splitting",
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
some_error = True
else:
args.resume_plan.mark_splitting_done(future.key)
if some_error: # pragma: no cover
raise RuntimeError("Some splitting stages failed. See logs for details.")
args.resume_plan.set_splitting_done()
args.resume_plan.wait_for_splitting(futures)


def _reduce_pixels(args, destination_pixel_map, client):
Expand Down Expand Up @@ -123,20 +95,7 @@ def _reduce_pixels(args, destination_pixel_map, client):
)
)

some_error = False
for future in tqdm(
as_completed(futures),
desc="Reducing ",
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
some_error = True
else:
args.resume_plan.mark_reducing_done(future.key)
if some_error: # pragma: no cover
raise RuntimeError("Some reducing stages failed. See logs for details.")
args.resume_plan.set_reducing_done()
args.resume_plan.wait_for_reducing(futures)


def run(args, client):
Expand Down
55 changes: 42 additions & 13 deletions src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from pathlib import Path

import pandas as pd
from dask.distributed import as_completed
from hipscat.io import FilePointer, file_io
from tqdm import tqdm


@dataclass
Expand Down Expand Up @@ -37,37 +39,39 @@ def safe_to_resume(self):
)
file_io.make_directory(self.tmp_path, exist_ok=True)

def done_file_exists(self, stage_name):
    """Is the stage's done file present?

    For a done file, the existence of the file is the only signal needed to
    indicate a pipeline segment is complete.

    Args:
        stage_name(str): name of the stage (e.g. mapping, reducing)
    Returns:
        boolean, True if the done file exists at tmp_path. False otherwise.
    """
    # NOTE(review): span contained interleaved pre/post diff lines (file_name
    # vs stage_name signature); this is the merged post-change definition.
    # The done-file name is derived from the stage name by convention.
    return file_io.does_file_or_directory_exist(
        file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_done")
    )

def touch_done_file(self, stage_name):
    """Touch (create) a done file for the given stage.

    For a done file, the existence of the file is the only signal needed to
    indicate a pipeline segment is complete.

    Args:
        stage_name(str): name of the stage (e.g. mapping, reducing)
    """
    # NOTE(review): span contained interleaved pre/post diff lines (file_name
    # vs stage_name signature); this is the merged post-change definition.
    # Path.touch creates the empty marker file (or updates its mtime).
    Path(file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_done")).touch()

def read_log_keys(self, file_name):
def read_log_keys(self, stage_name):
"""Read a resume log file, containing timestamp and keys.

Args:
file_name(str): log file name, relative to tmp_path, including extension.
stage_name(str): name of the stage (e.g. mapping, reducing)
Return:
List[str] - all keys found in the log file
"""
file_path = file_io.append_paths_to_pointer(self.tmp_path, file_name)
file_path = file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_log.txt")
if file_io.does_file_or_directory_exist(file_path):
mapping_log = pd.read_csv(
file_path,
Expand All @@ -78,17 +82,42 @@ def read_log_keys(self, file_name):
return mapping_log["key"].tolist()
return []

def write_log_key(self, stage_name, key):
    """Append a tab-delimited line with the current timestamp and provided key.

    Args:
        stage_name(str): name of the stage (e.g. mapping, reducing)
        key(str): single key to write
    """
    # NOTE(review): span contained interleaved pre/post diff lines (file_name
    # vs stage_name signature); this is the merged post-change definition.
    # Append mode: the log accumulates one line per completed task key, and is
    # re-read on resume by read_log_keys.
    file_path = file_io.append_paths_to_pointer(self.tmp_path, f"{stage_name}_log.txt")
    with open(file_path, "a", encoding="utf-8") as mapping_log:
        mapping_log.write(f'{datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}\t{key}\n')

def clean_resume_files(self):
    """Delete the temp directory holding all intermediate resume state."""
    # Best-effort cleanup: ignore_errors keeps removal from failing the
    # pipeline when files are already gone.
    file_io.remove_directory(self.tmp_path, ignore_errors=True)

def wait_for_futures(self, futures, stage_name):
    """Wait for collected futures to complete.

    As each future completes, its task key is appended to the stage's log
    file so an interrupted run can resume. If every task succeeds, the
    stage's done file is touched; otherwise a RuntimeError is raised.

    Args:
        futures(List[future]): collected futures
        stage_name(str): name of the stage (e.g. mapping, reducing)
    """
    any_failed = False
    progress = tqdm(
        as_completed(futures),
        desc=stage_name,
        total=len(futures),
        disable=(not self.progress_bar),
    )
    for completed_future in progress:
        if completed_future.status == "error":  # pragma: no cover
            any_failed = True
            continue
        # Successful keys are still logged even if a sibling task failed,
        # so a resumed run skips the work that already finished.
        self.write_log_key(stage_name, completed_future.key)
    if any_failed:  # pragma: no cover
        raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
    self.touch_done_file(stage_name)
18 changes: 9 additions & 9 deletions tests/hipscat_import/catalog/test_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_mapping_done(tmp_path):
"""Verify expected behavior of mapping done file"""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)
assert not plan.is_mapping_done()
plan.set_mapping_done()
plan.touch_done_file(ResumePlan.MAPPING_STAGE)
assert plan.is_mapping_done()

plan.clean_resume_files()
Expand All @@ -22,7 +22,7 @@ def test_reducing_done(tmp_path):
"""Verify expected behavior of reducing done file"""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)
assert not plan.is_reducing_done()
plan.set_reducing_done()
plan.touch_done_file(ResumePlan.REDUCING_STAGE)
assert plan.is_reducing_done()

plan.clean_resume_files()
Expand All @@ -34,19 +34,19 @@ def test_done_checks(tmp_path):
mapping > splitting > reducing
"""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, resume=True)
plan.set_reducing_done()
plan.touch_done_file(ResumePlan.REDUCING_STAGE)

with pytest.raises(ValueError, match="before reducing"):
plan.gather_plan()

plan.set_splitting_done()
plan.touch_done_file(ResumePlan.SPLITTING_STAGE)
with pytest.raises(ValueError, match="before reducing"):
plan.gather_plan()

plan.clean_resume_files()

plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, resume=True)
plan.set_splitting_done()
plan.touch_done_file(ResumePlan.SPLITTING_STAGE)
with pytest.raises(ValueError, match="before splitting"):
plan.gather_plan()

Expand Down Expand Up @@ -116,14 +116,14 @@ def test_read_write_map_files(tmp_path, small_sky_single_file, formats_headers_c
map_files = plan.map_files
assert len(map_files) == 2

plan.mark_mapping_done("map_0")
plan.write_log_key(ResumePlan.MAPPING_STAGE, "map_0")

plan.gather_plan()
map_files = plan.map_files
assert len(map_files) == 1
assert map_files[0][1] == formats_headers_csv

plan.mark_mapping_done("map_1")
plan.write_log_key(ResumePlan.MAPPING_STAGE, "map_1")

## Nothing left to map
plan.gather_plan()
Expand All @@ -145,14 +145,14 @@ def test_read_write_splitting_keys(tmp_path, small_sky_single_file, formats_head
split_keys = plan.split_keys
assert len(split_keys) == 2

plan.mark_splitting_done("split_0")
plan.write_log_key(ResumePlan.SPLITTING_STAGE, "split_0")

plan.gather_plan()
split_keys = plan.split_keys
assert len(split_keys) == 1
assert split_keys[0][0] == "split_1"

plan.mark_splitting_done("split_1")
plan.write_log_key(ResumePlan.SPLITTING_STAGE, "split_1")
plan.gather_plan()
split_keys = plan.split_keys
assert len(split_keys) == 0
Expand Down
10 changes: 5 additions & 5 deletions tests/hipscat_import/catalog/test_run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def test_resume_dask_runner(
histogram[11] = 131
empty = hist.empty_histogram(0)
for file_index in range(0, 5):
plan.mark_mapping_done(f"map_{file_index}")
plan.mark_splitting_done(f"split_{file_index}")
plan.write_log_key(ResumePlan.MAPPING_STAGE,f"map_{file_index}")
plan.write_log_key(ResumePlan.SPLITTING_STAGE,f"split_{file_index}")
ResumePlan.write_partial_histogram(
tmp_path=temp_path,
mapping_key=f"map_{file_index}",
Expand Down Expand Up @@ -112,9 +112,9 @@ def test_resume_dask_runner(
temp_path,
)
plan = args.resume_plan
plan.set_mapping_done()
plan.set_splitting_done()
plan.set_reducing_done()
plan.touch_done_file(ResumePlan.MAPPING_STAGE)
plan.touch_done_file(ResumePlan.SPLITTING_STAGE)
plan.touch_done_file(ResumePlan.REDUCING_STAGE)

args = ImportArguments(
output_catalog_name="resume",
Expand Down