Refactor data loading/storing #1018

Merged · 6 commits · Jan 19, 2021
1 change: 1 addition & 0 deletions doc/progress.rst
@@ -8,6 +8,7 @@ Changelog

0.11.1
~~~~~~
* MAINT #1018: Refactor data loading and storage. Data is now compressed on the first call to ``get_data``.
* MAINT #891: Changed the way that numerical features are stored. Numerical features that range from 0 to 255 are now stored as uint8, which reduces the storage space required as well as storing and loading times.
* MAINT #671: Improved the performance of ``check_datasets_active`` by only querying the given list of datasets in contrast to querying all datasets. Modified the corresponding unit test.
* FIX #964: Validate that ``ignore_attribute``, ``default_target_attribute`` and ``row_id_attribute`` are set to attributes that exist on the dataset when calling ``create_dataset``.
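With this change, the ARFF file is parsed and compressed lazily: the compressed cache is only written on the first call to ``get_data``. A minimal usage sketch, based on this changelog entry and the updated tests at the bottom of this diff (dataset ids 1 and 128 are the ones those tests use):

import openml

# Constructing the dataset object no longer parses or compresses the ARFF file.
dataset = openml.datasets.get_dataset(1)

# The first call to get_data() parses the ARFF file and writes the compressed
# cache (pickle by default; feather when requested and the data is not sparse).
dataset.get_data()

# The feather cache format, as exercised in the feather test below.
feather_dataset = openml.datasets.get_dataset(128, cache_format="feather")
feather_dataset.get_data()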
187 changes: 65 additions & 122 deletions openml/datasets/dataset.py
@@ -217,16 +217,14 @@ def find_invalid_characters(string, pattern):
self.qualities = None

if data_file is not None:
rval = self._create_pickle_in_cache(data_file)
self.data_pickle_file = rval[0] # type: Optional[str]
self.data_feather_file = rval[1] # type: Optional[str]
self.feather_attribute_file = rval[2] # type: Optional[str]
rval = self._compressed_cache_file_paths(data_file)
self.data_pickle_file = rval[0] if os.path.exists(rval[0]) else None
self.data_feather_file = rval[1] if os.path.exists(rval[1]) else None
self.feather_attribute_file = rval[2] if os.path.exists(rval[2]) else None
else:
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = (
None,
None,
None,
)
self.data_pickle_file = None
self.data_feather_file = None
self.feather_attribute_file = None

@property
def id(self) -> Optional[int]:
@@ -455,152 +453,97 @@ def _parse_data_from_arff(

return X, categorical, attribute_names

def _create_pickle_in_cache(self, data_file: str) -> Tuple[str, str, str]:
""" Parse the arff and pickle the result. Update any old pickle objects. """
def _compressed_cache_file_paths(self, data_file: str) -> Tuple[str, str, str]:
data_pickle_file = data_file.replace(".arff", ".pkl.py3")
data_feather_file = data_file.replace(".arff", ".feather")
feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
if os.path.exists(data_pickle_file) and self.cache_format == "pickle":
# Load the data to check if the pickle file is outdated (i.e. contains numpy array)
with open(data_pickle_file, "rb") as fh:
try:
data, categorical, attribute_names = pickle.load(fh)
except EOFError:
# The file is likely corrupt, see #780.
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ModuleNotFoundError:
# There was some issue loading the file, see #918
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
# There was some issue loading the file, see #898
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
else:
raise

# Between v0.8 and v0.9 the format of pickled data changed from
# np.ndarray to pd.DataFrame. This breaks some backwards compatibility,
# e.g. for `run_model_on_task`. If a local file still exists with
# np.ndarray data, we reprocess the data file to store a pickled
# pd.DataFrame blob. See also #646.
if isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data):
logger.debug("Data pickle file already exists and is up to date.")
return data_pickle_file, data_feather_file, feather_attribute_file
elif os.path.exists(data_feather_file) and self.cache_format == "feather":
# Load the data to check if the pickle file is outdated (i.e. contains numpy array)
try:
data = pd.read_feather(data_feather_file)
except EOFError:
# The file is likely corrupt, see #780.
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ModuleNotFoundError:
# There was some issue loading the file, see #918
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
# There was some issue loading the file, see #898
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
else:
raise
return data_pickle_file, data_feather_file, feather_attribute_file

logger.debug("Data feather file already exists and is up to date.")
return data_pickle_file, data_feather_file, feather_attribute_file
def _cache_compressed_file_from_arff(
self, arff_file: str
) -> Tuple[Union[pd.DataFrame, scipy.sparse.csr_matrix], List[bool], List[str]]:
""" Store data from the arff file in compressed format. Sets cache_format to 'pickle' if data is sparse. """ # noqa: 501
(
data_pickle_file,
data_feather_file,
feather_attribute_file,
) = self._compressed_cache_file_paths(arff_file)

# At this point either the pickle file does not exist, or it had outdated formatting.
# We parse the data from arff again and populate the cache with a recent pickle file.
X, categorical, attribute_names = self._parse_data_from_arff(data_file)
data, categorical, attribute_names = self._parse_data_from_arff(arff_file)

# Feather format does not work for sparse datasets, so we use pickle for sparse datasets
if scipy.sparse.issparse(data):
self.cache_format = "pickle"

if self.cache_format == "feather" and not scipy.sparse.issparse(X):
logger.info("feather write {}".format(self.name))
X.to_feather(data_feather_file)
logger.info(f"{self.cache_format} write {self.name}")
if self.cache_format == "feather":
data.to_feather(data_feather_file)
with open(feather_attribute_file, "wb") as fh:
pickle.dump((categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
else:
logger.info("pickle write {}".format(self.name))
self.cache_format = "pickle"
with open(data_pickle_file, "wb") as fh:
pickle.dump((X, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
logger.debug(
"Saved dataset {did}: {name} to file {path}".format(
did=int(self.dataset_id or -1), name=self.name, path=data_pickle_file
)
)
return data_pickle_file, data_feather_file, feather_attribute_file
pickle.dump((data, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)

data_file = data_pickle_file if self.cache_format == "pickle" else data_feather_file
logger.debug(f"Saved dataset {int(self.dataset_id or -1)}: {self.name} to file {data_file}")
return data, categorical, attribute_names

def _load_data(self):
""" Load data from pickle or arff. Download data first if not present on disk. """
if (self.cache_format == "pickle" and self.data_pickle_file is None) or (
self.cache_format == "feather" and self.data_feather_file is None
):
""" Load data from compressed format or arff. Download data if not present on disk. """
need_to_create_pickle = self.cache_format == "pickle" and self.data_pickle_file is None
need_to_create_feather = self.cache_format == "feather" and self.data_feather_file is None

if need_to_create_pickle or need_to_create_feather:
if self.data_file is None:
self._download_data()
(
self.data_pickle_file,
self.data_feather_file,
self.feather_attribute_file,
) = self._create_pickle_in_cache(self.data_file)

res = self._compressed_cache_file_paths(self.data_file)
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = res
# Since the recently stored data already exists in memory, there is no need to load from disk
return self._cache_compressed_file_from_arff(self.data_file)

# Helper variable used to identify where errors occur
fpath = self.data_feather_file if self.cache_format == "feather" else self.data_pickle_file
logger.info(f"{self.cache_format} load data {self.name}")
try:
if self.cache_format == "feather":
logger.info("feather load data {}".format(self.name))
data = pd.read_feather(self.data_feather_file)

fpath = self.feather_attribute_file
with open(self.feather_attribute_file, "rb") as fh:
categorical, attribute_names = pickle.load(fh)
else:
logger.info("pickle load data {}".format(self.name))
with open(self.data_pickle_file, "rb") as fh:
data, categorical, attribute_names = pickle.load(fh)
except EOFError:
logger.warning(
"Detected a corrupt cache file loading dataset %d: '%s'. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file)
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
except FileNotFoundError:
raise ValueError(
"Cannot find a pickle file for dataset {} at "
"location {} ".format(self.name, self.data_pickle_file)
)
except ModuleNotFoundError as e:
raise ValueError(f"Cannot find file for dataset {self.name} at location '{fpath}'.")
except (EOFError, ModuleNotFoundError, ValueError) as e:
error_message = e.message if hasattr(e, "message") else e.args[0]
hint = ""

if isinstance(e, EOFError):
readable_error = "Detected a corrupt cache file"
elif isinstance(e, ModuleNotFoundError):
readable_error = "Detected likely dependency issues"
hint = "This is most likely due to https://github.com/openml/openml-python/issues/918. " # noqa: 501
elif isinstance(e, ValueError) and "unsupported pickle protocol" in e.args[0]:
readable_error = "Encountered unsupported pickle protocol"
else:
raise  # an unknown ValueError; re-raise so it surfaces and can be reported as a bug

logger.warning(
"Encountered error message when loading cached dataset %d: '%s'. "
"Error message was: %s. "
"This is most likely due to https://github.com/openml/openml-python/issues/918. "
f"{readable_error} when loading dataset {self.id} from '{fpath}'. "
f"{hint}"
f"Error message was: {error_message}. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file, e.args[0]),
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
logger.warning(
"Encountered unsupported pickle protocol when loading cached dataset %d: '%s'. "
"Error message was: %s. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file, e.args[0]),
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
else:
raise

data_up_to_date = isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data)
if self.cache_format == "pickle" and not data_up_to_date:
logger.info("Updating outdated pickle file.")
return self._cache_compressed_file_from_arff(self.data_file)
return data, categorical, attribute_names

@staticmethod
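For reference, the cache-file naming convention that ``_compressed_cache_file_paths`` introduces can be summarized in a small standalone sketch (a hypothetical helper, not part of this diff; the suffixes are taken verbatim from the method body above):

import os
from typing import Optional, Tuple

def compressed_cache_paths(data_file: str) -> Tuple[str, str, str]:
    # Map an ARFF file to its three possible compressed-cache companions.
    return (
        data_file.replace(".arff", ".pkl.py3"),                     # pickled (data, categorical, attribute_names)
        data_file.replace(".arff", ".feather"),                     # feather-encoded DataFrame
        data_file.replace(".arff", ".feather.attributes.pkl.py3"),  # pickled (categorical, attribute_names)
    )

def existing_or_none(path: str) -> Optional[str]:
    # Mirrors the constructor logic: only keep cache paths that already exist on disk.
    return path if os.path.exists(path) else None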
3 changes: 3 additions & 0 deletions tests/test_datasets/test_dataset_functions.py
@@ -1258,6 +1258,8 @@ def test_list_qualities(self):

def test_get_dataset_cache_format_pickle(self):
dataset = openml.datasets.get_dataset(1)
dataset.get_data()

self.assertEqual(type(dataset), OpenMLDataset)
self.assertEqual(dataset.name, "anneal")
self.assertGreater(len(dataset.features), 1)
@@ -1272,6 +1274,7 @@ def test_get_dataset_cache_format_pickle(self):
def test_get_dataset_cache_format_feather(self):

dataset = openml.datasets.get_dataset(128, cache_format="feather")
dataset.get_data()

# Check if dataset is written to cache directory using feather
cache_dir = openml.config.get_cache_directory()
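If a cached file turns out to be corrupt or unreadable, ``_load_data`` falls back to re-parsing the ARFF file and asks the user to delete the cache manually. A hedged sketch of that recovery step, using the attributes set in the constructor above (the exact cache path depends on the local cache directory):

import os
import openml

dataset = openml.datasets.get_dataset(1)
dataset.get_data()  # the first call writes the compressed cache

# Recovery path suggested by the warning in _load_data: delete the cache file
# by hand. When the dataset is fetched again in a fresh session, the constructor
# sees that the compressed file is gone, leaves the attribute as None, and
# get_data() rebuilds the cache from the ARFF file.
if dataset.data_pickle_file is not None and os.path.exists(dataset.data_pickle_file):
    os.remove(dataset.data_pickle_file)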