diff --git a/backend/dataflow/validate_geometries.py b/backend/dataflow/validate_geometries.py
index 3cd5d15..843df12 100644
--- a/backend/dataflow/validate_geometries.py
+++ b/backend/dataflow/validate_geometries.py
@@ -6,9 +6,9 @@ class ValidateGeometriesOptions(PipelineOptions):
     @classmethod
     def _add_argparse_args(cls, parser):
-        parser.add_argument('--dataset', required=True)
-        parser.add_argument('--input_bucket', required=True)
-        parser.add_argument('--output_bucket', required=True)
+        parser.add_argument('--dataset')
+        parser.add_argument('--input_bucket')
+        parser.add_argument('--output_bucket')
 
 
 class ReadParquetDoFn(beam.DoFn):
     def __init__(self, input_bucket):
@@ -23,16 +23,12 @@ def process(self, dataset):
 
 
 class ValidateGeometriesDoFn(beam.DoFn):
     def process(self, element):
-        import pandas as pd
         from shapely.geometry import Polygon, MultiPolygon
-
         gdf = element['data']
-        dataset = element['dataset']
         # Basic validation for all geometries
         invalid_mask = ~gdf.geometry.is_valid
         if invalid_mask.any():
-            # Attempt to fix invalid geometries
             gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(
                 lambda geom: geom.buffer(0) if geom else None
             )
@@ -40,30 +36,6 @@ def process(self, element):
 
         # Ensure all features are valid after fixes
         gdf = gdf[gdf.geometry.is_valid]
-        # Ensure all geometries are Polygon or MultiPolygon
-        gdf = gdf[gdf.geometry.apply(lambda x: isinstance(x, (Polygon, MultiPolygon)))]
-
-        # Remove empty geometries
-        gdf = gdf[~gdf.geometry.is_empty]
-
-        # Dataset-specific validations
-        if dataset == 'water_projects':
-            if 'area_ha' in gdf.columns:
-                gdf = gdf[gdf.area_ha > 0]
-            if 'budget' in gdf.columns:
-                gdf = gdf[gdf.budget > 0]
-
-        elif dataset == 'wetlands':
-            if 'gridcode' in gdf.columns:
-                gdf = gdf[gdf.gridcode.notna()]
-            if 'toerv_pct' in gdf.columns:
-                gdf['toerv_pct'] = pd.to_numeric(gdf['toerv_pct'], errors='coerce')
-                gdf = gdf[gdf.toerv_pct.between(0, 100)]
-
-        elif dataset == 'cadastral':
-            if 'bfe_number' in gdf.columns:
-                gdf = gdf[gdf.bfe_number.notna()]
-
         # Calculate validation stats
         stats = {
             'total_rows': len(element['data']),
@@ -84,25 +56,16 @@ def process(self, element):
 
         gdf = element['data']
         stats = element['stats']
-        # Write parquet
         output_path = f'gs://{self.output_bucket}/validated/{dataset}/current.parquet'
         logging.info(f'Writing to {output_path}')
-        gdf.to_parquet(
-            output_path,
-            compression='zstd',
-            compression_level=3,
-            index=False,
-            row_group_size=100000
-        )
+        gdf.to_parquet(output_path)
 
-        # Write stats
         stats_path = f'gs://{self.output_bucket}/validated/{dataset}/validation_stats.csv'
         pd.DataFrame([stats]).to_csv(stats_path, index=False)
 
         yield element
 
 
 def run(argv=None):
-    """Build and run the pipeline."""
     pipeline_options = PipelineOptions(argv)
     options = pipeline_options.view_as(ValidateGeometriesOptions)