Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions coordinates/coordinates.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import numpy as np
from geoalchemy2 import WKBElement
from netCDF4 import Dataset
from pypsdm.db.weather.models import Coordinate
from shapely.geometry import Point
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session


Expand All @@ -24,3 +28,53 @@ def create_coordinates_df(weather: Dataset, session: Session):
session.commit()

return idx_to_id


def _find_coordinate_id(session, geography_point):
    """Return the ID of the stored coordinate equal to ``geography_point``.

    Args:
        session: Database session used to run the lookup.
        geography_point: WKB element compared against ``Coordinate.coordinate``.

    Returns:
        The ID of the first matching row, or ``None`` when no row matches.
    """
    match = (
        session.query(Coordinate)
        .filter(Coordinate.coordinate == geography_point)
        .first()
    )
    return None if match is None else match.id


def insert_coordinate(session, lat: float, lon: float) -> int:
    """
    Insert a coordinate into the database if it doesn't already exist.

    The existence check compares ``Coordinate.coordinate`` against a point
    built as ``Point(lon, lat)`` with SRID 4326, so the new row is created
    with the same axis order (x = longitude, y = latitude).

    Args:
        session: Database session
        lat: Latitude of the coordinate
        lon: Longitude of the coordinate

    Returns:
        int: The ID of the inserted or existing coordinate.

    Raises:
        IntegrityError: If the insert violates a constraint and no matching
            coordinate can be found after rolling back.
    """
    # Shapely points are (x, y); here that is (longitude, latitude).
    point = Point(lon, lat)
    geography_point = WKBElement(point.wkb, srid=4326)

    try:
        existing_id = _find_coordinate_id(session, geography_point)
        if existing_id is not None:
            return existing_id

        # IDs are assigned manually as max(id) + 1.
        # NOTE(review): this is not atomic - two writers can compute the same
        # ID. The IntegrityError handler below only recovers when the
        # conflicting row holds this same coordinate.
        max_id = session.query(func.max(Coordinate.id)).scalar()
        new_id = (max_id or 0) + 1

        # Use the same axis order as the lookup point above; passing
        # x=lat, y=lon would store a point the lookup can never match.
        # NOTE(review): assumes from_xy builds Point(x, y) - confirm in pypsdm.
        new_coordinate = Coordinate.from_xy(id=new_id, x=lon, y=lat)

        session.add(new_coordinate)
        session.commit()

        return new_id

    except IntegrityError:
        # The failed transaction must be rolled back before the session can
        # be queried again.
        session.rollback()

        existing_id = _find_coordinate_id(session, geography_point)
        if existing_id is not None:
            return existing_id

        # Bare raise re-raises the original error with its traceback intact.
        raise
125 changes: 95 additions & 30 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
#!/usr/bin/env python
"""
Main entry point for weather data processing application.
Utility script for processing weather data from GRIB and NetCDF files.
"""
import argparse
import sys
from pathlib import Path

from weather.config import load_config
from weather.convert import inspect_grib_file
from weather.processor import process_weather_data

# Add the parent directory to the path to import our modules
# NOTE(review): this append runs after the `weather.*` imports above have
# already executed, so it cannot influence how those imports are resolved.
# If they rely on this path entry, the append has to come first - confirm.
sys.path.append(str(Path(__file__).parent.parent))


def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Process weather data from NetCDF files."
description="Process weather data from GRIB and NetCDF files."
)
parser.add_argument(
"--config",
Expand All @@ -20,51 +26,110 @@ def parse_arguments():
default="config.yaml",
help="Path to YAML configuration file",
)
parser.add_argument(

subparsers = parser.add_subparsers(dest="command", required=True)

# Subparser for inspecting GRIB files (no longer takes an argument)
grib_parser = subparsers.add_parser("inspect-grib", help="Inspect a GRIB file")

grib_parser.add_argument(
"--verbose", "-v", action="store_true", help="Enable verbose logging"
)

# Subparser for processing NetCDF files
netcdf_parser = subparsers.add_parser("process-netcdf", help="Process NetCDF files")

netcdf_parser.add_argument(
"--batch-size",
dest="batch_size",
type=int,
help="Override batch size from config file",
)
parser.set_defaults(perform_migration=True)
netcdf_parser.add_argument(
"--log-file",
dest="log_file",
type=str,
help="Override log file path from config file",
)
netcdf_parser.add_argument(
"--no-migration",
dest="perform_migration",
action="store_false",
help="Skip database migration after processing",
)

netcdf_parser.set_defaults(perform_migration=True)

return parser.parse_args()


def main():
# Entry point: parses the CLI arguments, dispatches on args.command
# ("inspect-grib" or "process-netcdf") and returns 0 on success, 1 on failure.
# NOTE(review): this listing appears to interleave lines from the previous
# implementation (the bare `try:` below and the first config/print/process
# sequence ending in `return 1` / `return 0`) with the new one - confirm
# which lines are live before relying on the notes below.
# NOTE(review): args.log_file and args.verbose are accepted by the parser
# but are not read anywhere in this function.
# Parse command line arguments
args = parse_arguments()

try:
status_code = 0 # Initialize status code

if args.command == "inspect-grib":
# Load configuration from YAML
config = load_config(args.config_path)

# Command line arguments override configuration file
# NOTE(review): --batch-size is only registered on the process-netcdf
# subparser; if this lookup is live in the inspect-grib branch,
# args.batch_size would not exist on the namespace - confirm.
batch_size = (
args.batch_size
if args.batch_size is not None
else config.get("batch_size", 1000)
)
perform_migration = True

# Get input directory from config
input_dir = config.get("input_dir")
file_name_base = config.get("file_name_base")

print(f"Loaded configuration from {args.config_path}")
print(f"Using ROOT_DIR: {config.get('ROOT_DIR')}")
print(f"Using input directory: {input_dir}")
print(f"Using file name base: {file_name_base}")
print(f"Using batch size: {batch_size}")
print(f"Database migration: {'Enabled' if perform_migration else 'Disabled'}")

print("Starting weather data processing")
process_weather_data(input_dir, file_name_base, batch_size, perform_migration)
print("Processing completed successfully")
except Exception as e:
print(f"Error during processing: {e}")
return 1

return 0
# Construct full path to the GRIB file using input_dir and file_name_base
# The ".grib" suffix is hard-coded here; files with another extension
# will be reported as not found by the existence check below.
grib_file_path = (
Path(input_dir) / f"{file_name_base}.grib"
) # Adjust extension as needed

# Check if file exists
if not grib_file_path.exists():
print(f"Error: File '{grib_file_path}' not found")
return 1

try:
inspect_grib_file(grib_file_path)
print("GRIB file inspection completed successfully.")
return 0
except Exception as e:
print(f"Error inspecting GRIB file: {e}")
return 1

elif args.command == "process-netcdf":
try:
# Load configuration from YAML
config = load_config(args.config_path)

# Command line arguments override configuration file
batch_size = (
args.batch_size
if args.batch_size is not None
else config.get("batch_size", 1000)
)

# NOTE(review): hard-coded to True, so the --no-migration flag
# (dest="perform_migration") parsed for this subcommand is never
# consulted - presumably this should read args.perform_migration; confirm.
perform_migration = True

# Get input directory from config
input_dir = config.get("input_dir")
file_name_base = config.get("file_name_base")

print(f"Loaded configuration from {args.config_path}")
print(f"Using ROOT_DIR: {config.get('ROOT_DIR')}")
print(f"Using input directory: {input_dir}")
print(f"Using file name base: {file_name_base}")
print(f"Using batch size: {batch_size}")
print(
f"Database migration: {'Enabled' if perform_migration else 'Disabled'}"
)

print("Starting weather data processing")
process_weather_data(
input_dir, file_name_base, batch_size, perform_migration
)
print("Processing completed successfully")
except Exception as e:
# Any failure in this branch is reported on stdout and turned into a
# non-zero status instead of propagating.
print(f"Error during processing: {e}")
status_code = 1

return status_code


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ pypsdm = "^0.0.6"
psycopg2 = "^2.9.11"
netcdf4 = "^1.7.3"
sqlmodel = "^0.0.24"
cfgrib = "^0.9.15.0"
xarray = "^2025.6.1"
eccodes = "^2.42.0"
pyarrow = "^20.0.0"

[tool.poetry.group.dev.dependencies]
pytest = "^8.4.2"
Expand Down
Loading
Loading