diff --git a/kedro/config/omegaconf_config.py b/kedro/config/omegaconf_config.py index ac4e2fc56d..9ee9f12ec2 100644 --- a/kedro/config/omegaconf_config.py +++ b/kedro/config/omegaconf_config.py @@ -236,48 +236,23 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments """ # pylint: disable=too-many-locals - if not self._fs.isdir(Path(conf_path).as_posix()): - raise MissingConfigException( - f"Given configuration path either does not exist " - f"or is not a valid directory: {conf_path}" - ) - - paths = [ - Path(each) - for pattern in patterns - for each in self._fs.glob(Path(f"{str(conf_path)}/{pattern}").as_posix()) - ] - deduplicated_paths = set(paths) - config_files_filtered = [ - path for path in deduplicated_paths if self._is_valid_config_path(path) - ] + config_files_filtered = self._build_conf_paths(conf_path, patterns) config_per_file = {} for config_filepath in config_files_filtered: - try: - with self._fs.open(str(config_filepath.as_posix())) as open_config: - # As fsspec doesn't allow the file to be read as StringIO, - # this is a workaround to read it as a binary file and decode it back to utf8. - tmp_fo = io.StringIO(open_config.read().decode("utf8")) - config = OmegaConf.load(tmp_fo) - processed_files.add(config_filepath) - if read_environment_variables: - self._resolve_environment_variables(config) - config_per_file[config_filepath] = config - except (ParserError, ScannerError) as exc: - line = exc.problem_mark.line # type: ignore - cursor = exc.problem_mark.column # type: ignore - raise ParserError( - f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()}," - f" unable to read line {line}, position {cursor}." - ) from exc + config_per_file[config_filepath] = self._load_omega_config( + conf_path, processed_files, read_environment_variables, config_filepath + ) seen_file_to_keys = { file: set(config.keys()) for file, config in config_per_file.items() } - aggregate_config = config_per_file.values() self._check_duplicates(seen_file_to_keys) + aggregate_config = config_per_file.values() + return self._merge_and_resolve_config(key, aggregate_config) + + def _merge_and_resolve_config(self, key, aggregate_config): if not aggregate_config: return {} @@ -288,6 +263,50 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments ) return OmegaConf.to_container(OmegaConf.merge(*aggregate_config), resolve=True) + def _load_omega_config( + self, + conf_path, + processed_files, + read_environment_variables, + config_filepath, + ): + try: + with self._fs.open(str(config_filepath.as_posix())) as open_config: + # As fsspec doesn't allow the file to be read as StringIO, + # this is a workaround to read it as a binary file and decode it back to utf8. + tmp_fo = io.StringIO(open_config.read().decode("utf8")) + config = OmegaConf.load(tmp_fo) + processed_files.add(config_filepath) + if read_environment_variables: + self._resolve_environment_variables(config) + except (ParserError, ScannerError) as exc: + line = exc.problem_mark.line # type: ignore + cursor = exc.problem_mark.column # type: ignore + raise ParserError( + f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()}," + f" unable to read line {line}, position {cursor}." + ) from exc + return config + + def _build_conf_paths(self, conf_path, patterns): + if not self._fs.isdir(Path(conf_path).as_posix()): + raise MissingConfigException( + f"Given configuration path either does not exist " + f"or is not a valid directory: {conf_path}" + ) + + paths = [ + Path(each) + for pattern in patterns + for each in self._fs.glob(Path(f"{str(conf_path)}/{pattern}").as_posix()) + ] + deduplicated_paths = set(paths) + config_files_filtered = [ + path for path in deduplicated_paths if self._is_valid_config_path(path) + ] + + return config_files_filtered + def _is_valid_config_path(self, path): """Check if given path is a file path and file type is yaml or json.""" posix_path = path.as_posix()