Improve observability when using cached datasets during preprocessing

arnavgarg1 authored Feb 8, 2023
1 parent 0d646ca commit 6dd4c18
Showing 2 changed files with 34 additions and 15 deletions.
23 changes: 16 additions & 7 deletions ludwig/data/cache/manager.py
@@ -25,20 +25,26 @@ def get(self):
             return None

         try:
-            cache_training_set_metadata = data_utils.load_json(training_set_metadata_fp)
-        except Exception as e:
-            logger.error(f"failed to load cached training set metadata at {training_set_metadata_fp}", exc_info=e)
+            cached_training_set_metadata = data_utils.load_json(training_set_metadata_fp)
+        except Exception:
+            logger.exception(f"Failed to load cached training set metadata at {training_set_metadata_fp}")
             return None

         cached_training_set = self.cache_map[TRAINING] if path_exists(self.cache_map[TRAINING]) else None
+        if not cached_training_set:
+            logger.warning(f"Failed to load cached training set at {self.cache_map[TRAINING]}")

-        cached_test_set = self.cache_map[TEST] if path_exists(self.cache_map[TEST]) else None
         cached_validation_set = self.cache_map[VALIDATION] if path_exists(self.cache_map[VALIDATION]) else None
+        if not cached_validation_set:
+            logger.warning(f"Failed to load cached validation set at {self.cache_map[VALIDATION]}")

-        valid = self.checksum == cache_training_set_metadata.get(CHECKSUM) and cached_training_set is not None
+        cached_test_set = self.cache_map[TEST] if path_exists(self.cache_map[TEST]) else None
+        if not cached_test_set:
+            logger.warning(f"Failed to load cached test set at {self.cache_map[TEST]}")

-        return valid, cache_training_set_metadata, cached_training_set, cached_test_set, cached_validation_set
+        valid = self.checksum == cached_training_set_metadata.get(CHECKSUM) and cached_training_set is not None
+
+        return valid, cached_training_set_metadata, cached_training_set, cached_test_set, cached_validation_set

     def put(self, training_set, test_set, validation_set, training_set_metadata):
         logger.info("Writing preprocessed training set cache")
@@ -81,6 +87,9 @@ def delete(self):
             # Parquet entries in the cache_map can be pointers to directories.
             delete(fname, recursive=True)

+    def get_cached_obj_path(self, cached_obj_name: str) -> str:
+        return self.cache_map.get(cached_obj_name)
+

 class CacheManager:
     def __init__(
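For context, a minimal sketch of how a caller might consume the tuple returned by DatasetCache.get() after this change; the load_from_cache wrapper here is hypothetical and not part of this commit, while the real call sites appear in ludwig/data/preprocessing.py below.

    import logging

    logger = logging.getLogger(__name__)

    def load_from_cache(cache):
        # get() returns None when the cached training set metadata cannot be read.
        # Otherwise it returns (valid, metadata, training_set, test_set, validation_set),
        # where valid is True only if the config checksum matches the cached one
        # and the cached training set actually exists.
        cache_results = cache.get()
        if cache_results is None:
            return None
        valid, metadata, training_set, test_set, validation_set = cache_results
        if not valid:
            logger.info("Cached dataset is stale; preprocessing will run from scratch.")
            return None
        return metadata, training_set, test_set, validation_set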
26 changes: 18 additions & 8 deletions ludwig/data/preprocessing.py
@@ -39,6 +39,7 @@
     FILL_WITH_MEAN,
     FILL_WITH_MODE,
     FULL,
+    META,
     MIN_DATASET_SPLIT_ROWS,
     MODEL_ECD,
     NAME,
@@ -52,6 +53,7 @@
     TYPE,
     VALIDATION,
 )
+from ludwig.data.cache.manager import DatasetCache
 from ludwig.data.cache.types import wrap
 from ludwig.data.concatenate_datasets import concatenate_df, concatenate_files, concatenate_splits
 from ludwig.data.dataset.base import Dataset
@@ -1651,14 +1653,14 @@ def preprocess_for_training(

     if data_format in CACHEABLE_FORMATS:
         with backend.storage.cache.use_credentials():
+            # cache.get() returns valid, indicating whether the checksum for the current config
+            # matches the checksum in the cached training set metadata, along with the paths to
+            # the cached training set metadata, training set, validation set, and test set.
             cache_results = cache.get()
             if cache_results is not None:
                 valid, *cache_values = cache_results
                 if valid:
-                    logger.info(
-                        "Found cached dataset and meta.json with the same filename "
-                        "of the dataset, using them instead"
-                    )
+                    logger.info(_get_cache_hit_message(cache))
                     training_set_metadata, training_set, test_set, validation_set = cache_values
                     config["data_hdf5_fp"] = training_set
                     data_format = backend.cache.data_format
@@ -1994,10 +1996,7 @@ def preprocess_for_prediction(
         if cache_results is not None:
             valid, *cache_values = cache_results
             if valid:
-                logger.info(
-                    "Found cached dataset and meta.json with the same filename "
-                    "of the input file, using them instead"
-                )
+                logger.info(_get_cache_hit_message(cache))
                 training_set_metadata, training_set, test_set, validation_set = cache_values
                 config["data_hdf5_fp"] = training_set
                 data_format = backend.cache.data_format
@@ -2051,3 +2050,14 @@ def preprocess_for_prediction(
     )

     return dataset, training_set_metadata
+
+
+def _get_cache_hit_message(cache: DatasetCache) -> str:
+    return (
+        "Found cached dataset and meta.json with the same filename of the dataset.\n"
+        "Using cached values instead of preprocessing the dataset again.\n"
+        f"Cached training set metadata path: {cache.get_cached_obj_path(META)}\n"
+        f"Cached training set path: {cache.get_cached_obj_path(TRAINING)}\n"
+        f"Cached validation set path: {cache.get_cached_obj_path(VALIDATION)}\n"
+        f"Cached test set path: {cache.get_cached_obj_path(TEST)}"
+    )
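Assuming META, TRAINING, VALIDATION, and TEST are the cache_map keys for the corresponding cached artifacts, a cache hit would now log a message along these lines (the paths shown are hypothetical):

    Found cached dataset and meta.json with the same filename of the dataset.
    Using cached values instead of preprocessing the dataset again.
    Cached training set metadata path: /data/cache/ab12cd34.meta.json
    Cached training set path: /data/cache/ab12cd34.training.hdf5
    Cached validation set path: /data/cache/ab12cd34.validation.hdf5
    Cached test set path: /data/cache/ab12cd34.test.hdf5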
