Skip to content

Commit

Permalink
feat: add dataset-specific validations to geometry pipeline
Browse files Browse the repository at this point in the history
- Add validation rules for water_projects (area_ha, budget)
- Add validation rules for wetlands (gridcode, toerv_pct)
- Add validation rules for cadastral (bfe_number)
- Move heavy dependencies inside DoFn process methods
  • Loading branch information
martincollignon committed Dec 1, 2024
1 parent bebbd98 commit e57f64e
Showing 1 changed file with 42 additions and 3 deletions.
45 changes: 42 additions & 3 deletions backend/dataflow/validate_geometries.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,54 @@ def process(self, dataset):

class ValidateGeometriesDoFn(beam.DoFn):
    """Repair, filter and validate the geometries of one dataset element.

    Each input element is a dict that carries at least:

    * ``'data'``    -- a frame exposing a ``geometry`` accessor
      (presumably a ``geopandas.GeoDataFrame`` -- confirm against the
      upstream transform).
    * ``'dataset'`` -- the dataset name used to select extra rules
      (``'water_projects'``, ``'wetlands'`` or ``'cadastral'``).
    """

    def process(self, element):
        """Validate ``element['data']`` and yield the cleaned element.

        Steps, in order:

        1. Try to repair invalid geometries with ``buffer(0)``.
        2. Drop rows whose geometry is still invalid, is not a
           ``Polygon``/``MultiPolygon``, or is empty.
        3. Apply the dataset-specific attribute rules.

        Args:
            element: dict with ``'data'`` and ``'dataset'`` keys (see the
                class docstring).

        Yields:
            A new dict with the same keys as ``element``, where ``'data'``
            is the filtered frame and ``'stats'`` holds ``total_rows``
            (row count before validation) and ``valid_geometries`` (row
            count after validation).
        """
        # Heavy dependencies are imported lazily so they are only loaded
        # where process() actually runs.
        import pandas as pd

        source_gdf = element['data']
        dataset = element['dataset']
        total_rows = len(source_gdf)

        # Work on a copy: Beam requires that a DoFn never mutates its
        # input element, and the repair below writes into the frame.
        gdf = source_gdf.copy()

        # Basic validation for all geometries: attempt to fix invalid ones.
        invalid_mask = ~gdf.geometry.is_valid
        if invalid_mask.any():
            gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(
                lambda geom: geom.buffer(0) if geom else None
            )

        # Keep only rows that are valid after the repair attempt.
        gdf = gdf[gdf.geometry.is_valid]

        # Keep only Polygon / MultiPolygon geometries. geom_type yields a
        # proper boolean mask even when the frame is empty.
        gdf = gdf[gdf.geometry.geom_type.isin(['Polygon', 'MultiPolygon'])]

        # Remove empty geometries.
        gdf = gdf[~gdf.geometry.is_empty]

        # Dataset-specific validations; a rule is skipped when its column
        # is absent.
        if dataset == 'water_projects':
            if 'area_ha' in gdf.columns:
                gdf = gdf[gdf.area_ha > 0]
            if 'budget' in gdf.columns:
                gdf = gdf[gdf.budget > 0]

        elif dataset == 'wetlands':
            if 'gridcode' in gdf.columns:
                gdf = gdf[gdf.gridcode.notna()]
            if 'toerv_pct' in gdf.columns:
                # assign() returns a new frame, avoiding a write into a
                # filtered slice. Unparseable values become NaN and are
                # then rejected by between().
                gdf = gdf.assign(
                    toerv_pct=pd.to_numeric(gdf['toerv_pct'], errors='coerce')
                )
                gdf = gdf[gdf.toerv_pct.between(0, 100)]

        elif dataset == 'cadastral':
            if 'bfe_number' in gdf.columns:
                gdf = gdf[gdf.bfe_number.notna()]

        # Calculate validation stats.
        stats = {
            'total_rows': total_rows,
            'valid_geometries': len(gdf),
        }

        # Emit a fresh dict instead of mutating the input element.
        yield {**element, 'data': gdf, 'stats': stats}

Expand Down

0 comments on commit e57f64e

Please sign in to comment.