From 0bd91aeb807b618132912233f12f43517a03c1e3 Mon Sep 17 00:00:00 2001 From: Nok Chan Date: Thu, 18 May 2023 16:56:13 +0100 Subject: [PATCH 1/5] Refactor the _build_conf_paths method Signed-off-by: Nok Chan --- kedro/config/omegaconf_config.py | 37 ++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/kedro/config/omegaconf_config.py b/kedro/config/omegaconf_config.py index ac4e2fc56d..aa155b88fe 100644 --- a/kedro/config/omegaconf_config.py +++ b/kedro/config/omegaconf_config.py @@ -236,21 +236,7 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments """ # pylint: disable=too-many-locals - if not self._fs.isdir(Path(conf_path).as_posix()): - raise MissingConfigException( - f"Given configuration path either does not exist " - f"or is not a valid directory: {conf_path}" - ) - - paths = [ - Path(each) - for pattern in patterns - for each in self._fs.glob(Path(f"{str(conf_path)}/{pattern}").as_posix()) - ] - deduplicated_paths = set(paths) - config_files_filtered = [ - path for path in deduplicated_paths if self._is_valid_config_path(path) - ] + config_files_filtered = self._build_conf_paths(conf_path, patterns) config_per_file = {} for config_filepath in config_files_filtered: @@ -275,9 +261,9 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments seen_file_to_keys = { file: set(config.keys()) for file, config in config_per_file.items() } - aggregate_config = config_per_file.values() self._check_duplicates(seen_file_to_keys) + aggregate_config = config_per_file.values() if not aggregate_config: return {} @@ -288,6 +274,25 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments ) return OmegaConf.to_container(OmegaConf.merge(*aggregate_config), resolve=True) + def _build_conf_paths(self, conf_path, patterns): + if not self._fs.isdir(Path(conf_path).as_posix()): + raise MissingConfigException( + f"Given configuration path either does not exist " + f"or is not a valid directory: {conf_path}" + ) + + paths = [ + Path(each) + for pattern in patterns + for each in self._fs.glob(Path(f"{str(conf_path)}/{pattern}").as_posix()) + ] + deduplicated_paths = set(paths) + config_files_filtered = [ + path for path in deduplicated_paths if self._is_valid_config_path(path) + ] + + return config_files_filtered + def _is_valid_config_path(self, path): """Check if given path is a file path and file type is yaml or json.""" posix_path = path.as_posix() From 8ea71bb163922e06b8c7b28f1205fc0fbefad687 Mon Sep 17 00:00:00 2001 From: Nok Chan Date: Thu, 18 May 2023 16:56:59 +0100 Subject: [PATCH 2/5] refactor the load_omega_config method Signed-off-by: Nok Chan --- kedro/config/omegaconf_config.py | 37 +++++++++++++++++--------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/kedro/config/omegaconf_config.py b/kedro/config/omegaconf_config.py index aa155b88fe..e41f2ace41 100644 --- a/kedro/config/omegaconf_config.py +++ b/kedro/config/omegaconf_config.py @@ -240,23 +240,7 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments config_per_file = {} for config_filepath in config_files_filtered: - try: - with self._fs.open(str(config_filepath.as_posix())) as open_config: - # As fsspec doesn't allow the file to be read as StringIO, - # this is a workaround to read it as a binary file and decode it back to utf8. - tmp_fo = io.StringIO(open_config.read().decode("utf8")) - config = OmegaConf.load(tmp_fo) - processed_files.add(config_filepath) - if read_environment_variables: - self._resolve_environment_variables(config) - config_per_file[config_filepath] = config - except (ParserError, ScannerError) as exc: - line = exc.problem_mark.line # type: ignore - cursor = exc.problem_mark.column # type: ignore - raise ParserError( - f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()}," - f" unable to read line {line}, position {cursor}." - ) from exc + self._load_omega_config(conf_path, processed_files, read_environment_variables, config_per_file, config_filepath) seen_file_to_keys = { file: set(config.keys()) for file, config in config_per_file.items() @@ -274,6 +258,25 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments ) return OmegaConf.to_container(OmegaConf.merge(*aggregate_config), resolve=True) + def _load_omega_config(self, conf_path, processed_files, read_environment_variables, config_per_file, config_filepath): + try: + with self._fs.open(str(config_filepath.as_posix())) as open_config: + # As fsspec doesn't allow the file to be read as StringIO, + # this is a workaround to read it as a binary file and decode it back to utf8. + tmp_fo = io.StringIO(open_config.read().decode("utf8")) + config = OmegaConf.load(tmp_fo) + processed_files.add(config_filepath) + if read_environment_variables: + self._resolve_environment_variables(config) + config_per_file[config_filepath] = config + except (ParserError, ScannerError) as exc: + line = exc.problem_mark.line # type: ignore + cursor = exc.problem_mark.column # type: ignore + raise ParserError( + f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()}," + f" unable to read line {line}, position {cursor}." + ) from exc + def _build_conf_paths(self, conf_path, patterns): if not self._fs.isdir(Path(conf_path).as_posix()): raise MissingConfigException( From 819a224e4476d97b1c538ba980a408eacde136be Mon Sep 17 00:00:00 2001 From: Nok Chan Date: Thu, 18 May 2023 17:02:23 +0100 Subject: [PATCH 3/5] Refactor the merge&resolve method Signed-off-by: Nok Chan --- kedro/config/omegaconf_config.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kedro/config/omegaconf_config.py b/kedro/config/omegaconf_config.py index e41f2ace41..daa2998a60 100644 --- a/kedro/config/omegaconf_config.py +++ b/kedro/config/omegaconf_config.py @@ -248,12 +248,15 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments self._check_duplicates(seen_file_to_keys) aggregate_config = config_per_file.values() + return self._merge_and_resolve_config(key, aggregate_config) + + def _merge_and_resolve_config(self, key, aggregate_config): if not aggregate_config: return {} if key == "parameters": # Merge with runtime parameters only for "parameters" - return OmegaConf.to_container( + return OmegaConf.to_container( OmegaConf.merge(*aggregate_config, self.runtime_params), resolve=True ) return OmegaConf.to_container(OmegaConf.merge(*aggregate_config), resolve=True) From b5191d6435099ec4307a6a439fd21d773f06d589 Mon Sep 17 00:00:00 2001 From: Nok Chan Date: Thu, 18 May 2023 17:03:01 +0100 Subject: [PATCH 4/5] Lint Signed-off-by: Nok Chan --- kedro/config/omegaconf_config.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/kedro/config/omegaconf_config.py b/kedro/config/omegaconf_config.py index daa2998a60..03f69e32f8 100644 --- a/kedro/config/omegaconf_config.py +++ b/kedro/config/omegaconf_config.py @@ -240,7 +240,13 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments config_per_file = {} for config_filepath in config_files_filtered: - self._load_omega_config(conf_path, processed_files, read_environment_variables, config_per_file, config_filepath) + self._load_omega_config( + conf_path, + processed_files, + read_environment_variables, + config_per_file, + config_filepath, + ) seen_file_to_keys = { file: set(config.keys()) for file, config in config_per_file.items() @@ -256,16 +262,23 @@ def _merge_and_resolve_config(self, key, aggregate_config): if key == "parameters": # Merge with runtime parameters only for "parameters" - return OmegaConf.to_container( + return OmegaConf.to_container( OmegaConf.merge(*aggregate_config, self.runtime_params), resolve=True ) return OmegaConf.to_container(OmegaConf.merge(*aggregate_config), resolve=True) - def _load_omega_config(self, conf_path, processed_files, read_environment_variables, config_per_file, config_filepath): + def _load_omega_config( + self, + conf_path, + processed_files, + read_environment_variables, + config_per_file, + config_filepath, + ): try: with self._fs.open(str(config_filepath.as_posix())) as open_config: - # As fsspec doesn't allow the file to be read as StringIO, - # this is a workaround to read it as a binary file and decode it back to utf8. + # As fsspec doesn't allow the file to be read as StringIO, + # this is a workaround to read it as a binary file and decode it back to utf8. tmp_fo = io.StringIO(open_config.read().decode("utf8")) config = OmegaConf.load(tmp_fo) processed_files.add(config_filepath) @@ -276,9 +289,9 @@ def _load_omega_config(self, conf_path, processed_files, read_environment_variab line = exc.problem_mark.line # type: ignore cursor = exc.problem_mark.column # type: ignore raise ParserError( - f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()}," - f" unable to read line {line}, position {cursor}." - ) from exc + f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()}," + f" unable to read line {line}, position {cursor}." + ) from exc def _build_conf_paths(self, conf_path, patterns): if not self._fs.isdir(Path(conf_path).as_posix()): From 0f0962f8cb4e1023b5b5be8f1d08f12b9bef0268 Mon Sep 17 00:00:00 2001 From: Nok Chan Date: Thu, 18 May 2023 17:16:13 +0100 Subject: [PATCH 5/5] more refactoring for the load_omega_config method Signed-off-by: Nok Chan --- kedro/config/omegaconf_config.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/kedro/config/omegaconf_config.py b/kedro/config/omegaconf_config.py index 03f69e32f8..9ee9f12ec2 100644 --- a/kedro/config/omegaconf_config.py +++ b/kedro/config/omegaconf_config.py @@ -240,12 +240,8 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments config_per_file = {} for config_filepath in config_files_filtered: - self._load_omega_config( - conf_path, - processed_files, - read_environment_variables, - config_per_file, - config_filepath, + config_per_file[config_filepath] = self._load_omega_config( + conf_path, processed_files, read_environment_variables, config_filepath ) seen_file_to_keys = { @@ -272,7 +268,6 @@ def _load_omega_config( conf_path, processed_files, read_environment_variables, - config_per_file, config_filepath, ): try: @@ -284,7 +279,6 @@ def _load_omega_config( processed_files.add(config_filepath) if read_environment_variables: self._resolve_environment_variables(config) - config_per_file[config_filepath] = config except (ParserError, ScannerError) as exc: line = exc.problem_mark.line # type: ignore cursor = exc.problem_mark.column # type: ignore @@ -292,6 +286,7 @@ def _load_omega_config( f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()}," f" unable to read line {line}, position {cursor}." ) from exc + return config def _build_conf_paths(self, conf_path, patterns): if not self._fs.isdir(Path(conf_path).as_posix()):