Improve logs and docs #51

Merged 2 commits on Dec 19, 2024
8 changes: 7 additions & 1 deletion cdsobs/api.py
@@ -78,6 +78,9 @@ def run_ingestion_pipeline(
         Month to start reading the data. It only applies to the first year of the interval.
         Default is 1.
     """
+    logger.info("----------------------------------------------------------------")
+    logger.info("Running ingestion pipeline")
+    logger.info("----------------------------------------------------------------")
     service_definition = get_service_definition(config, dataset_name)

     def _run_for_batch(time_space_batch):
@@ -138,6 +141,9 @@ def run_make_cdm(
         make_production. If False, the data only will be loaded and checked for CDM
         compliance in memory.
     """
+    logger.info("----------------------------------------------------------------")
+    logger.info("Running make cdm")
+    logger.info("----------------------------------------------------------------")
     service_definition = get_service_definition(config, dataset_name)

     def _run_for_batch(time_batch):
@@ -152,7 +158,7 @@ def _run_for_batch(time_batch):
                 time_batch,
             )
         except EmptyBatchException:
-            logger.warning(f"Not found for {time_batch=}")
+            logger.warning(f"No data found for {time_batch=}")

     main_iterator = _get_main_iterator(
         config, dataset_name, source, start_year, end_year
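Note: the log messages in this file use Python's self-documenting f-string specifier ({time_batch=}, Python 3.8+), which prints the expression together with its value. A quick illustration with an invented value:

    time_batch = ("2020", "01")
    print(f"No data found for {time_batch=}")
    # prints: No data found for time_batch=('2020', '01')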
3 changes: 2 additions & 1 deletion cdsobs/cdm/api.py
@@ -1,6 +1,7 @@
 from dataclasses import dataclass
 from itertools import chain
 from pathlib import Path
+from pprint import pformat
 from typing import List, Optional

 import fsspec
@@ -76,7 +77,7 @@ def to_cdm_dataset(partition: DatasetPartition) -> CdmDataset:
     if len(removed_variables) > 0:
         logger.warning(
             "The following variables where read but are not in the CDM and "
-            f"are going to be dropped: {removed_variables}"
+            f"are going to be dropped: {pformat(removed_variables)}"
         )
     return CdmDataset(data, partition.partition_params, partition.dataset_metadata)
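Note: pprint.pformat is repr with wrapping: short collections stay on one line, longer ones are split one element per line at the default 80-character width, which keeps these warnings readable. A minimal sketch with invented variable names:

    from pprint import pformat

    removed_variables = [
        "air_temperature", "dew_point_temperature", "wind_speed",
        "wind_from_direction", "relative_humidity",
    ]
    # Long enough to exceed 80 characters, so pformat wraps one element per line.
    print(pformat(removed_variables))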
19 changes: 7 additions & 12 deletions cdsobs/cdm/check.py
@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from pprint import pformat
 from typing import Iterator, List

 import pandas
@@ -87,15 +88,17 @@ def check_table_cdm_compliance(
     """
     table_name = table_def.name
     table_field_mapping = get_cdm_table_mapping(cdm_tables, homogenised_data, table_def)
+    fields_found = (
+        table_field_mapping.fields_found + table_field_mapping.fields_with_suffix
+    )
     logger.info(
-        f"Found the following fields for {table_name=}: "
-        f"{table_field_mapping.fields_found + table_field_mapping.fields_with_suffix}"
+        f"Found the following fields for {table_name}: " f"{pformat(fields_found)}"
     )
-    foreign_fields = table_field_mapping.foreign_fields
+    foreign_fields = [f.name for f in table_field_mapping.foreign_fields]
     if len(foreign_fields) > 0:
         logger.info(
             "Also, the following fields can be mapped from their names in "
-            f"children tables: {foreign_fields=}"
+            f"children tables: {pformat(foreign_fields)}"
         )
     # Check the primary keys for this table are available and NaN free
     _check_primary_keys(table_field_mapping, homogenised_data)
@@ -143,14 +146,6 @@ def _check_data_types(
         logger.warning(
             f"For {field=} {input_data_dtype=} does not match with {cdm_numpy_dtype=}"
         )
-    else:
-        # Warn if timestamp without timeszones is used
-        if cdm_dtype == "timestamp with timezone" and not hasattr(
-            input_data_dtype, "tz"
-        ):
-            logger.warning(
-                f"{field=} does not have timezone information, UTC is assumed."
-            )


 def get_cdm_table_mapping(
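Note: logging f.name for each foreign field replaces the verbose dataclass reprs that {foreign_fields=} used to dump. A toy illustration (this Field class is invented for the example):

    from dataclasses import dataclass
    from pprint import pformat

    @dataclass
    class Field:
        name: str
        dtype: str

    fields = [Field("station_name", "object"), Field("height_of_station", "float32")]
    foreign_fields = [f.name for f in fields]
    print(pformat(foreign_fields))  # ['station_name', 'height_of_station']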
4 changes: 3 additions & 1 deletion cdsobs/cli/_validate.py
@@ -9,7 +9,9 @@


 def validate_service_definition(
-    service_definition: str = typer.Argument(..., help="Path to JSON file"),
+    service_definition: str = typer.Argument(
+        ..., help="Path to Service definition YAML file"
+    ),
     cdsobs_config_yml: Path = config_yml_typer,
 ):
     """Validate a service definition YAML file."""
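Note: in typer, the Ellipsis default marks the argument as required, and the help string shows up in the generated --help output. A minimal runnable sketch (the command body is illustrative, not from this PR):

    import typer

    app = typer.Typer()

    @app.command()
    def validate(
        service_definition: str = typer.Argument(
            ..., help="Path to Service definition YAML file"
        ),
    ):
        """Validate a service definition YAML file."""
        typer.echo(f"Validating {service_definition}")

    if __name__ == "__main__":
        app()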
10 changes: 6 additions & 4 deletions cdsobs/ingestion/api.py
@@ -1,6 +1,7 @@
 from hashlib import sha1
 from importlib import import_module
 from pathlib import Path
+from pprint import pformat
 from typing import List

 import numpy
@@ -138,8 +139,9 @@ def check_mandatory_columns(
     all_columns = set(data_renamed.columns)
     missing_mandatory_columns = mandatory_columns.difference(all_columns)
     if len(missing_mandatory_columns) > 0:
-        logger.warning(f"Mandatory columns {missing_mandatory_columns} are missing")
-        # raise MissingMandatoryColumns
+        logger.warning(
+            f"Mandatory columns {pformat(missing_mandatory_columns)} are missing"
+        )


 def cast_to_descriptions(
@@ -232,7 +234,6 @@ def read_batch_data(
     # Explicitly remove this reference to reduce memory usage
     del data_table
     source_definition = service_definition.sources[source]
-    logger.info("Applying the melt columns configuration.")
     if source_definition.cdm_mapping.melt_columns is not None:
         logger.info("Melting variable columns as requested")
         homogenised_data = _melt_variables(
@@ -322,7 +323,8 @@ def _melt_variables(
     # Check for variables not in the code table
     cdm_vars = set(code_dict)
     not_found = set(homogenised_data_melted["observed_variable"].unique()) - cdm_vars
-    logger.warning(f"Some variables were not found in the CDM: {not_found}")
+    if len(not_found) > 0:
+        logger.warning(f"Some variables were not found in the CDM: {not_found}")
     return homogenised_data_melted
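Note: both fixes in this file follow the same pattern: compute a set difference, then warn only when it is non-empty, so clean datasets no longer produce spurious warnings. A small self-contained illustration (column names invented):

    mandatory_columns = {"report_timestamp", "station_name", "longitude"}
    all_columns = {"report_timestamp", "longitude", "air_temperature"}
    missing = mandatory_columns.difference(all_columns)
    if len(missing) > 0:
        print(f"Mandatory columns {missing} are missing")
    # prints: Mandatory columns {'station_name'} are missing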
2 changes: 2 additions & 0 deletions cdsobs/ingestion/partition.py
@@ -93,6 +93,8 @@ def get_partitions(
                 station_ids,
                 sources,
             )
+            # Remove group columns from the data
+            group_data = group_data.drop(group_columns, axis=1)
             yield DatasetPartition(
                 dataset_params, partition_params, group_data, constraints
             )
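Note: the grouping columns are constant within each partition and already captured in the partition parameters, so dropping them avoids storing redundant data. A toy sketch of the pattern (data and columns invented, the real grouping logic is more involved):

    import pandas as pd

    df = pd.DataFrame(
        {"year": [2000, 2000], "station": ["A", "A"], "value": [1.2, 3.4]}
    )
    group_columns = ["year", "station"]
    for keys, group_data in df.groupby(group_columns):
        # The keys already identify the partition, so the columns are redundant.
        group_data = group_data.drop(group_columns, axis=1)
        print(keys, group_data.to_dict("records"))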
13 changes: 11 additions & 2 deletions cdsobs/ingestion/readers/sql.py
@@ -1,5 +1,6 @@
 import datetime
-from typing import Any, Protocol, Tuple
+import inspect
+from typing import Any, Callable, Protocol, Tuple

 import connectorx as cx
 import pandas
@@ -258,7 +259,8 @@ def read_ingestion_tables(
     A tuple with the header and data tables
     """
     # Read tables
-    logger.info(f"Reading ingestion tables with {sql_reader_function=}")
+    function_file = get_function_reference(sql_reader_function)
+    logger.info(f"Reading ingestion tables with {function_file}")
     if not source_definition.is_multitable():
         header_table_name = None
     else:
@@ -272,6 +274,13 @@
     return header, data


+def get_function_reference(sql_reader_function: Callable) -> str:
+    """Return function reference as module.name."""
+    module = inspect.getmodule(sql_reader_function).__name__  # type: ignore
+    name = sql_reader_function.__name__
+    return module + "." + name
+
+
 def get_sql_data_types(
     config: DBConfig, data_table_name: str, header_table_name: str | None
 ) -> pandas.Series:
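Note: inspect.getmodule returns the module object in which a function was defined, so the new helper logs a compact dotted reference instead of the full function repr. A quick check (run as a script, so the module resolves to __main__):

    import inspect

    def my_reader():
        pass

    module = inspect.getmodule(my_reader).__name__
    print(module + "." + my_reader.__name__)  # __main__.my_reader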
5 changes: 3 additions & 2 deletions cdsobs/service_definition/api.py
@@ -87,12 +87,13 @@ def get_service_definition(
     config: CDSObsConfig, dataset_name: str
 ) -> ServiceDefinition:
     cadsobs_insitu_location = config.cads_obs_insitu_location
-    path_to_json = Path(
+    path_to_yaml = Path(
         cadsobs_insitu_location,
         "cads-forms-insitu",
         f"{dataset_name}/service_definition.yml",
     )
-    with open(path_to_json) as f:
+    logger.info(f"Reading service definition file from {path_to_yaml}")
+    with open(path_to_yaml) as f:
         data = yaml.safe_load(f)

     return ServiceDefinition(**data)
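Note: besides the new log line, the rename from path_to_json to path_to_yaml matches the file actually being read. Path joins all positional segments, including the slash embedded in the f-string segment; a small sketch (base directory and dataset name invented):

    from pathlib import Path

    dataset_name = "my-insitu-dataset"
    p = Path("/data", "cads-forms-insitu", f"{dataset_name}/service_definition.yml")
    print(p)  # /data/cads-forms-insitu/my-insitu-dataset/service_definition.yml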
2 changes: 1 addition & 1 deletion cdsobs/utils/logutils.py
@@ -33,7 +33,7 @@ def configure_logger() -> None:
         format="%(message)s",
         stream=sys.stdout,
     )
-    logging_format = os.environ.get("CADSOBS_LOGGING_FORMAT", "JSON")
+    logging_format = os.environ.get("CADSOBS_LOGGING_FORMAT", "CONSOLE")
     if logging_format == "CONSOLE":
         renderer = structlog.dev.ConsoleRenderer(colors=False)
     elif logging_format == "JSON":
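Note: human-readable console logs become the default; the previous behaviour stays one environment variable away. A minimal sketch of the selection logic (the JSON branch body is an assumption, since the diff truncates before it):

    import os
    import structlog

    # Set CADSOBS_LOGGING_FORMAT=JSON to restore the previous JSON output.
    logging_format = os.environ.get("CADSOBS_LOGGING_FORMAT", "CONSOLE")
    if logging_format == "CONSOLE":
        renderer = structlog.dev.ConsoleRenderer(colors=False)
    elif logging_format == "JSON":
        renderer = structlog.processors.JSONRenderer()  # assumed renderer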