From 1cde0f3ab60ac6d5513aa87420b13a2a93154fec Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 May 2024 16:16:30 -0400 Subject: [PATCH 01/14] Have ngen request requirements default to gpkg. Having ngen request default to geopackage format when building the domain for hydrofabric data requirements. --- .../communication/maas_request/ngen/abstract_nextgen_request.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py index 88eca2e8f..d3b4221c7 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py @@ -247,7 +247,7 @@ def hydrofabric_data_requirement(self) -> DataRequirement: ), ] hydro_domain = DataDomain( - data_format=DataFormat.NGEN_GEOJSON_HYDROFABRIC, + data_format=DataFormat.NGEN_GEOPACKAGE_HYDROFABRIC_V2, discrete_restrictions=hydro_restrictions, ) self._hydrofabric_data_requirement = DataRequirement( From d547681a4435fa076543f929051ee63c1782ebd6 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 May 2024 16:17:35 -0400 Subject: [PATCH 02/14] Consider create partition cfg as deriving dataset. --- .../services/dataservice/dmod/dataservice/data_derive_util.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/services/dataservice/dmod/dataservice/data_derive_util.py b/python/services/dataservice/dmod/dataservice/data_derive_util.py index cddef72d5..2d2ecf1c6 100644 --- a/python/services/dataservice/dmod/dataservice/data_derive_util.py +++ b/python/services/dataservice/dmod/dataservice/data_derive_util.py @@ -403,6 +403,8 @@ def can_dataset_be_derived(self, requirement: DataRequirement, job: Optional[Job # Account for partial configs included in request that enable building realization config on the fly if job is not None and self.can_derive_realization_from_formulations(requirement=requirement, job=job): return True + elif job is not None and requirement.domain.data_format == DataFormat.NGEN_PARTITION_CONFIG: + return True else: return job is not None and self._can_derive_bmi_configs(requirement=requirement, job=job) From 6726c62cfda4ca90a4e3e5ba1f2d06d71dedd282 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 May 2024 16:18:22 -0400 Subject: [PATCH 03/14] Add capabilities to query for dataset in msg. 
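
Adds a GET_STATE value to QueryType and an optional dataset_state field to
DatasetManagementResponseBody, so a management response can carry a serialized
Dataset back to the requester.

A minimal sketch of how a client might build such a query, reusing the message
types from this module (the dataset name here is just a placeholder):

    from dmod.communication import DatasetManagementMessage, ManagementAction
    from dmod.communication.dataset_management_message import DatasetQuery, QueryType

    # Ask the data service for the full state of one dataset.
    request = DatasetManagementMessage(action=ManagementAction.QUERY,
                                       dataset_name="example-dataset",
                                       query=DatasetQuery(query_type=QueryType.GET_STATE))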
---
 .../dmod/communication/dataset_management_message.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/lib/communication/dmod/communication/dataset_management_message.py b/python/lib/communication/dmod/communication/dataset_management_message.py
index bdbcc052b..d50931e55 100644
--- a/python/lib/communication/dmod/communication/dataset_management_message.py
+++ b/python/lib/communication/dmod/communication/dataset_management_message.py
@@ -1,5 +1,6 @@
 from .message import AbstractInitRequest, MessageEventType, Response
 from dmod.core.serializable import Serializable
+from dmod.core.dataset import Dataset
 from .maas_request import ExternalRequest, ExternalRequestResponse
 from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DataRequirement
 from dmod.core.enum import PydanticEnum
@@ -16,6 +17,7 @@ class QueryType(PydanticEnum):
     GET_VALUES = 6
     GET_MIN_VALUE = 7
     GET_MAX_VALUE = 8
+    GET_STATE = 9
 
     @classmethod
     def get_for_name(cls, name_str: str) -> 'QueryType':
@@ -274,6 +276,7 @@ class DatasetManagementResponseBody(Serializable):
     item_name: Optional[str]
     # TODO: in the future, tighten the type restrictions of this field
     query_results: Optional[Dict[str, Any]]
+    dataset_state: Optional[Dataset]
 
     is_awaiting: bool = False


From e8af757f0e44cca819770d745dba401d7dc70207 Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Wed, 22 May 2024 16:19:35 -0400
Subject: [PATCH 04/14] CLI support to query for dataset state.

---
 python/lib/client/dmod/client/__main__.py    | 6 ++++++
 python/lib/client/dmod/client/dmod_client.py | 2 ++
 2 files changed, 8 insertions(+)

diff --git a/python/lib/client/dmod/client/__main__.py b/python/lib/client/dmod/client/__main__.py
index e61232ab7..991263f32 100644
--- a/python/lib/client/dmod/client/__main__.py
+++ b/python/lib/client/dmod/client/__main__.py
@@ -260,6 +260,10 @@ def _handle_data_service_action_args(parent_subparsers_container):
     parser_download.add_argument('dataset_name', help='Specify the name of the desired dataset.')
     parser_download.add_argument('dest_dir', type=Path, help='Specify local destination directory to save to.')
 
+    # Nested parser for the 'state' action
+    parser_state = action_subparsers.add_parser('state', description="Get dataset state.")
+    parser_state.add_argument('dataset_name', help='Specify the name of the desired dataset.')
+
     # Nested parser for the 'list_datasets' action
     parser_list = action_subparsers.add_parser('list', description="List available datasets.")
     parser_list.add_argument('--category', dest='category', choices=dataset_categories, type=DataCategory.get_for_name,
@@ -294,6 +298,8 @@ def _handle_jobs_command_args(parent_subparsers_container):
 
     # Nested parser for the 'info' action
     parser_job_info = job_command_subparsers.add_parser('info')
+    parser_job_info.add_argument('--status-only', dest="status_only", action='store_true',
+                                 help='Only include job exec status, not full state')
     parser_job_info.add_argument('job_id', help='The id of the job for which to retrieve job state info')
 
     # Nested parser for the 'release' action
diff --git a/python/lib/client/dmod/client/dmod_client.py b/python/lib/client/dmod/client/dmod_client.py
index 5956bbd46..4413a8697 100644
--- a/python/lib/client/dmod/client/dmod_client.py
+++ b/python/lib/client/dmod/client/dmod_client.py
@@ -212,6 +212,8 @@ async def data_service_action(self, action: str, **kwargs) -> ResultIndicator:
                 return await self.data_service_client.get_dataset_names(**kwargs)
             elif action == 'items':
                 return await self.data_service_client.get_dataset_item_names(**kwargs)
+            elif action == 'state':
+                return await self.data_service_client.get_dataset_state(**kwargs)
             else:
                 raise ValueError(f"Unsupported data service action to {self.__class__.__name__}: {action}")
         except NotImplementedError:

From 5ae0244be7cda3826a43758b0dce62951d42d3bf Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Wed, 22 May 2024 16:20:21 -0400
Subject: [PATCH 05/14] Fix job status_step setter non-usage in job mgr.

---
 python/lib/scheduler/dmod/scheduler/job/job_manager.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/lib/scheduler/dmod/scheduler/job/job_manager.py b/python/lib/scheduler/dmod/scheduler/job/job_manager.py
index 6fcce3185..c08724e6a 100644
--- a/python/lib/scheduler/dmod/scheduler/job/job_manager.py
+++ b/python/lib/scheduler/dmod/scheduler/job/job_manager.py
@@ -392,7 +392,7 @@ def _organize_active_jobs(self, active_jobs: List[RequestedJob]) -> List[List[Re
         # Note that this code should be safe as is as long as the job itself still has the previous allocation saved
         # in situations when it needs to use the same allocation as before
         if job.status_step == JobExecStep.STOPPED:
-            job.status_step = JobExecStep.AWAITING_ALLOCATION
+            job.set_status_step(JobExecStep.AWAITING_ALLOCATION)
             # TODO: calculate impact on priority
             self.save_job(job)
 
@@ -408,7 +408,7 @@ def _organize_active_jobs(self, active_jobs: List[RequestedJob]) -> List[List[Re
             else:
                 # TODO: confirm the allocation is still valid (saving it without checking will make it so, which
                 #  could lead to inconsistencies)
-                job.status_step = JobExecStep.AWAITING_DATA
+                job.set_status_step(JobExecStep.AWAITING_DATA)
                 self.save_job(job)
 
             if job.should_release_resources:

From fafe90335868d0a0e7c93bf33aec8028fdbf6e1e Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Wed, 22 May 2024 16:20:59 -0400
Subject: [PATCH 06/14] Add get_dataset_state to client.

---
 .../lib/client/dmod/client/request_clients.py | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/python/lib/client/dmod/client/request_clients.py b/python/lib/client/dmod/client/request_clients.py
index 778a814ae..6dae2a83e 100644
--- a/python/lib/client/dmod/client/request_clients.py
+++ b/python/lib/client/dmod/client/request_clients.py
@@ -771,6 +771,27 @@ async def get_dataset_item_names(self, dataset_name: str, **kwargs) -> DatasetMa
         except DmodRuntimeError as e:
             raise DmodRuntimeError(f"DMOD error when getting dataset item: {str(e)}")
 
+    async def get_dataset_state(self, dataset_name: str, **kwargs) -> DatasetManagementResponse:
+        """
+        Get dataset state.
+
+        Parameters
+        ----------
+        dataset_name : str
+            The name/id of the dataset of interest.
+
+        Returns
+        -------
+        DatasetManagementResponse
+            A response containing the dataset state.
+        """
+        request = DatasetManagementMessage(action=ManagementAction.QUERY, dataset_name=dataset_name,
+                                           query=DatasetQuery(query_type=QueryType.GET_STATE))
+        try:
+            return await self._process_request(request=request)
+        except DmodRuntimeError as e:
+            raise DmodRuntimeError(f"DMOD error when getting dataset state: {str(e)}")
+
     async def list_datasets(self, category: Optional[DataCategory] = None, **kwargs) -> List[str]:
         """
         Convenience method to list datasets, optionally filtering to a specific category.
From eed4585208adcecb2d796024fa9d1a7e78db9a85 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 May 2024 16:21:39 -0400 Subject: [PATCH 07/14] Fix dataservice remote debugging --- .../dmod/dataservice/rest_service.py | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/python/services/dataservice/dmod/dataservice/rest_service.py b/python/services/dataservice/dmod/dataservice/rest_service.py index e5549da61..d95ea25c9 100644 --- a/python/services/dataservice/dmod/dataservice/rest_service.py +++ b/python/services/dataservice/dmod/dataservice/rest_service.py @@ -1,5 +1,6 @@ import asyncio import logging +import sys from contextlib import asynccontextmanager from pathlib import Path @@ -26,7 +27,7 @@ from .errors import Error as Errors from .exceptions import ErrorResponseException from .models import Error -from .service_settings import service_settings +from .service_settings import debug_settings, service_settings app = FastAPI( title="DMOD Data Service", @@ -54,6 +55,12 @@ async def lifespan(app: FastAPI): # understand what is going on. settings = service_settings() + if settings.pycharm_debug: + logging.info("Setting up remote debugging.") + _setup_remote_debugging() + else: + logging.info("Skipping data service remote debugging setup.") + # Initialize objects that will be injected and shared by service subsystems # Initialize a job util via the default factory, which requires some Redis params job_util = dep.job_util(settings) @@ -219,6 +226,36 @@ def _service_manager( dataset_inquery_util=inquery, ) +def _setup_remote_debugging(): + settings = debug_settings() + logging.info("Preparing remote debugging connection for data service.") + if not settings.pycharm_remote_debug_egg.exists(): + print( + f'Error: no file at given path to remote debugger egg file "{settings.pycharm_remote_debug_egg!s}"', + file=sys.stderr, + ) + sys.exit(1) + sys.path.append(str(settings.pycharm_remote_debug_egg)) + import pydevd_pycharm + + try: + pydevd_pycharm.settrace( + settings.remote_debug_host, + port=settings.remote_debug_port, + stdoutToServer=True, + stderrToServer=True, + ) + except Exception as error: + msg = "Warning: could not set debugging trace to {} on {} due to {} - {}" + print( + msg.format( + settings.remote_debug_host, + settings.remote_debug_port, + error.__class__.__name__, + str(error), + ), + file=sys.stderr, + ) @app.websocket("/") async def websocket_handler( From 905c37e6d479f06027879937ce0c2b7b7bab7de9 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 May 2024 16:22:16 -0400 Subject: [PATCH 08/14] Fix CLI job list/status commands. --- .../lib/client/dmod/client/request_clients.py | 6 +- .../communication/maas_request/job_message.py | 58 ++++++++++--------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/python/lib/client/dmod/client/request_clients.py b/python/lib/client/dmod/client/request_clients.py index 6dae2a83e..ffc051fab 100644 --- a/python/lib/client/dmod/client/request_clients.py +++ b/python/lib/client/dmod/client/request_clients.py @@ -135,7 +135,7 @@ async def get_jobs_list(self, active_only: bool) -> List[str]: else: return indicator.data - async def request_job_info(self, job_id: str, *args, **kwargs) -> JobInfoResponse: + async def request_job_info(self, job_id: str, status_only: bool = False, *args, **kwargs) -> JobInfoResponse: """ Request the full state of the provided job, formatted as a JSON dictionary. 
@@ -143,6 +143,8 @@ async def request_job_info(self, job_id: str, *args, **kwargs) -> JobInfoRespons
         ----------
         job_id : str
             The id of the job in question.
+        status_only : bool
+            Whether to only include status info in the response, rather than the entire job state.
         args
             (Unused) variable positional args.
         kwargs
@@ -154,7 +156,7 @@ async def request_job_info(self, job_id: str, *args, **kwargs) -> JobInfoRespons
             An indicator of success of the request that, when successful, contains the full state of the provided job,
             formatted as a JSON dictionary, in the ``data`` attribute.
         """
-        return await self._job_info_request(job_id=job_id, status_only=False)
+        return await self._job_info_request(job_id=job_id, status_only=status_only)
 
     async def request_job_release(self, job_id: str, *args, **kwargs) -> JobControlResponse:
         """
diff --git a/python/lib/communication/dmod/communication/maas_request/job_message.py b/python/lib/communication/dmod/communication/maas_request/job_message.py
index cb6501bbe..78e80368b 100644
--- a/python/lib/communication/dmod/communication/maas_request/job_message.py
+++ b/python/lib/communication/dmod/communication/maas_request/job_message.py
@@ -76,19 +76,20 @@ class JobInfoResponse(Response):
 
     job_id: str = Field(description="The identifier of the job of interest.")
     status_only: bool = Field(False, description="Whether only 'status' attribute of job is returned (when success).")
+    job_state: Optional[dict]
 
-    @validator("data")
-    def _validate_data(cls, value: Optional[dict], values: dict) -> dict:
+    @validator("job_state")
+    def _validate_job_state(cls, value: Optional[dict], values: dict, field, **kwargs) -> dict:
         """
-        Validate the :attr:`data` attribute, in particular in the context of whether the response indicates success.
+        Validate the :attr:`job_state` attribute, in particular in the context of whether the response indicates success.
 
-        Validate the value of the :attr:`data` attribute. For successful responses, this should be a dictionary object
-        with at least one key. For failure responses, the attribute should be set to ``None``.
+        Validate the value of the :attr:`job_state` attribute. For successful responses, this should be a dictionary
+        object with at least one key. For failure responses, the attribute should be set to ``None``.
 
         Parameters
         ----------
         value: Optional[dict]
-            The :attr:`data` value.
+            The :attr:`job_state` value.
         values: dict
             Previous attribute values for the instance.
 
@@ -101,21 +102,21 @@ def _validate_data(cls, value: Optional[dict], values: dict) -> dict:
        ------
        ValueError
            If either:
-                1. the response indicates success and the :attr:`data` `value` is ``None``
-                2. the response indicates success and the :attr:`data` `value` is has length of ``0``
-                3. the response indicates failure and the :attr:`data` `value` is not ``None``
+                1. the response indicates success and the :attr:`job_state` `value` is ``None``
+                2. the response indicates success and the :attr:`job_state` `value` has a length of ``0``
+                3. the response indicates failure and the :attr:`job_state` `value` is not ``None``
        TypeError
-            If the response indicates success and the :attr:`data` `value` is not a :class:`dict` object.
+            If the response indicates success and the :attr:`job_state` `value` is not a :class:`dict` object.
""" if values['success']: if value is None: - raise ValueError(f"{cls.__name__} 'data' field must not be 'None' when successful.") + raise ValueError(f"{cls.__name__} 'job_state' field must not be 'None' when successful.") elif not isinstance(value, dict): - raise TypeError(f"{cls.__name__} 'data' field should be dictionary but was {value.__class__.__name__}") + raise TypeError(f"{cls.__name__} 'job_state' field should be dictionary but was {value.__class__.__name__}") elif len(value) == 0: - raise ValueError(f"{cls.__name__} 'data' field must not be empty when successful.") + raise ValueError(f"{cls.__name__} 'job_state' field must not be empty when successful.") elif value is not None: - raise ValueError(f"{cls.__name__} 'data' field must be 'None' when not successful.") + raise ValueError(f"{cls.__name__} 'job_state' field must be 'None' when not successful.") return value @@ -144,19 +145,20 @@ class JobListResponse(Response): """ The type of :class:`AbstractInitRequest` for which this type is the response. """ only_active: bool = Field(False, description="Whether only the ids of active jobs were returned.") + job_list: Optional[List[str]] - @validator("data") - def _validate_data(cls, value: Optional[List[str]], values: dict) -> List[str]: + @validator("job_list") + def _validate_job_list(cls, value: Optional[List[str]], values: dict) -> List[str]: """ - Validate the :attr:`data` attribute, in particular in the context of whether the response indicates success. + Validate the :attr:`job_list` attribute, in particular in the context of whether the response indicates success. - Validate the value of the :attr:`data` attribute. For successful responses, this should be a list of strings, - though the list may be empty. For failure responses, the attribute should be set to ``None``. + Validate the value of the :attr:`job_list` attribute. For successful responses, this should be a list of + strings, though the list may be empty. For failure responses, the attribute should be set to ``None``. Parameters ---------- value: Optional[List[str]] - The ``data`` value. + The ``job_list`` value. values: dict Previous attribute values for the instance. @@ -169,21 +171,21 @@ def _validate_data(cls, value: Optional[List[str]], values: dict) -> List[str]: ------ ValueError If either: - 1. the response indicates success and the :attr:`data` `value` is ``None`` - 2. the response indicates success and the :attr:`data` `value` has a non-string - 3. the response indicates failure and the :attr:`data` `value` is not ``None`` + 1. the response indicates success and the :attr:`job_list` `value` is ``None`` + 2. the response indicates success and the :attr:`job_list` `value` has a non-string + 3. the response indicates failure and the :attr:`job_list` `value` is not ``None`` TypeError - If the response indicates success and the :attr:`data` `value` is not a :class:`list` object. + If the response indicates success and the :attr:`job_list` `value` is not a :class:`list` object. 
""" if values['success']: if value is None: - raise ValueError(f"{cls.__name__} 'data' field must not be 'None' when successful.") + raise ValueError(f"{cls.__name__} 'job_list' field must not be 'None' when successful ({value!s}).") elif not isinstance(value, list): - raise TypeError(f"{cls.__name__} 'data' field must be list but was {value.__class__.__name__}") + raise TypeError(f"{cls.__name__} 'job_list' field must be list but was {value.__class__.__name__} ({value!s})") elif len(value) > 0 and not all(isinstance(n, str) for n in value): - raise ValueError(f"{cls.__name__} 'data' field was not list of all strings elements.") + raise ValueError(f"{cls.__name__} 'job_list' field was not list of all strings elements ({value!s}).") elif value is not None: - raise ValueError(f"{cls.__name__} 'data' field must be 'None' when not successful.") + raise ValueError(f"{cls.__name__} 'job_list' field must be 'None' when not successful ({value!s}).") return value From b070dd277531d7120dcaefeb1006c741ca45a994 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 May 2024 16:22:51 -0400 Subject: [PATCH 09/14] Update dataservice to support dataset state query. --- python/services/dataservice/dmod/dataservice/service.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py index 319584b4c..b421ea3bc 100644 --- a/python/services/dataservice/dmod/dataservice/service.py +++ b/python/services/dataservice/dmod/dataservice/service.py @@ -562,6 +562,9 @@ def _process_query(self, message: DatasetManagementMessage) -> DatasetManagement ------- ::method:`_async_process_query` """ + if message.dataset_name and message.dataset_name not in self._managers.known_datasets(): + return DatasetManagementResponse(action=message.management_action, success=False, + reason="Dataset Not Found", dataset_name=message.dataset_name) query_type = message.query.query_type if query_type == QueryType.LIST_FILES: dataset_name = message.dataset_name @@ -569,6 +572,11 @@ def _process_query(self, message: DatasetManagementMessage) -> DatasetManagement return DatasetManagementResponse(action=message.management_action, success=True, dataset_name=dataset_name, reason=f'Obtained {dataset_name} Items List', data={"query_results": {QueryType.LIST_FILES.name: list_of_files}}) + elif query_type == QueryType.GET_STATE: + return DatasetManagementResponse(action=message.management_action, success=True, + dataset_name=message.dataset_name, + reason=f'Obtained {message.dataset_name} State', + data={"dataset_state": self._managers.known_datasets()[message.dataset_name]}) # TODO: (later) add support for messages with other query types also else: reason = 'Unsupported {} Query Type - {}'.format(DatasetQuery.__class__.__name__, query_type.name) From 369a5616ca39e2d596207f72f68b726947ce726e Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 May 2024 16:23:38 -0400 Subject: [PATCH 10/14] Fix some (not all) issues w/ partitionerservice. 
--- .../dmod/partitionerservice/service.py | 108 ++++++++++++------ 1 file changed, 74 insertions(+), 34 deletions(-) diff --git a/python/services/partitionerservice/dmod/partitionerservice/service.py b/python/services/partitionerservice/dmod/partitionerservice/service.py index a982739c7..32856a2e2 100644 --- a/python/services/partitionerservice/dmod/partitionerservice/service.py +++ b/python/services/partitionerservice/dmod/partitionerservice/service.py @@ -9,9 +9,11 @@ from docker.errors import ContainerError from dmod.communication import AbstractInitRequest, DatasetManagementMessage, DatasetManagementResponse, \ InvalidMessageResponse, PartitionRequest, PartitionResponse, ManagementAction, WebSocketInterface +from dmod.communication.dataset_management_message import DatasetQuery, QueryType from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DataRequirement, DiscreteRestriction, \ StandardDatasetIndex from dmod.core.exception import DmodRuntimeError +from dmod.core.dataset import Dataset from dmod.externalrequests.maas_request_handlers import DataServiceClient from dmod.modeldata.hydrofabric import HydrofabricFilesManager from dmod.scheduler import SimpleDockerUtil @@ -98,13 +100,14 @@ async def _async_create_new_partitioning_dataset(self) -> Tuple[Optional[str], O Tuple[Optional[str], Optional[str]] The new dataset's name and 'data_id' respectively, with ``None`` for both if creation failed. """ + # TODO: probably won't work ... needs domain dataset_name = 'partition-config-{}'.format(str(uuid4())) request = DatasetManagementMessage(action=ManagementAction.CREATE, dataset_name=dataset_name, category=DataCategory.CONFIG) response: DatasetManagementResponse = await self._data_client.async_make_request(request) return (dataset_name, response.data_id) if response.success else (None, None) - async def _async_find_hydrofabric_dataset_name(self, data_id: str, uid: str) -> Optional[str]: + async def _async_find_hydrofabric_dataset_name(self, data_id: str, uid: str) -> Tuple[Optional[str], Optional[DataFormat], Optional[str]]: """ Query the data service for the name of the required hydrofabric dataset that fulfill the implied restrictions. @@ -119,17 +122,43 @@ async def _async_find_hydrofabric_dataset_name(self, data_id: str, uid: str) -> Returns ------- - Optional[str] - The name of the hydrofabric dataset satisfying these restrictions, or ``None`` if one could not be found. 
+        Tuple[Optional[str], Optional[DataFormat], Optional[str]]
+            Tuple containing:
+                - the name of the hydrofabric dataset satisfying these restrictions, or ``None`` if one could not be found
+                - the data format of the hydrofabric dataset, or ``None`` if a dataset could not be found
+                - the name of the geopackage file, if a ``NGEN_GEOPACKAGE_HYDROFABRIC_V2`` dataset, or ``None`` if
+                  either a dataset could not be found **OR** the dataset is ``NGEN_GEOJSON_HYDROFABRIC``
         """
         # TODO: (later) need a way to select (or prioritize) data format from the partitioning request
-        restrictions = [DiscreteRestriction(variable='data_id', values=[data_id]),
-                        DiscreteRestriction(variable='uid', values=[uid])]
+        restrictions = [DiscreteRestriction(variable=StandardDatasetIndex.DATA_ID, values=[data_id]),
+                        DiscreteRestriction(variable=StandardDatasetIndex.HYDROFABRIC_ID, values=[uid])]
+
+        domain = DataDomain(data_format=DataFormat.NGEN_GEOPACKAGE_HYDROFABRIC_V2, discrete_restrictions=restrictions)
+        data_request = DatasetManagementMessage(action=ManagementAction.SEARCH, category=DataCategory.HYDROFABRIC,
+                                                domain=domain)
+        response: DatasetManagementResponse = await self._data_client.async_make_request(data_request)
+
+        if response.success:
+            list_request = DatasetManagementMessage(action=ManagementAction.QUERY, dataset_name=response.dataset_name,
+                                                    query=DatasetQuery(query_type=QueryType.LIST_FILES))
+            list_response = await self._data_client.async_make_request(list_request)
+            gpkg_name = None
+            if list_response.success and list_response.query_results:
+                for item in list_response.query_results.get(QueryType.LIST_FILES.name, []):
+                    if item[-5:].lower() == ".gpkg":
+                        gpkg_name = item
+                        break
+            return response.dataset_name, DataFormat.NGEN_GEOPACKAGE_HYDROFABRIC_V2, gpkg_name
+
+        # Otherwise, try geojson too, just to be safe
         domain = DataDomain(data_format=DataFormat.NGEN_GEOJSON_HYDROFABRIC, discrete_restrictions=restrictions)
         data_request = DatasetManagementMessage(action=ManagementAction.SEARCH, category=DataCategory.HYDROFABRIC,
                                                 domain=domain)
         response: DatasetManagementResponse = await self._data_client.async_make_request(data_request)
-        return response.dataset_name if response.success else None
+
+        if response.success:
+            return response.dataset_name, DataFormat.NGEN_GEOJSON_HYDROFABRIC, None
+        else:
+            return None, None, None
 
     async def _async_process_request(self, request: PartitionRequest) -> PartitionResponse:
         """
@@ -153,15 +182,19 @@ async def _async_process_request(self, request: PartitionRequest) -> PartitionRe
         """
 
         try:
-            hydrofabric_dataset_name = await self._async_find_hydrofabric_dataset_name(request.hydrofabric_data_id,
-                                                                                       request.hydrofabric_uid)
+            hydrofabric_dataset_name, hf_data_format, file_name = await self._async_find_hydrofabric_dataset_name(
+                request.hydrofabric_data_id, request.hydrofabric_uid)
             if not isinstance(hydrofabric_dataset_name, str):
                 return PartitionResponse(success=False, reason='Could Not Find Hydrofabric Dataset')
 
             # TODO: (later) perhaps look at examining these and adapting to what's in the dataset (or request); for now,
             #  just use whatever the defaults are used when "None" is passed in
-            catchment_file_name = None
-            nexus_file_name = None
+            if hf_data_format == DataFormat.NGEN_GEOPACKAGE_HYDROFABRIC_V2 and file_name:
+                catchment_file_name = file_name
+                nexus_file_name = file_name
+            else:
+                catchment_file_name = None
+                nexus_file_name = None
 
             # Create a new dataset that is empty for the partitioning config
             partition_dataset_name, partition_dataset_data_id = await self._async_create_new_partitioning_dataset()
@@ -361,8 +394,8 @@ async def _generate_partition_config_dataset(self, job: Job) -> bool:
         hy_data_id, hy_uid = self._find_required_hydrofabric_details(job)
         if hy_data_id is None or hy_uid is None:
             raise DmodRuntimeError(err_msg.format("Cannot get hydrofabric dataset details"))
-        hydrofabric_dataset_name = await self._async_find_hydrofabric_dataset_name(hy_data_id, hy_uid)
-        if hydrofabric_dataset_name is None:
+        hydrofabric_ds_name, hf_format, gpkg_file = await self._async_find_hydrofabric_dataset_name(hy_data_id, hy_uid)
+        if hydrofabric_ds_name is None:
             raise DmodRuntimeError(err_msg.format("Cannot find hydrofabric dataset name"))
 
         # Create a new partitioning dataset, and get back the name and data_id
@@ -371,9 +404,12 @@ async def _generate_partition_config_dataset(self, job: Job) -> bool:
             raise DmodRuntimeError(err_msg.format("Cannot create new partition config dataset"))
 
         # Run the partitioning execution container
+        # TODO: (later) doesn't account for whether we are dealing with gpkg or geojson
        result, logs = self._execute_partitioner_container(num_partitions=job.cpu_count,
-                                                           hydrofabric_dataset_name=hydrofabric_dataset_name,
-                                                           partition_dataset_name=part_dataset_name)
+                                                           hydrofabric_dataset_name=hydrofabric_ds_name,
+                                                           partition_dataset_name=part_dataset_name,
+                                                           catchment_file_name=gpkg_file,
+                                                           nexus_file_name=gpkg_file)
         if result:
             logging.info("Partition config dataset generation for {} was successful".format(job.job_id))
             # If good, save the partition dataset data_id as a data requirement for the job.
@@ -450,32 +486,36 @@ async def manage_job_partitioning(self):
             while not self._job_util.lock_active_jobs(lock_id):
                 await asyncio.sleep(2)
 
-            for job in [j for j in self._job_util.get_all_active_jobs() if
-                        j.status_step == JobExecStep.AWAITING_PARTITIONING]:
+            for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_PARTITIONING]:
+                partition_requirements = [r for r in job.data_requirements if
+                                          r.domain.data_format == DataFormat.NGEN_PARTITION_CONFIG]
+                assert len(partition_requirements) <= 1
                 if job.cpu_count == 1:
-                    logging.warning("No need to partition job {} with only 1 CPU allocated".format(job.job_id))
-                    job.status_step = JobExecStep.AWAITING_ALLOCATION
-                    continue
+                    logging.info("No need to partition job {} with only 1 CPU allocated".format(job.job_id))
+                    job.set_status_step(JobExecStep.AWAITING_ALLOCATION)
+                elif len(partition_requirements) == 1 and partition_requirements[0].fulfilled_by:
+                    logging.info(f"No need to partition job {job.job_id} with partition config already fulfilled by "
+                                 f"dataset {partition_requirements[0].fulfilled_by}")
+                    job.set_status_step(JobExecStep.AWAITING_ALLOCATION)
+                else:
+                    logging.info("Processing partitioning for active job {}".format(job.job_id))
+                    try:
+                        # TODO: test the actual partitioning process also
+                        if await self._generate_partition_config_dataset(job):
+                            job.set_status_step(JobExecStep.AWAITING_ALLOCATION)
+                        else:
+                            job.set_status_step(JobExecStep.PARTITIONING_FAILED)
+                    except Exception as e:
+                        logging.error(f"Partition generation for {job.job_id} failed ({e.__class__.__name__}) - {e!s}")
+                        job.set_status_step(JobExecStep.PARTITIONING_FAILED)
 
-                logging.info("Processing partitioning for active job {}".format(job.job_id))
-                try:
-                    # See if there is already an existing dataset to use for this
-                    part_dataset_search_result = await self._find_partition_dataset(job)
-                    # If either one was found, or we can create a new partition config dataset, move to allocations
-                    if part_dataset_search_result.success or (await self._generate_partition_config_dataset(job)):
-                        job.status_step = JobExecStep.AWAITING_ALLOCATION
-                    else:
-                        job.status_step = JobExecStep.PARTITIONING_FAILED
-                except Exception as e:
-                    logging.error("Partition dataset generation for {} failed due to error - {}".format(job.job_id, e))
-                    job.status_step = JobExecStep.PARTITIONING_FAILED
                 # Protect service task against problems with an individual save attempt
                 try:
                     self._job_util.save_job(job)
-                except:
-                    # TODO: (later) logging would be good, and perhaps maybe retries
-                    pass
+                except Exception as e:
+                    logging.error(f"Partition service actions were successful for job {job.job_id}, but service could "
+                                  f"not save updated job state due to {e.__class__.__name__}: {e!s}")
 
             self._job_util.unlock_active_jobs(lock_id)
             await asyncio.sleep(5)

From 8744c638d108e877ac01f9403b65908ad1762e61 Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Wed, 22 May 2024 16:24:17 -0400
Subject: [PATCH 11/14] Fix scheduler handling of job info request.

---
 .../schedulerservice/dmod/schedulerservice/service.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/services/schedulerservice/dmod/schedulerservice/service.py b/python/services/schedulerservice/dmod/schedulerservice/service.py
index a19bf5018..9286dd363 100644
--- a/python/services/schedulerservice/dmod/schedulerservice/service.py
+++ b/python/services/schedulerservice/dmod/schedulerservice/service.py
@@ -158,12 +158,15 @@ async def _handle_job_info_request(self, message: JobInfoRequest, websocket: Web
         try:
             # Get current persisted copy of Job object
             job = self._job_manager.retrieve_job(message.job_id)
+            if message.status_only:
+                job_state = job.status.to_dict()
+            else:
+                job_state = job.to_dict()
             response = JobInfoResponse(success=True, reason="Job Details Retrieved", job_id=message.job_id,
-                                       status_only=message.status_only,
-                                       data=job.status.to_dict() if message.status_only else job.to_dict())
+                                       status_only=message.status_only, job_state=job_state)
         except Exception as e:
-            response = JobListResponse(success=False, reason=f"Encountered {e.__class__.__name__}",
-                                       status_only=message.status_only,
+            response = JobInfoResponse(success=False, reason=f"Encountered {e.__class__.__name__}",
+                                       job_id=message.job_id, status_only=message.status_only,
                                        message=f"Error when attempting to get job state details: {e!s}")
         await websocket.send(str(response))

From 0ebac5e1835f43d507a60c82b000c99a177d57f4 Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Wed, 22 May 2024 16:25:46 -0400
Subject: [PATCH 12/14] Updating versions and internal dependencies.
--- python/lib/client/dmod/client/_version.py | 2 +- python/lib/client/setup.py | 2 +- python/lib/communication/dmod/communication/_version.py | 2 +- python/services/requestservice/dmod/requestservice/_version.py | 2 +- python/services/requestservice/setup.py | 2 +- python/services/schedulerservice/setup.py | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/lib/client/dmod/client/_version.py b/python/lib/client/dmod/client/_version.py index ef72cc0f1..4ca39e7ce 100644 --- a/python/lib/client/dmod/client/_version.py +++ b/python/lib/client/dmod/client/_version.py @@ -1 +1 @@ -__version__ = '0.8.1' +__version__ = '0.8.2' diff --git a/python/lib/client/setup.py b/python/lib/client/setup.py index 81ccdd710..90f8ff6f4 100644 --- a/python/lib/client/setup.py +++ b/python/lib/client/setup.py @@ -22,7 +22,7 @@ license='', include_package_data=True, #install_requires=['websockets', 'jsonschema'],vi - install_requires=['dmod-core>=0.16.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.19.0', + install_requires=['dmod-core>=0.16.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.19.1', 'dmod-externalrequests>=0.6.0', 'dmod-modeldata>=0.12.0'], packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test']) ) diff --git a/python/lib/communication/dmod/communication/_version.py b/python/lib/communication/dmod/communication/_version.py index 482e4a19c..db7a41602 100644 --- a/python/lib/communication/dmod/communication/_version.py +++ b/python/lib/communication/dmod/communication/_version.py @@ -1 +1 @@ -__version__ = '0.19.0' +__version__ = '0.19.1' diff --git a/python/services/requestservice/dmod/requestservice/_version.py b/python/services/requestservice/dmod/requestservice/_version.py index 1658609d0..102b47c9c 100644 --- a/python/services/requestservice/dmod/requestservice/_version.py +++ b/python/services/requestservice/dmod/requestservice/_version.py @@ -1 +1 @@ -__version__ = '0.9.0' \ No newline at end of file +__version__ = '0.9.1' \ No newline at end of file diff --git a/python/services/requestservice/setup.py b/python/services/requestservice/setup.py index ade896764..6eb45160e 100644 --- a/python/services/requestservice/setup.py +++ b/python/services/requestservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['websockets', 'dmod-core>=0.16.0', 'dmod-communication>=0.19.0', 'dmod-access>=0.2.0', + install_requires=['websockets', 'dmod-core>=0.16.0', 'dmod-communication>=0.19.1', 'dmod-access>=0.2.0', 'dmod-externalrequests>=0.6.0'], packages=find_namespace_packages(exclude=['dmod.test', 'schemas', 'ssl', 'src']) ) diff --git a/python/services/schedulerservice/setup.py b/python/services/schedulerservice/setup.py index 407a0a5df..9cb9eb60a 100644 --- a/python/services/schedulerservice/setup.py +++ b/python/services/schedulerservice/setup.py @@ -17,6 +17,6 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.19.0', 'dmod-scheduler>=0.12.2'], + install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.19.1', 'dmod-scheduler>=0.12.2'], packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src']) ) From ee66c8501a45af30fcf6f295efed10449cf4d43f Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 28 May 2024 14:16:15 -0400 Subject: [PATCH 13/14] Use generator comprehension, and fix formatting. 
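
A generator comprehension avoids materializing the full filtered job list
before iterating, and lets the overlong line wrap cleanly. Roughly (a
simplified illustration, not the service code itself):

    active = [j for j in get_all_jobs() if is_awaiting_partitioning(j)]  # builds the whole list first
    active = (j for j in get_all_jobs() if is_awaiting_partitioning(j))  # yields jobs lazily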
--- .../partitionerservice/dmod/partitionerservice/service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/services/partitionerservice/dmod/partitionerservice/service.py b/python/services/partitionerservice/dmod/partitionerservice/service.py index 32856a2e2..f4396d039 100644 --- a/python/services/partitionerservice/dmod/partitionerservice/service.py +++ b/python/services/partitionerservice/dmod/partitionerservice/service.py @@ -486,7 +486,8 @@ async def manage_job_partitioning(self): while not self._job_util.lock_active_jobs(lock_id): await asyncio.sleep(2) - for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_PARTITIONING]: + for job in (j for j in self._job_util.get_all_active_jobs() if + j.status_step == JobExecStep.AWAITING_PARTITIONING): partition_requirements = [r for r in job.data_requirements if r.domain.data_format == DataFormat.NGEN_PARTITION_CONFIG] assert len(partition_requirements) <= 1 From 4ec3242da21a5063154584550466cd919b433cec Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 29 May 2024 09:15:14 -0400 Subject: [PATCH 14/14] Use "raise from" form in get_dataset_state. --- python/lib/client/dmod/client/request_clients.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/client/dmod/client/request_clients.py b/python/lib/client/dmod/client/request_clients.py index ffc051fab..213f86a93 100644 --- a/python/lib/client/dmod/client/request_clients.py +++ b/python/lib/client/dmod/client/request_clients.py @@ -792,7 +792,7 @@ async def get_dataset_state(self, dataset_name: str, **kwargs) -> DatasetManagem try: return await self._process_request(request=request) except DmodRuntimeError as e: - raise DmodRuntimeError(f"DMOD error when getting dataset state: {str(e)}") + raise DmodRuntimeError(f"DMOD error when getting dataset state: {str(e)}") from e async def list_datasets(self, category: Optional[DataCategory] = None, **kwargs) -> List[str]: """