Skip to content

Commit

Permalink
Improve base harvester
Browse files Browse the repository at this point in the history
- Include ows functions common to all harvesters using owslib (csw, xml).
- Add auxiliary functions such as get_default_dcat_info_attribute to avoid repetition.
  • Loading branch information
mjanez committed Sep 12, 2023
1 parent 2ed438e commit 12a33c6
Showing 1 changed file with 255 additions and 29 deletions.
284 changes: 255 additions & 29 deletions ogc2ckan/harvesters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,27 @@
import logging
from datetime import datetime
import html
import os

# third-party libraries
from geojson import Polygon, dumps
from pyproj import Transformer
import pandas as pd
import re
from bs4 import BeautifulSoup
from owslib.iso import MD_Keywords
from owslib.namespaces import Namespaces
from owslib import util

# custom classes
from controller import ckan_management
from model.custom_organization import CustomOrganization
from controller.mapping import get_mapping_value
from config.ogc2ckan_config import load_yaml, get_log_module
from mappings.default_ogc2ckan_config import OGC2CKAN_PATHS_CONFIG, OGC2CKAN_HARVESTER_MD_CONFIG, OGC2CKAN_CKANINFO_CONFIG, OGC2CKAN_MD_FORMATS
from mappings.default_ogc2ckan_config import OGC2CKAN_PATHS_CONFIG, OGC2CKAN_HARVESTER_MD_CONFIG, OGC2CKAN_CKANINFO_CONFIG, OGC2CKAN_MD_FORMATS, OGC2CKAN_ISO_MD_ELEMENTS
from harvesters.harvesters import get_harvester_class


log_module = get_log_module()
log_module = get_log_module(os.path.abspath(__file__))

class DCATInfo:
'''Represents the information of a dataset in DCAT format.
Expand Down Expand Up @@ -107,8 +110,9 @@ def __init__(self, app_dir, url, name, groups, active, organization, type, custo
default_localized_strings_file = f"{self.app_dir}/{OGC2CKAN_PATHS_CONFIG['default_mappings_folder']}/{OGC2CKAN_PATHS_CONFIG['default_localized_strings_file']}"
# Localized default info
yaml_dict = load_yaml(default_localized_strings_file)
language = self.default_dcat_info.language
language = self.get_default_dcat_info_attribute("language")
self.localized_strings_dict = self._get_localized_dict(yaml_dict, language)
self.ows_namespaces = self._ows_get_namespaces()

@classmethod
def from_harvest_server(cls, harvest_server, app_dir):
Expand Down Expand Up @@ -151,16 +155,6 @@ def create_datasets(self, ckan_info):
# Get all datasets
self.get_datasets(ckan_info)

#TODO: Check if the dataset exists in CKAN use: ckan_management.get_ckan_datasets_list
# if hasattr(self, 'constraints') and self.constraints:
# emails = set(email.lower().replace(' ','') for email in (self.constraints.get('mails') or []) if email)
# else:
# emails = []

# if len(emails) > 0:
# datasets_title = [x.title for x in self.datasets if x.contact_email.lower().replace(' ','') in emails]
# logging.info(f"{self.name} ({self.type.upper()}) server records found: {', '.join(datasets_title)}")

if hasattr(self, 'workspaces') and self.workspaces:
logging.info(f"{log_module}:{self.name} ({self.type.upper()}) server OGC workspaces selected: {', '.join([w.upper() for w in self.workspaces])}")

Expand Down Expand Up @@ -225,17 +219,24 @@ def get_custom_default_metadata(self, dataset_id: str, dict_property: str = 'dat
Returns:
Any: The value of the specified property in the mapping dictionary, or None if the property is not found.
"""
mapping = self.custom_organization_info.find_mapping_value(dataset_id, dict_property)
try:
mapping = self.custom_organization_info.find_mapping_value(dataset_id, dict_property)
except KeyError:
mapping = None

if mapping is None:
mapping = self.custom_organization_info.find_similar_mapping_value(dataset_id, dict_property)

if mapping is None:
# If 'dataset_id' property is not found, try 'dataset_group_id'
mapping = self.custom_organization_info.find_mapping_value(
dataset_id,
'dataset_group_id'
)
try:
mapping = self.custom_organization_info.find_mapping_value(
dataset_id,
'dataset_group_id'
)
except KeyError:
logging.error(f"{log_module}:Dataset: '{dataset_id}' does not have info in 'custom_organization_mapping_file'. Add it or deactivate `custom_organization_active`")
mapping = None

return mapping

Expand Down Expand Up @@ -267,6 +268,225 @@ def get_custom_metadata_value(self, custom_metadata, key, default=None):
else:
return default

def get_default_dcat_info_attribute(self, field_name):
"""
Tries to retrieve the value of a field_name from self.default_dcat_info. If the field does not exist, returns the default value from OGC2CKAN_HARVESTER_MD_CONFIG.
Args:
field_name (str): The name of the field to retrieve.
Returns:
The value of the specified field from self.default_dcat_info, or the default value from OGC2CKAN_HARVESTER_MD_CONFIG if the field does not exist.
"""
try:
return getattr(self.default_dcat_info, field_name, OGC2CKAN_HARVESTER_MD_CONFIG[field_name])

except AttributeError:
logging.error(f"{log_module}:Field '{field_name}' does not exist in 'default_dcat_info' section at 'config.yaml'")
return None

except KeyError as e:
logging.error(f"{log_module}:Field '{field_name}' does not exist in 'OGC2CKAN_HARVESTER_MD_CONFIG' at: 'ckan-ogc/ogc2ckan/mappings/default_ogc2ckan_config.py': {e}")
return None

# OWS functions
def ows_update_metadata_sections(self, layer_info):
def get_first_element_from_list(lst):
return lst[0] if isinstance(lst, list) and lst else None

layer_info.identification = get_first_element_from_list(layer_info.identification) if layer_info.identification else None
layer_info.distributor = get_first_element_from_list(layer_info.distribution.distributor) if layer_info.distribution and layer_info.distribution.distributor else None
layer_info.distribution = layer_info.distribution.online if hasattr(layer_info.distribution, 'online') else None
layer_info.contact = get_first_element_from_list(layer_info.contact) if layer_info.contact else None
layer_info.identification.publisher = get_first_element_from_list(layer_info.identification.publisher) if layer_info.identification and layer_info.identification.publisher else None
layer_info.topiccategory = get_first_element_from_list(layer_info.identification.topiccategory) if layer_info.identification and layer_info.identification.topiccategory else None
layer_info.uricode = get_first_element_from_list(layer_info.identification.uricode) if layer_info.identification and layer_info.identification.uricode else None

try:
if not layer_info.identification:
raise AttributeError("identification")
if not layer_info.distribution:
raise AttributeError("distribution")
if not layer_info.contact:
raise AttributeError("contact")
if not layer_info.topiccategory:
raise AttributeError("topiccategory")
if not layer_info.uricode:
raise AttributeError("uricode")

except AttributeError as e:
logging.error(f"{log_module}:An error occurred in ows_update_metadata_sections: {e}")
setattr(layer_info, e.args[0], None)

def ows_get_metadata_not_owslib(self, layer_info):
"""Gets metadata values that are not retrieved by OWSLib from an MD_Metadata object.
Args:
layer_info (object): Object containing metadata information.
Returns:
dict: Dictionary containing metadata values.
"""
return {
"lineage_source": self._ows_findall_metadata_elements(layer_info, self.ows_namespaces, OGC2CKAN_ISO_MD_ELEMENTS['lineage_source'])
}

def ows_get_keywords(self, dataset, keywords):
"""
Gets the keywords from a OWS MD_Metadata record.
Args:
dataset: The CKAN Dataset to ingest.
keywords: The OWS MD_Metadata Record object list of keywords to get.
Returns:
list: The keywords.
"""
keywords_list = dataset.keywords
keywords_thesaurus_list = dataset.keywords_thesaurus
themes_set = set(dataset.theme)
keywords_uri_set = set(dataset.keywords_uri)
topic = dataset.topic

for keyword in keywords:
for k in keyword.keywords:
url = getattr(k, "url", None)
if url:
last_part = url.split("/")[-1]
keywords_list.append({'name': last_part.lower()})
keywords_uri_set.add(url)
if '/theme/' in url:
# INSPIRE Theme
themes_set.add(url)

if keyword.thesaurus:
url = keyword.thesaurus.get('url')
title = keyword.thesaurus.get('title')
date = keyword.thesaurus.get('date')
datetype = keyword.thesaurus.get('datetype')
if url:
keywords_thesaurus_list.append({'title': title, 'date': date, 'datetype': datetype, 'url': url})

if topic:
themes_set.add(get_mapping_value(topic, 'theme', 'id', 'topic_category'))

# Unique values
keywords_list = [dict(t) for t in {tuple(d.items()) for d in keywords_list}]

# Set themes/keywords/keywords_uri
self._set_themes(dataset, list(themes_set))
dataset.set_keywords(keywords_list)
self._set_keywords_uri(dataset, list(keywords_uri_set))

def ows_set_metadata_dates(self, dataset, record_id):
"""
Sets the metadata dates for a CKAN dataset from an ISO metadata record.
Args:
dataset: The CKAN dataset to set the metadata dates for.
record_id: The ISO metadata record to get the metadata dates from.
"""
# Default values
issued_date = datetime.now().strftime('%Y-%m-%d')
created_date = '1900-01-01'
modified_date = issued_date

for date in record_id.date:
if date.type == "creation":
created_date = self._normalize_date(date.date)
elif date.type == "publication":
issued_date = self._normalize_date(date.date)
elif date.type == "revision":
modified_date = self._normalize_date(date.date)

dataset.set_issued(issued_date)
dataset.set_created(created_date)
dataset.set_modified(modified_date)

return issued_date, modified_date

@staticmethod
def _ows_get_namespaces():
n = Namespaces()
ns = n.get_namespaces(["gco", "gfc", "gmd", "gmi", "gml", "gml32", "gmx", "gts", "srv", "xlink"])
ns[None] = n.get_namespace("gmd")

return ns

@staticmethod
def _ows_findall_metadata_elements(layer_info, namespaces, tag):
"""
Finds all elements in an ISO metadata record (md).
Args:
layer_info: The MD_Metadata object that contains ISO metadata record to search in.
namespaces: The namespaces to use for the search.
tag: The tag of the element to search for.
Returns:
The elements if founds, otherwise None.
"""
results = []
val = layer_info.md.findall(util.nspath_eval(tag, namespaces))

for i in val:
if hasattr(i, 'text'):
i = util.testXMLValue(i)
results.append(i)

return results

@staticmethod
def _ows_find_metadata_element(layer_info, namespaces, tag):
"""
Finds element in an ISO metadata record (md).
Args:
layer_info: The MD_Metadata object that contains ISO metadata record to search in.
namespaces: The namespaces to use for the search.
tag: The tag of the element to search for.
Returns:
The element if found, otherwise None.
"""
val = layer_info.md.find(util.nspath_eval(tag, namespaces))
if hasattr(val, 'text'):
val = util.testXMLValue(val.text)
return val

@staticmethod
def _ows_convert_keyword(keywords, iso2dict=False, theme="theme"):
"""
Convert keywords to a standardized format.
Args:
keywords (list): The list of keywords to convert.
iso2dict (bool, optional): Whether to convert ISO keywords to a dictionary format. Default is False.
theme (str, optional): The theme/category for the keywords. Default is "theme".
Returns:
list or dict: The converted keywords in a standardized format. If `iso2dict` is True, returns a list of dictionaries.
Otherwise, returns a list of lists.
"""
def convert_iso_keywords(keywords):
_keywords = []
for kw in keywords:
if isinstance(kw, MD_Keywords):
_keywords.append([_kw.name for _kw in kw.keywords])
else:
_keywords.append(kw)
return _keywords

if not iso2dict and keywords:
return [
{
"keywords": convert_iso_keywords(keywords),
"thesaurus": {"date": None, "datetype": None, "title": None},
"type": theme,
}
]
return convert_iso_keywords(keywords)

@staticmethod
def _create_harvester_from_server(harvest_server, harvester_class):
harvester = harvester_class(
Expand Down Expand Up @@ -621,11 +841,11 @@ def set_default_responsible_parties(self, dataset, default_dcat_info, ckan_info,
for metadata_field, dcat_attribute in metadata_fields.items():
value = source_dataset.get(metadata_field) or getattr(default_dcat_info, dcat_attribute, None)
if metadata_field == "contact_uri" and not value:
value = self._get_dir3_uri(ckan_info.dir3_soup, getattr(default_dcat_info, "contact_uri", None), source_dataset.get("contact_name"))
value = self._get_dir3_uri(ckan_info.dir3_soup, self.get_default_dcat_info_attribute("contact_uri"), source_dataset.get("contact_name"))
elif metadata_field == "publisher_identifier" and not value:
value = self._get_dir3_uri(ckan_info.dir3_soup, getattr(default_dcat_info, "contact_uri", None), source_dataset.get("publisher_name"))
value = self._get_dir3_uri(ckan_info.dir3_soup, self.get_default_dcat_info_attribute("contact_uri"), source_dataset.get("publisher_name"))
elif metadata_field == "maintainer_uri" and not value:
value = self._get_dir3_uri(ckan_info.dir3_soup, getattr(default_dcat_info, "contact_uri", None), source_dataset.get("maintainer_name"))
value = self._get_dir3_uri(ckan_info.dir3_soup, self.get_default_dcat_info_attribute("contact_uri"), source_dataset.get("maintainer_name"))
elif metadata_field == "publisher_type" and value:
value = value.replace('https:', 'http:')
getattr(dataset, f"set_{metadata_field}")(value)
Expand Down Expand Up @@ -690,12 +910,13 @@ def set_bounding_box_from_bounding_box(self, dataset, bounding_box):

self._set_min_max_coordinates(dataset, minx, maxx, miny, maxy)

def set_keywords_themes_topic(self, dataset, custom_metadata):
def set_default_keywords_themes_topic(self, dataset, custom_metadata, ckan_schema = 'geodcatap'):
"""Sets the keywords for a dataset. INSPIRE keywords/themes, default/custom keywords, ISO 19115 Topic category and Spanish NTI-RISP Theme.
Args:
dataset: The dataset to set the keywords for.
custom_metadata: A dictionary containing custom metadata for the dataset.
ckan_schema: The CKAN schema (ckanext-scheming) dataset type.
Returns:
None.
Expand Down Expand Up @@ -729,7 +950,7 @@ def set_keywords_themes_topic(self, dataset, custom_metadata):
keywords_uri.add(k['uri'])

# Set keywords (INSPIRE quality) and INSPIRE Themes
inspireid_theme = self.default_inspire_info['inspireid_theme'].lower()
inspireid_theme = self.get_default_dcat_info_attribute("inspireid_theme").lower()
theme_inspire = "http://inspire.ec.europa.eu/theme/" + inspireid_theme

# Insert inspireid_theme (default) as theme/keyword
Expand All @@ -739,14 +960,19 @@ def set_keywords_themes_topic(self, dataset, custom_metadata):

# Set ISO 19115 Topic Category
## Insert topic (default) as topic
default_topic = self.default_dcat_info.topic
default_topic = self.get_default_dcat_info_attribute("topic")
dataset.set_topic(default_topic)

# Insert theme_es (default) as theme
themes_es.append(self.default_dcat_info.theme_es)
themes_eu.append(self.default_dcat_info.theme_eu)
# Insert theme_eu (default)
theme_eu = self.get_default_dcat_info_attribute("theme_eu")
themes_eu.append(theme_eu)

# Insert theme_es if ckan_schema == 'geodcatap_es'
if ckan_schema == 'geodcatap_es':
theme_es = self.get_default_dcat_info_attribute("theme_es")
themes_es.append(theme_es)
self._set_themes_es(dataset, list(set(themes_es)))

self._set_themes_es(dataset, list(set(themes_es)))
self._set_themes_eu(dataset, list(set(themes_eu)))
self._set_themes(dataset, list(set(themes)))
dataset.set_keywords(keywords)
Expand Down

0 comments on commit 12a33c6

Please sign in to comment.