Skip to content

Commit

Permalink
Merge pull request #161 from Deltares/feature/107-as-a-developer-i-wa…
Browse files Browse the repository at this point in the history
…nt-the-network-to-be-a-data-class

feat: Added dataclass to represent the Network configuration input da…
  • Loading branch information
Carsopre authored Jul 27, 2023
2 parents c5335ad + 1efc524 commit 6a1d2b1
Show file tree
Hide file tree
Showing 139 changed files with 2,127 additions and 1,864 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@


from __future__ import annotations
from ra2ce.common.configuration.config_data_protocol import ConfigDataProtocol

from ra2ce.configuration.ini_config_protocol import IniConfigDataProtocol
from ra2ce.configuration.network.network_ini_config_validator import (
NetworkIniConfigurationValidator,
)

class AnalysisConfigData(ConfigDataProtocol):
@classmethod
def from_dict(cls, dict_values: dict) -> AnalysisConfigData:
_new_analysis_ini_config_data = cls()
_new_analysis_ini_config_data.update(**dict_values)
return _new_analysis_ini_config_data


class AnalysisConfigDataWithNetwork(AnalysisConfigData):
@classmethod
def from_dict(cls, dict_values: dict) -> AnalysisConfigDataWithNetwork:
return super().from_dict(dict_values)


class NetworkIniConfigData(IniConfigDataProtocol):
class AnalysisConfigDataWithoutNetwork(AnalysisConfigData):
@classmethod
def from_dict(cls, dict_values) -> NetworkIniConfigData:
_new_network_ini_config_data = cls()
_new_network_ini_config_data.update(**dict_values)
return _new_network_ini_config_data

def is_valid(self) -> bool:
_report = NetworkIniConfigurationValidator(self).validate()
return _report.is_valid()
def from_dict(cls, dict_values: dict) -> AnalysisConfigDataWithoutNetwork:
return super().from_dict(dict_values)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from ra2ce.analyses.analysis_config_data.analysis_config_data import (
AnalysisConfigDataWithNetwork,
)
from ra2ce.analyses.analysis_config_data.analysis_config_data_validator_without_network import (
AnalysisConfigDataValidatorWithoutNetwork,
)
from ra2ce.common.validation.ra2ce_validator_protocol import Ra2ceIoValidator
from ra2ce.common.validation.validation_report import ValidationReport


class AnalysisConfigDataValidatorWithNetwork(Ra2ceIoValidator):
def __init__(self, config_data: AnalysisConfigDataWithNetwork) -> None:
self._config = config_data

def validate(self) -> ValidationReport:
_base_report = AnalysisConfigDataValidatorWithoutNetwork(
self._config
).validate()
_output_network_dir = self._config.get("output", None)
if (
not _output_network_dir
or not (_output_network_dir / "network.ini").is_file()
):
_base_report.error(
f"The configuration file 'network.ini' is not found at {_output_network_dir}."
f"Please make sure to name your network settings file 'network.ini'."
)
return _base_report
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@


from pathlib import Path
from typing import Any, Dict, List, Optional
from ra2ce.analyses.analysis_config_data.analysis_config_data import (
AnalysisConfigDataWithoutNetwork,
)

from ra2ce.configuration.ini_config_protocol import IniConfigDataProtocol
from ra2ce.validation.ra2ce_validator_protocol import Ra2ceIoValidator
from ra2ce.validation.validation_report import ValidationReport
from ra2ce.common.validation.ra2ce_validator_protocol import Ra2ceIoValidator
from ra2ce.common.validation.validation_report import ValidationReport
from ra2ce.graph.network_config_data.network_config_data_validator import (
NetworkDictValues,
)

IndirectAnalysisNameList: List[str] = [

IndirectAnalysisNameList: list[str] = [
"single_link_redundancy",
"multi_link_redundancy",
"optimal_route_origin_destination",
Expand All @@ -39,50 +44,31 @@
"multi_link_losses",
"multi_link_isolated_locations",
]
DirectAnalysisNameList: List[str] = ["direct", "effectiveness_measures"]
_expected_values: Dict[str, List[Any]] = {
"source": ["OSM PBF", "OSM download", "shapefile", "pickle"],
"polygon": ["file", None],
"directed": [True, False, None],
"network_type": ["walk", "bike", "drive", "drive_service", "all", None],
"road_types": [
"motorway",
"motorway_link",
"trunk",
"trunk_link",
"primary",
"primary_link",
"secondary",
"secondary_link",
"tertiary",
"tertiary_link",
"unclassified",
"residential",
"road",
None,
],
"origins": ["file", None],
"destinations": ["file", None],
"save_shp": [True, False, None],
"save_csv": [True, False, None],
"analysis": IndirectAnalysisNameList + DirectAnalysisNameList,
"hazard_map": ["file", None],
"aggregate_wl": ["max", "min", "mean", None],
"weighing": ["distance", "time", None],
"save_traffic": [True, False, None],
"locations": ["file", None],
DirectAnalysisNameList: list[str] = ["direct", "effectiveness_measures"]
AnalysisNetworkDictValues = NetworkDictValues | {
"analysis": IndirectAnalysisNameList + DirectAnalysisNameList
}


class IniConfigValidatorBase(Ra2ceIoValidator):
def __init__(self, config_data: IniConfigDataProtocol) -> None:
class AnalysisConfigDataValidatorWithoutNetwork(Ra2ceIoValidator):
def __init__(self, config_data: AnalysisConfigDataWithoutNetwork) -> None:
self._config = config_data

def validate(self) -> ValidationReport:
raise NotImplementedError()
def _validate_road_types(self, road_type_value: str) -> ValidationReport:
_road_types_report = ValidationReport()
if not road_type_value:
return _road_types_report
_expected_road_types = AnalysisNetworkDictValues["road_types"]
_road_type_value_list = road_type_value.replace(" ", "").split(",")
for road_type in _road_type_value_list:
if road_type not in _expected_road_types:
_road_types_report.error(
f"Wrong road type is configured ({road_type}), has to be one or multiple of: {_expected_road_types}"
)
return _road_types_report

def _validate_files(
self, header: str, path_value_list: Optional[List[Path]]
self, header: str, path_value_list: list[Path]
) -> ValidationReport:
# Value should be none or a list of paths, because it already
# checked that it's not none, we can assume it's a list of Paths.
Expand All @@ -99,22 +85,9 @@ def _validate_files(
)
return _files_report

def _validate_road_types(self, road_type_value: str) -> ValidationReport:
_road_types_report = ValidationReport()
if not road_type_value:
return _road_types_report
_expected_road_types = _expected_values["road_types"]
_road_type_value_list = road_type_value.replace(" ", "").split(",")
for road_type in _road_type_value_list:
if road_type not in _expected_road_types:
_road_types_report.error(
f"Wrong road type is configured ({road_type}), has to be one or multiple of: {_expected_road_types}"
)
return _road_types_report

def _validate_headers(self, required_headers: List[str]) -> ValidationReport:
def _validate_headers(self, required_headers: list[str]) -> ValidationReport:
_report = ValidationReport()
_available_keys: List[str] = self._config.keys()
_available_keys = self._config.keys()

def _check_header(header: str) -> None:
if header not in _available_keys:
Expand All @@ -133,9 +106,9 @@ def _check_header(header: str) -> None:
for header in required_headers:
# Now check the parameters per configured item.
for key, value in self._config[header].items():
if key not in _expected_values.keys():
if key not in AnalysisNetworkDictValues.keys():
continue
_expected_values_list: List[Any] = _expected_values[key]
_expected_values_list = AnalysisNetworkDictValues[key]
if "file" in _expected_values_list:
# Value should be none or a list of paths, because it already
# checked that it's not none, we can assume it's a list of Paths.
Expand Down Expand Up @@ -163,3 +136,12 @@ def _check_header(header: str) -> None:
)

return _report

def validate(self) -> ValidationReport:
_report = ValidationReport()
_required_headers = ["project"]
# Analysis are marked as [analysis1], [analysis2], ...
_required_headers.extend([a for a in self._config.keys() if "analysis" in a])

_report.merge(self._validate_headers(_required_headers))
return _report
1 change: 1 addition & 0 deletions ra2ce/analyses/analysis_config_data/readers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,48 @@
import logging
from pathlib import Path
from shutil import copyfile
from typing import List, Optional, Protocol, runtime_checkable

from ra2ce.configuration.config_protocol import ConfigProtocol
from ra2ce.io.readers.file_reader_protocol import FileReaderProtocol


@runtime_checkable
class IniConfigurationReaderProtocol(FileReaderProtocol, Protocol): # pragma: no cover
def read(self, ini_file: Path) -> Optional[ConfigProtocol]:
"""
Reads the given `*.ini` file and if possible converts it into a `ConfigProtocol` object.
Args:
ini_file (Path): Ini file to be mapped into a `ConfigProtocol`.
Returns:
Optional[ConfigProtocol]: Resulting mapped object from the configuration data in the given file.
"""
pass


class IniConfigurationReaderBase(IniConfigurationReaderProtocol):
"""
Generic BASE Ini Configuration Reader.
It is meant to behave as an abstract class, the concrete classes should
implement the read method from the FileReaderProtocol.
Therefore it only contains common functionality among the IniConfigurationReaderBase inheriting implementations.
"""
from ra2ce.analyses.analysis_config_data.analysis_config_data_validator_without_network import (
DirectAnalysisNameList,
IndirectAnalysisNameList,
)
from ra2ce.common.configuration.ini_configuration_reader_protocol import (
ConfigDataReaderProtocol,
)
from ra2ce.common.io.readers.ini_file_reader import IniFileReader


class AnalysisConfigReaderBase(ConfigDataReaderProtocol):
def _convert_analysis_types(self, config: dict) -> dict:
def set_analysis_values(config_type: str):
if config_type in config:
(config[config_type]).append(config[a])
else:
config[config_type] = [config[a]]

analyses_names = [a for a in config.keys() if "analysis" in a]
for a in analyses_names:
if any(t in config[a]["analysis"] for t in DirectAnalysisNameList):
set_analysis_values("direct")
elif any(t in config[a]["analysis"] for t in IndirectAnalysisNameList):
set_analysis_values("indirect")
del config[a]

return config

def _import_configuration(self, root_path: Path, config_path: Path) -> dict:
# Read the configurations in network.ini and add the root path to the configuration dictionary.
if not config_path.is_file():
config_path = root_path / config_path
_config = IniFileReader().read(config_path)
_config["project"]["name"] = config_path.parent.name
_config["root_path"] = root_path

# Set the paths in the configuration Dict for ease of saving to those folders.
# The output path is set at a different step `IniConfigurationReaderBase::_copy_output_files`
_config["input"] = config_path.parent / "input"
_config["static"] = config_path.parent / "static"
return _config

def _copy_output_files(self, from_path: Path, config_data: dict) -> None:
self._create_config_dir("output", config_data)
Expand All @@ -67,7 +81,7 @@ def _create_config_dir(self, dir_name: str, config_data: dict):

def _parse_path_list(
self, property_name: str, path_list: str, config_data: dict
) -> List[Path]:
) -> list[Path]:
_list_paths = []
for path_value in path_list.split(","):
path_value = Path(path_value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,32 @@
from pathlib import Path
from typing import Optional

from ra2ce.configuration import AnalysisConfigBase, NetworkConfig
from ra2ce.configuration.analysis.readers.analysis_config_reader_base import (
from ra2ce.analyses.analysis_config_wrapper.analysis_config_wrapper_base import (
AnalysisConfigWrapperBase,
)
from ra2ce.analyses.analysis_config_data.readers.analysis_config_reader_base import (
AnalysisConfigReaderBase,
)
from ra2ce.configuration.analysis.readers.analysis_with_network_config_reader import (
AnalysisWithNetworkConfigReader,
from ra2ce.analyses.analysis_config_data.readers.analysis_config_reader_with_network import (
AnalysisConfigReaderWithNetwork,
)
from ra2ce.configuration.analysis.readers.analysis_without_network_config_reader import (
AnalysisWithoutNetworkConfigReader,
from ra2ce.analyses.analysis_config_data.readers.analysis_config_reader_without_network import (
AnalysisConfigReaderWithoutNetwork,
)
from ra2ce.graph.network_config_data.network_config_data import NetworkConfigData


class AnalysisConfigReaderFactory:
@staticmethod
def get_reader(
network_config: Optional[NetworkConfig],
network_config: Optional[NetworkConfigData],
) -> AnalysisConfigReaderBase:
if network_config:
return AnalysisWithNetworkConfigReader(network_config)
return AnalysisWithoutNetworkConfigReader()
return AnalysisConfigReaderWithNetwork(network_config)
return AnalysisConfigReaderWithoutNetwork()

def read(
self, ini_file: Path, network_config: Optional[NetworkConfig]
) -> AnalysisConfigBase:
self, ini_file: Path, network_config: Optional[NetworkConfigData]
) -> AnalysisConfigWrapperBase:
_reader = self.get_reader(network_config)
return _reader.read(ini_file)
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,32 @@


from pathlib import Path
from typing import Optional

from ra2ce.configuration.analysis.analysis_config_base import AnalysisConfigBase
from ra2ce.configuration.analysis.analysis_ini_config_data import (
AnalysisWithNetworkIniConfigData,
from ra2ce.analyses.analysis_config_wrapper.analysis_config_wrapper_base import (
AnalysisConfigWrapperBase,
)
from ra2ce.configuration.analysis.readers.analysis_config_reader_base import (
from ra2ce.analyses.analysis_config_data.analysis_config_data import (
AnalysisConfigDataWithNetwork,
)
from ra2ce.analyses.analysis_config_data.readers.analysis_config_reader_base import (
AnalysisConfigReaderBase,
)
from ra2ce.configuration.network import NetworkConfig
from ra2ce.graph.network_config_wrapper import NetworkConfigWrapper


class AnalysisWithNetworkConfigReader(AnalysisConfigReaderBase):
def __init__(self, network_data: NetworkConfig) -> None:
class AnalysisConfigReaderWithNetwork(AnalysisConfigReaderBase):
def __init__(self, network_data: NetworkConfigWrapper) -> None:
self._network_data = network_data
if not network_data:
raise ValueError(
"Network data mandatory for an AnalysisIniConfigurationReader reader."
)

def read(
self, ini_file: Optional[Path]
) -> Optional[AnalysisWithNetworkIniConfigData]:
if not ini_file or not ini_file.exists():
return None
_root_path = AnalysisConfigBase.get_network_root_dir(ini_file)
def read(self, ini_file: Path) -> AnalysisConfigDataWithNetwork:
if not isinstance(ini_file, Path) or not ini_file.is_file():
raise ValueError("No analysis ini file 'Path' provided.")
_root_path = AnalysisConfigWrapperBase.get_network_root_dir(ini_file)
_config_data = self._import_configuration(_root_path, ini_file)
_config_data = self._convert_analysis_types(_config_data)
self._copy_output_files(ini_file, _config_data)
return AnalysisWithNetworkIniConfigData.from_dict(_config_data)
return AnalysisConfigDataWithNetwork.from_dict(_config_data)
Loading

0 comments on commit 6a1d2b1

Please sign in to comment.