From 12a33c6015fe0220a3d1c5940eb0b3cc5b0e4d13 Mon Sep 17 00:00:00 2001 From: mjanez <96422458+mjanez@users.noreply.github.com> Date: Tue, 12 Sep 2023 14:33:10 +0200 Subject: [PATCH] Improve base harvester - Include ows functions common to all harvesters using owslib (csw, xml). - Add auxiliary functions such as get_default_dcat_info_attribute to avoid repetition. --- ogc2ckan/harvesters/base.py | 284 ++++++++++++++++++++++++++++++++---- 1 file changed, 255 insertions(+), 29 deletions(-) diff --git a/ogc2ckan/harvesters/base.py b/ogc2ckan/harvesters/base.py index 04ec790..1bc11a2 100644 --- a/ogc2ckan/harvesters/base.py +++ b/ogc2ckan/harvesters/base.py @@ -7,24 +7,27 @@ import logging from datetime import datetime import html +import os # third-party libraries from geojson import Polygon, dumps from pyproj import Transformer import pandas as pd -import re from bs4 import BeautifulSoup +from owslib.iso import MD_Keywords +from owslib.namespaces import Namespaces +from owslib import util # custom classes from controller import ckan_management from model.custom_organization import CustomOrganization from controller.mapping import get_mapping_value from config.ogc2ckan_config import load_yaml, get_log_module -from mappings.default_ogc2ckan_config import OGC2CKAN_PATHS_CONFIG, OGC2CKAN_HARVESTER_MD_CONFIG, OGC2CKAN_CKANINFO_CONFIG, OGC2CKAN_MD_FORMATS +from mappings.default_ogc2ckan_config import OGC2CKAN_PATHS_CONFIG, OGC2CKAN_HARVESTER_MD_CONFIG, OGC2CKAN_CKANINFO_CONFIG, OGC2CKAN_MD_FORMATS, OGC2CKAN_ISO_MD_ELEMENTS from harvesters.harvesters import get_harvester_class -log_module = get_log_module() +log_module = get_log_module(os.path.abspath(__file__)) class DCATInfo: '''Represents the information of a dataset in DCAT format. @@ -107,8 +110,9 @@ def __init__(self, app_dir, url, name, groups, active, organization, type, custo default_localized_strings_file = f"{self.app_dir}/{OGC2CKAN_PATHS_CONFIG['default_mappings_folder']}/{OGC2CKAN_PATHS_CONFIG['default_localized_strings_file']}" # Localized default info yaml_dict = load_yaml(default_localized_strings_file) - language = self.default_dcat_info.language + language = self.get_default_dcat_info_attribute("language") self.localized_strings_dict = self._get_localized_dict(yaml_dict, language) + self.ows_namespaces = self._ows_get_namespaces() @classmethod def from_harvest_server(cls, harvest_server, app_dir): @@ -151,16 +155,6 @@ def create_datasets(self, ckan_info): # Get all datasets self.get_datasets(ckan_info) - #TODO: Check if the dataset exists in CKAN use: ckan_management.get_ckan_datasets_list - # if hasattr(self, 'constraints') and self.constraints: - # emails = set(email.lower().replace(' ','') for email in (self.constraints.get('mails') or []) if email) - # else: - # emails = [] - - # if len(emails) > 0: - # datasets_title = [x.title for x in self.datasets if x.contact_email.lower().replace(' ','') in emails] - # logging.info(f"{self.name} ({self.type.upper()}) server records found: {', '.join(datasets_title)}") - if hasattr(self, 'workspaces') and self.workspaces: logging.info(f"{log_module}:{self.name} ({self.type.upper()}) server OGC workspaces selected: {', '.join([w.upper() for w in self.workspaces])}") @@ -225,17 +219,24 @@ def get_custom_default_metadata(self, dataset_id: str, dict_property: str = 'dat Returns: Any: The value of the specified property in the mapping dictionary, or None if the property is not found. """ - mapping = self.custom_organization_info.find_mapping_value(dataset_id, dict_property) + try: + mapping = self.custom_organization_info.find_mapping_value(dataset_id, dict_property) + except KeyError: + mapping = None if mapping is None: mapping = self.custom_organization_info.find_similar_mapping_value(dataset_id, dict_property) if mapping is None: # If 'dataset_id' property is not found, try 'dataset_group_id' - mapping = self.custom_organization_info.find_mapping_value( - dataset_id, - 'dataset_group_id' - ) + try: + mapping = self.custom_organization_info.find_mapping_value( + dataset_id, + 'dataset_group_id' + ) + except KeyError: + logging.error(f"{log_module}:Dataset: '{dataset_id}' does not have info in 'custom_organization_mapping_file'. Add it or deactivate `custom_organization_active`") + mapping = None return mapping @@ -267,6 +268,225 @@ def get_custom_metadata_value(self, custom_metadata, key, default=None): else: return default + def get_default_dcat_info_attribute(self, field_name): + """ + Tries to retrieve the value of a field_name from self.default_dcat_info. If the field does not exist, returns the default value from OGC2CKAN_HARVESTER_MD_CONFIG. + + Args: + field_name (str): The name of the field to retrieve. + + Returns: + The value of the specified field from self.default_dcat_info, or the default value from OGC2CKAN_HARVESTER_MD_CONFIG if the field does not exist. + """ + try: + return getattr(self.default_dcat_info, field_name, OGC2CKAN_HARVESTER_MD_CONFIG[field_name]) + + except AttributeError: + logging.error(f"{log_module}:Field '{field_name}' does not exist in 'default_dcat_info' section at 'config.yaml'") + return None + + except KeyError as e: + logging.error(f"{log_module}:Field '{field_name}' does not exist in 'OGC2CKAN_HARVESTER_MD_CONFIG' at: 'ckan-ogc/ogc2ckan/mappings/default_ogc2ckan_config.py': {e}") + return None + + # OWS functions + def ows_update_metadata_sections(self, layer_info): + def get_first_element_from_list(lst): + return lst[0] if isinstance(lst, list) and lst else None + + layer_info.identification = get_first_element_from_list(layer_info.identification) if layer_info.identification else None + layer_info.distributor = get_first_element_from_list(layer_info.distribution.distributor) if layer_info.distribution and layer_info.distribution.distributor else None + layer_info.distribution = layer_info.distribution.online if hasattr(layer_info.distribution, 'online') else None + layer_info.contact = get_first_element_from_list(layer_info.contact) if layer_info.contact else None + layer_info.identification.publisher = get_first_element_from_list(layer_info.identification.publisher) if layer_info.identification and layer_info.identification.publisher else None + layer_info.topiccategory = get_first_element_from_list(layer_info.identification.topiccategory) if layer_info.identification and layer_info.identification.topiccategory else None + layer_info.uricode = get_first_element_from_list(layer_info.identification.uricode) if layer_info.identification and layer_info.identification.uricode else None + + try: + if not layer_info.identification: + raise AttributeError("identification") + if not layer_info.distribution: + raise AttributeError("distribution") + if not layer_info.contact: + raise AttributeError("contact") + if not layer_info.topiccategory: + raise AttributeError("topiccategory") + if not layer_info.uricode: + raise AttributeError("uricode") + + except AttributeError as e: + logging.error(f"{log_module}:An error occurred in ows_update_metadata_sections: {e}") + setattr(layer_info, e.args[0], None) + + def ows_get_metadata_not_owslib(self, layer_info): + """Gets metadata values that are not retrieved by OWSLib from an MD_Metadata object. + + Args: + layer_info (object): Object containing metadata information. + + Returns: + dict: Dictionary containing metadata values. + """ + return { + "lineage_source": self._ows_findall_metadata_elements(layer_info, self.ows_namespaces, OGC2CKAN_ISO_MD_ELEMENTS['lineage_source']) + } + + def ows_get_keywords(self, dataset, keywords): + """ + Gets the keywords from a OWS MD_Metadata record. + + Args: + dataset: The CKAN Dataset to ingest. + keywords: The OWS MD_Metadata Record object list of keywords to get. + + Returns: + list: The keywords. + """ + keywords_list = dataset.keywords + keywords_thesaurus_list = dataset.keywords_thesaurus + themes_set = set(dataset.theme) + keywords_uri_set = set(dataset.keywords_uri) + topic = dataset.topic + + for keyword in keywords: + for k in keyword.keywords: + url = getattr(k, "url", None) + if url: + last_part = url.split("/")[-1] + keywords_list.append({'name': last_part.lower()}) + keywords_uri_set.add(url) + if '/theme/' in url: + # INSPIRE Theme + themes_set.add(url) + + if keyword.thesaurus: + url = keyword.thesaurus.get('url') + title = keyword.thesaurus.get('title') + date = keyword.thesaurus.get('date') + datetype = keyword.thesaurus.get('datetype') + if url: + keywords_thesaurus_list.append({'title': title, 'date': date, 'datetype': datetype, 'url': url}) + + if topic: + themes_set.add(get_mapping_value(topic, 'theme', 'id', 'topic_category')) + + # Unique values + keywords_list = [dict(t) for t in {tuple(d.items()) for d in keywords_list}] + + # Set themes/keywords/keywords_uri + self._set_themes(dataset, list(themes_set)) + dataset.set_keywords(keywords_list) + self._set_keywords_uri(dataset, list(keywords_uri_set)) + + def ows_set_metadata_dates(self, dataset, record_id): + """ + Sets the metadata dates for a CKAN dataset from an ISO metadata record. + + Args: + dataset: The CKAN dataset to set the metadata dates for. + record_id: The ISO metadata record to get the metadata dates from. + """ + # Default values + issued_date = datetime.now().strftime('%Y-%m-%d') + created_date = '1900-01-01' + modified_date = issued_date + + for date in record_id.date: + if date.type == "creation": + created_date = self._normalize_date(date.date) + elif date.type == "publication": + issued_date = self._normalize_date(date.date) + elif date.type == "revision": + modified_date = self._normalize_date(date.date) + + dataset.set_issued(issued_date) + dataset.set_created(created_date) + dataset.set_modified(modified_date) + + return issued_date, modified_date + + @staticmethod + def _ows_get_namespaces(): + n = Namespaces() + ns = n.get_namespaces(["gco", "gfc", "gmd", "gmi", "gml", "gml32", "gmx", "gts", "srv", "xlink"]) + ns[None] = n.get_namespace("gmd") + + return ns + + @staticmethod + def _ows_findall_metadata_elements(layer_info, namespaces, tag): + """ + Finds all elements in an ISO metadata record (md). + + Args: + layer_info: The MD_Metadata object that contains ISO metadata record to search in. + namespaces: The namespaces to use for the search. + tag: The tag of the element to search for. + + Returns: + The elements if founds, otherwise None. + """ + results = [] + val = layer_info.md.findall(util.nspath_eval(tag, namespaces)) + + for i in val: + if hasattr(i, 'text'): + i = util.testXMLValue(i) + results.append(i) + + return results + + @staticmethod + def _ows_find_metadata_element(layer_info, namespaces, tag): + """ + Finds element in an ISO metadata record (md). + + Args: + layer_info: The MD_Metadata object that contains ISO metadata record to search in. + namespaces: The namespaces to use for the search. + tag: The tag of the element to search for. + + Returns: + The element if found, otherwise None. + """ + val = layer_info.md.find(util.nspath_eval(tag, namespaces)) + if hasattr(val, 'text'): + val = util.testXMLValue(val.text) + return val + + @staticmethod + def _ows_convert_keyword(keywords, iso2dict=False, theme="theme"): + """ + Convert keywords to a standardized format. + + Args: + keywords (list): The list of keywords to convert. + iso2dict (bool, optional): Whether to convert ISO keywords to a dictionary format. Default is False. + theme (str, optional): The theme/category for the keywords. Default is "theme". + + Returns: + list or dict: The converted keywords in a standardized format. If `iso2dict` is True, returns a list of dictionaries. + Otherwise, returns a list of lists. + """ + def convert_iso_keywords(keywords): + _keywords = [] + for kw in keywords: + if isinstance(kw, MD_Keywords): + _keywords.append([_kw.name for _kw in kw.keywords]) + else: + _keywords.append(kw) + return _keywords + + if not iso2dict and keywords: + return [ + { + "keywords": convert_iso_keywords(keywords), + "thesaurus": {"date": None, "datetype": None, "title": None}, + "type": theme, + } + ] + return convert_iso_keywords(keywords) + @staticmethod def _create_harvester_from_server(harvest_server, harvester_class): harvester = harvester_class( @@ -621,11 +841,11 @@ def set_default_responsible_parties(self, dataset, default_dcat_info, ckan_info, for metadata_field, dcat_attribute in metadata_fields.items(): value = source_dataset.get(metadata_field) or getattr(default_dcat_info, dcat_attribute, None) if metadata_field == "contact_uri" and not value: - value = self._get_dir3_uri(ckan_info.dir3_soup, getattr(default_dcat_info, "contact_uri", None), source_dataset.get("contact_name")) + value = self._get_dir3_uri(ckan_info.dir3_soup, self.get_default_dcat_info_attribute("contact_uri"), source_dataset.get("contact_name")) elif metadata_field == "publisher_identifier" and not value: - value = self._get_dir3_uri(ckan_info.dir3_soup, getattr(default_dcat_info, "contact_uri", None), source_dataset.get("publisher_name")) + value = self._get_dir3_uri(ckan_info.dir3_soup, self.get_default_dcat_info_attribute("contact_uri"), source_dataset.get("publisher_name")) elif metadata_field == "maintainer_uri" and not value: - value = self._get_dir3_uri(ckan_info.dir3_soup, getattr(default_dcat_info, "contact_uri", None), source_dataset.get("maintainer_name")) + value = self._get_dir3_uri(ckan_info.dir3_soup, self.get_default_dcat_info_attribute("contact_uri"), source_dataset.get("maintainer_name")) elif metadata_field == "publisher_type" and value: value = value.replace('https:', 'http:') getattr(dataset, f"set_{metadata_field}")(value) @@ -690,12 +910,13 @@ def set_bounding_box_from_bounding_box(self, dataset, bounding_box): self._set_min_max_coordinates(dataset, minx, maxx, miny, maxy) - def set_keywords_themes_topic(self, dataset, custom_metadata): + def set_default_keywords_themes_topic(self, dataset, custom_metadata, ckan_schema = 'geodcatap'): """Sets the keywords for a dataset. INSPIRE keywords/themes, default/custom keywords, ISO 19115 Topic category and Spanish NTI-RISP Theme. Args: dataset: The dataset to set the keywords for. custom_metadata: A dictionary containing custom metadata for the dataset. + ckan_schema: The CKAN schema (ckanext-scheming) dataset type. Returns: None. @@ -729,7 +950,7 @@ def set_keywords_themes_topic(self, dataset, custom_metadata): keywords_uri.add(k['uri']) # Set keywords (INSPIRE quality) and INSPIRE Themes - inspireid_theme = self.default_inspire_info['inspireid_theme'].lower() + inspireid_theme = self.get_default_dcat_info_attribute("inspireid_theme").lower() theme_inspire = "http://inspire.ec.europa.eu/theme/" + inspireid_theme # Insert inspireid_theme (default) as theme/keyword @@ -739,14 +960,19 @@ def set_keywords_themes_topic(self, dataset, custom_metadata): # Set ISO 19115 Topic Category ## Insert topic (default) as topic - default_topic = self.default_dcat_info.topic + default_topic = self.get_default_dcat_info_attribute("topic") dataset.set_topic(default_topic) - # Insert theme_es (default) as theme - themes_es.append(self.default_dcat_info.theme_es) - themes_eu.append(self.default_dcat_info.theme_eu) + # Insert theme_eu (default) + theme_eu = self.get_default_dcat_info_attribute("theme_eu") + themes_eu.append(theme_eu) + + # Insert theme_es if ckan_schema == 'geodcatap_es' + if ckan_schema == 'geodcatap_es': + theme_es = self.get_default_dcat_info_attribute("theme_es") + themes_es.append(theme_es) + self._set_themes_es(dataset, list(set(themes_es))) - self._set_themes_es(dataset, list(set(themes_es))) self._set_themes_eu(dataset, list(set(themes_eu))) self._set_themes(dataset, list(set(themes))) dataset.set_keywords(keywords)