Commit

Merge branch 'develop' into sklearn24-support
Neeratyoy committed Jan 28, 2021
2 parents 902cd3f + ab793a6 commit 0e44a0b
Showing 6 changed files with 91 additions and 141 deletions.
1 change: 1 addition & 0 deletions doc/progress.rst
@@ -8,6 +8,7 @@ Changelog

0.11.1
~~~~~~
* MAINT #1018: Refactor data loading and storage. Data is now compressed on the first call to `get_data`.
* MAINT #891: Changed the way that numerical features are stored. Numerical features that range from 0 to 255 are now stored as uint8, which reduces the required storage space as well as storing and loading times (a short downcasting sketch follows this excerpt).
* MAINT #671: Improved the performance of ``check_datasets_active`` by querying only the given list of datasets instead of all datasets. Modified the corresponding unit test.
* FIX #964: Validate that `ignore_attribute`, `default_target_attribute`, and `row_id_attribute` are set to attributes that exist on the dataset when calling ``create_dataset``.
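A minimal sketch of the kind of uint8 downcasting described in MAINT #891 above, not the library's actual implementation: numeric columns whose values all lie in the 0-255 range (and contain no missing values or fractions) can be stored as uint8 to shrink both the on-disk cache and the in-memory footprint. The DataFrame and column names are purely illustrative.

```python
import numpy as np
import pandas as pd


def downcast_small_ints(df: pd.DataFrame) -> pd.DataFrame:
    """Store numeric columns whose values fit in 0..255 as uint8 (sketch of the idea in #891)."""
    for col in df.select_dtypes(include="number").columns:
        s = df[col]
        if s.isna().any():
            continue  # uint8 cannot represent missing values
        if (s >= 0).all() and (s <= 255).all() and (s % 1 == 0).all():
            df[col] = s.astype(np.uint8)
    return df


# Illustrative data: pixel-like features in the 0-255 range become uint8, floats are untouched.
df = pd.DataFrame({"pixel_0": [0, 128, 255], "weight": [1.5, 2.7, 3.1]})
print(downcast_small_ints(df).dtypes)  # pixel_0 -> uint8, weight -> float64
```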
File renamed without changes.
37 changes: 20 additions & 17 deletions examples/30_extended/task_manual_iteration_tutorial.py
@@ -61,11 +61,11 @@
####################################################################################################
# And then split the data based on this:

X, y, _, _ = task.get_dataset().get_data(task.target_name)
X_train = X.loc[train_indices]
y_train = y[train_indices]
X_test = X.loc[test_indices]
y_test = y[test_indices]
X, y = task.get_X_and_y(dataset_format="dataframe")
X_train = X.iloc[train_indices]
y_train = y.iloc[train_indices]
X_test = X.iloc[test_indices]
y_test = y.iloc[test_indices]

print(
"X_train.shape: {}, y_train.shape: {}, X_test.shape: {}, y_test.shape: {}".format(
@@ -78,6 +78,7 @@

task_id = 3
task = openml.tasks.get_task(task_id)
X, y = task.get_X_and_y(dataset_format="dataframe")
n_repeats, n_folds, n_samples = task.get_split_dimensions()
print(
"Task {}: number of repeats: {}, number of folds: {}, number of samples {}.".format(
@@ -93,10 +94,10 @@
train_indices, test_indices = task.get_train_test_split_indices(
repeat=repeat_idx, fold=fold_idx, sample=sample_idx,
)
X_train = X.loc[train_indices]
y_train = y[train_indices]
X_test = X.loc[test_indices]
y_test = y[test_indices]
X_train = X.iloc[train_indices]
y_train = y.iloc[train_indices]
X_test = X.iloc[test_indices]
y_test = y.iloc[test_indices]

print(
"Repeat #{}, fold #{}, samples {}: X_train.shape: {}, "
@@ -116,6 +117,7 @@

task_id = 1767
task = openml.tasks.get_task(task_id)
X, y = task.get_X_and_y(dataset_format="dataframe")
n_repeats, n_folds, n_samples = task.get_split_dimensions()
print(
"Task {}: number of repeats: {}, number of folds: {}, number of samples {}.".format(
@@ -131,10 +133,10 @@
train_indices, test_indices = task.get_train_test_split_indices(
repeat=repeat_idx, fold=fold_idx, sample=sample_idx,
)
X_train = X.loc[train_indices]
y_train = y[train_indices]
X_test = X.loc[test_indices]
y_test = y[test_indices]
X_train = X.iloc[train_indices]
y_train = y.iloc[train_indices]
X_test = X.iloc[test_indices]
y_test = y.iloc[test_indices]

print(
"Repeat #{}, fold #{}, samples {}: X_train.shape: {}, "
@@ -154,6 +156,7 @@

task_id = 1702
task = openml.tasks.get_task(task_id)
X, y = task.get_X_and_y(dataset_format="dataframe")
n_repeats, n_folds, n_samples = task.get_split_dimensions()
print(
"Task {}: number of repeats: {}, number of folds: {}, number of samples {}.".format(
@@ -169,10 +172,10 @@
train_indices, test_indices = task.get_train_test_split_indices(
repeat=repeat_idx, fold=fold_idx, sample=sample_idx,
)
X_train = X.loc[train_indices]
y_train = y[train_indices]
X_test = X.loc[test_indices]
y_test = y[test_indices]
X_train = X.iloc[train_indices]
y_train = y.iloc[train_indices]
X_test = X.iloc[test_indices]
y_test = y.iloc[test_indices]

print(
"Repeat #{}, fold #{}, samples {}: X_train.shape: {}, "
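Pulling the updated tutorial code together, here is a self-contained sketch of manually iterating over a task's splits with the dataframe-based API used above (same task id as in the tutorial; running it requires a network connection or a warm OpenML cache):

```python
import openml

task = openml.tasks.get_task(3)  # same task id as in the tutorial above
X, y = task.get_X_and_y(dataset_format="dataframe")
n_repeats, n_folds, n_samples = task.get_split_dimensions()

for repeat_idx in range(n_repeats):
    for fold_idx in range(n_folds):
        for sample_idx in range(n_samples):
            train_indices, test_indices = task.get_train_test_split_indices(
                repeat=repeat_idx, fold=fold_idx, sample=sample_idx,
            )
            # The split indices are positional, hence iloc rather than loc.
            X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
            X_test, y_test = X.iloc[test_indices], y.iloc[test_indices]
            print(repeat_idx, fold_idx, sample_idx, X_train.shape, X_test.shape)
```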
187 changes: 65 additions & 122 deletions openml/datasets/dataset.py
@@ -217,16 +217,14 @@ def find_invalid_characters(string, pattern):
self.qualities = None

if data_file is not None:
rval = self._create_pickle_in_cache(data_file)
self.data_pickle_file = rval[0] # type: Optional[str]
self.data_feather_file = rval[1] # type: Optional[str]
self.feather_attribute_file = rval[2] # type: Optional[str]
rval = self._compressed_cache_file_paths(data_file)
self.data_pickle_file = rval[0] if os.path.exists(rval[0]) else None
self.data_feather_file = rval[1] if os.path.exists(rval[1]) else None
self.feather_attribute_file = rval[2] if os.path.exists(rval[2]) else None
else:
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = (
None,
None,
None,
)
self.data_pickle_file = None
self.data_feather_file = None
self.feather_attribute_file = None

@property
def id(self) -> Optional[int]:
@@ -455,152 +453,97 @@ def _parse_data_from_arff(

return X, categorical, attribute_names

def _create_pickle_in_cache(self, data_file: str) -> Tuple[str, str, str]:
""" Parse the arff and pickle the result. Update any old pickle objects. """
def _compressed_cache_file_paths(self, data_file: str) -> Tuple[str, str, str]:
data_pickle_file = data_file.replace(".arff", ".pkl.py3")
data_feather_file = data_file.replace(".arff", ".feather")
feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
if os.path.exists(data_pickle_file) and self.cache_format == "pickle":
# Load the data to check if the pickle file is outdated (i.e. contains numpy array)
with open(data_pickle_file, "rb") as fh:
try:
data, categorical, attribute_names = pickle.load(fh)
except EOFError:
# The file is likely corrupt, see #780.
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ModuleNotFoundError:
# There was some issue loading the file, see #918
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
# There was some issue loading the file, see #898
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
else:
raise

# Between v0.8 and v0.9 the format of pickled data changed from
# np.ndarray to pd.DataFrame. This breaks some backwards compatibility,
# e.g. for `run_model_on_task`. If a local file still exists with
# np.ndarray data, we reprocess the data file to store a pickled
# pd.DataFrame blob. See also #646.
if isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data):
logger.debug("Data pickle file already exists and is up to date.")
return data_pickle_file, data_feather_file, feather_attribute_file
elif os.path.exists(data_feather_file) and self.cache_format == "feather":
# Load the data to check if the pickle file is outdated (i.e. contains numpy array)
try:
data = pd.read_feather(data_feather_file)
except EOFError:
# The file is likely corrupt, see #780.
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ModuleNotFoundError:
# There was some issue loading the file, see #918
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
# There was some issue loading the file, see #898
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
else:
raise
return data_pickle_file, data_feather_file, feather_attribute_file

logger.debug("Data feather file already exists and is up to date.")
return data_pickle_file, data_feather_file, feather_attribute_file
def _cache_compressed_file_from_arff(
self, arff_file: str
) -> Tuple[Union[pd.DataFrame, scipy.sparse.csr_matrix], List[bool], List[str]]:
""" Store data from the arff file in compressed format. Sets cache_format to 'pickle' if data is sparse. """ # noqa: 501
(
data_pickle_file,
data_feather_file,
feather_attribute_file,
) = self._compressed_cache_file_paths(arff_file)

# At this point either the pickle file does not exist, or it had outdated formatting.
# We parse the data from arff again and populate the cache with a recent pickle file.
X, categorical, attribute_names = self._parse_data_from_arff(data_file)
data, categorical, attribute_names = self._parse_data_from_arff(arff_file)

# Feather format does not work for sparse datasets, so we use pickle for sparse datasets
if scipy.sparse.issparse(data):
self.cache_format = "pickle"

if self.cache_format == "feather" and not scipy.sparse.issparse(X):
logger.info("feather write {}".format(self.name))
X.to_feather(data_feather_file)
logger.info(f"{self.cache_format} write {self.name}")
if self.cache_format == "feather":
data.to_feather(data_feather_file)
with open(feather_attribute_file, "wb") as fh:
pickle.dump((categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
else:
logger.info("pickle write {}".format(self.name))
self.cache_format = "pickle"
with open(data_pickle_file, "wb") as fh:
pickle.dump((X, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
logger.debug(
"Saved dataset {did}: {name} to file {path}".format(
did=int(self.dataset_id or -1), name=self.name, path=data_pickle_file
)
)
return data_pickle_file, data_feather_file, feather_attribute_file
pickle.dump((data, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)

data_file = data_pickle_file if self.cache_format == "pickle" else data_feather_file
logger.debug(f"Saved dataset {int(self.dataset_id or -1)}: {self.name} to file {data_file}")
return data, categorical, attribute_names

def _load_data(self):
""" Load data from pickle or arff. Download data first if not present on disk. """
if (self.cache_format == "pickle" and self.data_pickle_file is None) or (
self.cache_format == "feather" and self.data_feather_file is None
):
""" Load data from compressed format or arff. Download data if not present on disk. """
need_to_create_pickle = self.cache_format == "pickle" and self.data_pickle_file is None
need_to_create_feather = self.cache_format == "feather" and self.data_feather_file is None

if need_to_create_pickle or need_to_create_feather:
if self.data_file is None:
self._download_data()
(
self.data_pickle_file,
self.data_feather_file,
self.feather_attribute_file,
) = self._create_pickle_in_cache(self.data_file)

res = self._compressed_cache_file_paths(self.data_file)
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = res
# Since the recently stored data already exists in memory, there is no need to load from disk
return self._cache_compressed_file_from_arff(self.data_file)

# helper variable to help identify where errors occur
fpath = self.data_feather_file if self.cache_format == "feather" else self.data_pickle_file
logger.info(f"{self.cache_format} load data {self.name}")
try:
if self.cache_format == "feather":
logger.info("feather load data {}".format(self.name))
data = pd.read_feather(self.data_feather_file)

fpath = self.feather_attribute_file
with open(self.feather_attribute_file, "rb") as fh:
categorical, attribute_names = pickle.load(fh)
else:
logger.info("pickle load data {}".format(self.name))
with open(self.data_pickle_file, "rb") as fh:
data, categorical, attribute_names = pickle.load(fh)
except EOFError:
logger.warning(
"Detected a corrupt cache file loading dataset %d: '%s'. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file)
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
except FileNotFoundError:
raise ValueError(
"Cannot find a pickle file for dataset {} at "
"location {} ".format(self.name, self.data_pickle_file)
)
except ModuleNotFoundError as e:
raise ValueError(f"Cannot find file for dataset {self.name} at location '{fpath}'.")
except (EOFError, ModuleNotFoundError, ValueError) as e:
error_message = e.message if hasattr(e, "message") else e.args[0]
hint = ""

if isinstance(e, EOFError):
readable_error = "Detected a corrupt cache file"
elif isinstance(e, ModuleNotFoundError):
readable_error = "Detected likely dependency issues"
hint = "This is most likely due to https://github.com/openml/openml-python/issues/918. " # noqa: 501
elif isinstance(e, ValueError) and "unsupported pickle protocol" in e.args[0]:
readable_error = "Encountered unsupported pickle protocol"
else:
raise # an unknown ValueError is raised, should crash and file bug report

logger.warning(
"Encountered error message when loading cached dataset %d: '%s'. "
"Error message was: %s. "
"This is most likely due to https://github.com/openml/openml-python/issues/918. "
f"{readable_error} when loading dataset {self.id} from '{fpath}'. "
f"{hint}"
f"Error message was: {error_message}. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file, e.args[0]),
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
logger.warning(
"Encountered unsupported pickle protocol when loading cached dataset %d: '%s'. "
"Error message was: %s. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file, e.args[0]),
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
else:
raise

data_up_to_date = isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data)
if self.cache_format == "pickle" and not data_up_to_date:
logger.info("Updating outdated pickle file.")
return self._cache_compressed_file_from_arff(self.data_file)
return data, categorical, attribute_names

@staticmethod
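In short, the refactor makes caching lazy: `get_dataset` only records where the compressed files would live, and the first call to `get_data` parses the ARFF and writes either a pickle (`.pkl.py3`, always used when the data is sparse) or a feather file plus a pickled attribute file alongside the downloaded ARFF. A small sketch, mirroring the name derivation shown in `_compressed_cache_file_paths` above; the example path is illustrative, not an API guarantee:

```python
def compressed_cache_file_paths(data_file: str):
    """Derive the compressed cache file names from the ARFF path (sketch of the diff above)."""
    data_pickle_file = data_file.replace(".arff", ".pkl.py3")
    data_feather_file = data_file.replace(".arff", ".feather")
    feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
    return data_pickle_file, data_feather_file, feather_attribute_file


# Illustrative path; the real location comes from openml.config.get_cache_directory().
print(compressed_cache_file_paths("/tmp/openml_cache/datasets/2/dataset.arff"))
```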
4 changes: 2 additions & 2 deletions openml/utils.py
@@ -305,9 +305,9 @@ def _create_cache_directory_for_id(key, id_):
Path of the created dataset cache directory.
"""
cache_dir = os.path.join(_create_cache_directory(key), str(id_))
if os.path.exists(cache_dir) and os.path.isdir(cache_dir):
if os.path.isdir(cache_dir):
pass
elif os.path.exists(cache_dir) and not os.path.isdir(cache_dir):
elif os.path.exists(cache_dir):
raise ValueError("%s cache dir exists but is not a directory!" % key)
else:
os.makedirs(cache_dir)
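The utils.py simplification rests on `os.path.isdir` already implying that the path exists, so the paired `os.path.exists` checks were redundant. For reference, an equivalent standalone sketch of the same guard (the function name and message wording are illustrative, not the library's API):

```python
import os


def ensure_cache_directory(cache_dir: str) -> str:
    """Create cache_dir if needed; refuse to continue if the path exists but is not a directory."""
    if os.path.isdir(cache_dir):
        return cache_dir  # already present, nothing to do
    if os.path.exists(cache_dir):
        # The path exists but is a file (or another non-directory object).
        raise ValueError(f"{cache_dir} cache dir exists but is not a directory!")
    os.makedirs(cache_dir)
    return cache_dir
```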
3 changes: 3 additions & 0 deletions tests/test_datasets/test_dataset_functions.py
@@ -1258,6 +1258,8 @@ def test_list_qualities(self):

def test_get_dataset_cache_format_pickle(self):
dataset = openml.datasets.get_dataset(1)
dataset.get_data()

self.assertEqual(type(dataset), OpenMLDataset)
self.assertEqual(dataset.name, "anneal")
self.assertGreater(len(dataset.features), 1)
@@ -1272,6 +1274,7 @@ def test_get_dataset_cache_format_pickle(self):
def test_get_dataset_cache_format_feather(self):

dataset = openml.datasets.get_dataset(128, cache_format="feather")
dataset.get_data()

# Check if dataset is written to cache directory using feather
cache_dir = openml.config.get_cache_directory()
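Because caching is now lazy, the compressed file only appears after `get_data` has been called, which is why the tests above add an explicit `dataset.get_data()` line. A hedged sketch of how one might check for the feather cache file; the glob pattern is an assumption about the cache layout, not a documented guarantee:

```python
import glob
import os

import openml

dataset = openml.datasets.get_dataset(128, cache_format="feather")
dataset.get_data()  # first call parses the ARFF and writes the compressed cache file

cache_dir = openml.config.get_cache_directory()
# Assumed layout: search the whole cache tree for this dataset's feather file.
feather_files = glob.glob(os.path.join(cache_dir, "**", "dataset.feather"), recursive=True)
print("feather cache written:", len(feather_files) > 0)
```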
