From e57f64eb3ddf64151d2a3ed8d85c53bf1b033d7e Mon Sep 17 00:00:00 2001
From: Martin Collignon <2604526+martincollignon@users.noreply.github.com>
Date: Sun, 1 Dec 2024 17:22:42 +0100
Subject: [PATCH] feat: add dataset-specific validations to geometry pipeline

- Add validation rules for water_projects (area_ha, budget)
- Add validation rules for wetlands (gridcode, toerv_pct)
- Add validation rules for cadastral (bfe_number)
- Move heavy dependencies inside DoFn process methods
- Validate copies of the element and frame so the DoFn input is not mutated
---
Review notes (text in this section is not part of the commit message):
- Line structure of the patch restored; hunk header and diffstat counts
  updated for the revised added lines.
- The post-image blob id on the index line predates this revision.

 backend/dataflow/validate_geometries.py | 54 +++++++++++++++++++++++--
 1 file changed, 51 insertions(+), 3 deletions(-)

diff --git a/backend/dataflow/validate_geometries.py b/backend/dataflow/validate_geometries.py
index c9e554a2..e507df5b 100644
--- a/backend/dataflow/validate_geometries.py
+++ b/backend/dataflow/validate_geometries.py
@@ -23,15 +23,63 @@ def process(self, dataset):
 
 class ValidateGeometriesDoFn(beam.DoFn):
     def process(self, element):
+        import pandas as pd
+        from shapely.geometry import Polygon, MultiPolygon
+
         gdf = element['data']
         dataset = element['dataset']
 
-        # Validation logic here
+        # Beam requires DoFns to leave their input untouched, so shallow-copy
+        # the element and copy the frame before repairing geometries in place.
+        element = dict(element)
+        total_rows = len(gdf)
+        gdf = gdf.copy()
+
+        # Basic validation for all geometries
+        invalid_mask = ~gdf.geometry.is_valid
+        if invalid_mask.any():
+            # Attempt to fix invalid geometries
+            gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(
+                lambda geom: geom.buffer(0) if geom else None
+            )
+
+        # Ensure all features are valid after fixes
+        gdf = gdf[gdf.geometry.is_valid]
+
+        # Ensure all geometries are Polygon or MultiPolygon
+        gdf = gdf[gdf.geometry.apply(lambda x: isinstance(x, (Polygon, MultiPolygon)))]
+
+        # Remove empty geometries
+        gdf = gdf[~gdf.geometry.is_empty]
+
+        # Dataset-specific validations
+        if dataset == 'water_projects':
+            if 'area_ha' in gdf.columns:
+                gdf = gdf[gdf.area_ha > 0]
+            if 'budget' in gdf.columns:
+                gdf = gdf[gdf.budget > 0]
+
+        elif dataset == 'wetlands':
+            if 'gridcode' in gdf.columns:
+                gdf = gdf[gdf.gridcode.notna()]
+            if 'toerv_pct' in gdf.columns:
+                # assign() returns a new frame, avoiding a chained assignment
+                # on the filtered slice (SettingWithCopyWarning).
+                gdf = gdf.assign(toerv_pct=pd.to_numeric(gdf['toerv_pct'], errors='coerce'))
+                gdf = gdf[gdf.toerv_pct.between(0, 100)]
+
+        elif dataset == 'cadastral':
+            if 'bfe_number' in gdf.columns:
+                gdf = gdf[gdf.bfe_number.notna()]
+
+        # Calculate validation stats
         stats = {
-            'total_rows': len(gdf),
-            'valid_geometries': len(gdf[gdf.geometry.is_valid])
+            'total_rows': total_rows,
+            # Rows that passed every check above (geometry and attribute rules)
+            'valid_geometries': len(gdf)
         }
 
+        element['data'] = gdf
         element['stats'] = stats
         yield element
 