fix: revert validate_geometries.py to simpler version
- Remove required=True from arguments (see the invocation sketch below the file summary)
- Remove dataset-specific validations
- Simplify validation logic
- Basic parquet writing
martincollignon committed Dec 1, 2024
1 parent 0dec64b commit 65b0ed4
Showing 1 changed file with 4 additions and 41 deletions.
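
With required=True removed, a missing flag no longer aborts the job at argparse time; the option simply comes back as None at runtime, so downstream code has to tolerate unset values. A minimal sketch of the new behavior — the flag value is hypothetical and this snippet is illustrative, not part of the commit:

    from apache_beam.options.pipeline_options import PipelineOptions

    class ValidateGeometriesOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument('--dataset')
            parser.add_argument('--input_bucket')
            parser.add_argument('--output_bucket')

    # Only --dataset is supplied here; the bucket flags are left out.
    options = PipelineOptions(['--dataset', 'wetlands']).view_as(ValidateGeometriesOptions)
    print(options.dataset)       # 'wetlands'
    print(options.input_bucket)  # None (with required=True this would have raised SystemExit)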
45 changes: 4 additions & 41 deletions backend/dataflow/validate_geometries.py
@@ -6,9 +6,9 @@
 class ValidateGeometriesOptions(PipelineOptions):
     @classmethod
     def _add_argparse_args(cls, parser):
-        parser.add_argument('--dataset', required=True)
-        parser.add_argument('--input_bucket', required=True)
-        parser.add_argument('--output_bucket', required=True)
+        parser.add_argument('--dataset')
+        parser.add_argument('--input_bucket')
+        parser.add_argument('--output_bucket')
 
 class ReadParquetDoFn(beam.DoFn):
     def __init__(self, input_bucket):
@@ -23,47 +23,19 @@ def process(self, dataset):
 class ValidateGeometriesDoFn(beam.DoFn):
     def process(self, element):
-        import pandas as pd
-        from shapely.geometry import Polygon, MultiPolygon
-
         gdf = element['data']
-        dataset = element['dataset']
 
         # Basic validation for all geometries
         invalid_mask = ~gdf.geometry.is_valid
         if invalid_mask.any():
             # Attempt to fix invalid geometries
             gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(
                 lambda geom: geom.buffer(0) if geom else None
             )
 
         # Ensure all features are valid after fixes
         gdf = gdf[gdf.geometry.is_valid]
 
-        # Ensure all geometries are Polygon or MultiPolygon
-        gdf = gdf[gdf.geometry.apply(lambda x: isinstance(x, (Polygon, MultiPolygon)))]
-
-        # Remove empty geometries
-        gdf = gdf[~gdf.geometry.is_empty]
-
-        # Dataset-specific validations
-        if dataset == 'water_projects':
-            if 'area_ha' in gdf.columns:
-                gdf = gdf[gdf.area_ha > 0]
-            if 'budget' in gdf.columns:
-                gdf = gdf[gdf.budget > 0]
-
-        elif dataset == 'wetlands':
-            if 'gridcode' in gdf.columns:
-                gdf = gdf[gdf.gridcode.notna()]
-            if 'toerv_pct' in gdf.columns:
-                gdf['toerv_pct'] = pd.to_numeric(gdf['toerv_pct'], errors='coerce')
-                gdf = gdf[gdf.toerv_pct.between(0, 100)]
-
-        elif dataset == 'cadastral':
-            if 'bfe_number' in gdf.columns:
-                gdf = gdf[gdf.bfe_number.notna()]
-
         # Calculate validation stats
         stats = {
             'total_rows': len(element['data']),
@@ -84,25 +56,16 @@ def process(self, element):
         gdf = element['data']
         stats = element['stats']
 
-        # Write parquet
         output_path = f'gs://{self.output_bucket}/validated/{dataset}/current.parquet'
-        logging.info(f'Writing to {output_path}')
-        gdf.to_parquet(
-            output_path,
-            compression='zstd',
-            compression_level=3,
-            index=False,
-            row_group_size=100000
-        )
+        gdf.to_parquet(output_path)
 
-        # Write stats
         stats_path = f'gs://{self.output_bucket}/validated/{dataset}/validation_stats.csv'
         pd.DataFrame([stats]).to_csv(stats_path, index=False)
 
         yield element
 
 def run(argv=None):
     """Build and run the pipeline."""
     pipeline_options = PipelineOptions(argv)
     options = pipeline_options.view_as(ValidateGeometriesOptions)
 
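
A note on the validation logic that survives the revert: buffer(0) is the usual shapely idiom for repairing self-intersecting polygons, and the follow-up is_valid filter drops anything the repair could not salvage. A standalone sketch of the idiom — the bowtie coordinates are illustrative only:

    from shapely.geometry import Polygon

    # A self-intersecting "bowtie" ring is an invalid polygon.
    bowtie = Polygon([(0, 0), (2, 2), (2, 0), (0, 2)])
    print(bowtie.is_valid)  # False

    # buffer(0) rebuilds the geometry from its rings; the result is valid,
    # though the repair can reshape or drop degenerate parts, which is why
    # the pipeline re-filters on is_valid afterwards.
    fixed = bowtie.buffer(0)
    print(fixed.is_valid)   # True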

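On "Basic parquet writing": the reverted call drops the explicit zstd compression, row-group sizing, and index handling, falling back to the GeoPandas/pyarrow defaults (snappy compression, writer-chosen row groups). A before/after sketch with a hypothetical local file, not part of the commit:

    import geopandas as gpd

    # Any GeoDataFrame works; the input path here is hypothetical.
    gdf = gpd.read_file('parcels.geojson')

    # Removed by this commit: explicit tuning of compression and row groups.
    # gdf.to_parquet('out.parquet', compression='zstd', compression_level=3,
    #                index=False, row_group_size=100000)

    # Kept by this commit: library defaults.
    gdf.to_parquet('out.parquet')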