Add dataset-specific validation pipelines and update Dockerfile.processing
martincollignon committed Dec 1, 2024
1 parent bebbd98 commit 68a003d
Showing 4 changed files with 297 additions and 1 deletion.
14 changes: 13 additions & 1 deletion backend/Dockerfile.processing
@@ -1,4 +1,4 @@
-# backend/Dockerfile.validation
+# backend/Dockerfile.processing
FROM apache/beam_python3.11_sdk:2.60.0

# Install system dependencies including GDAL
@@ -17,9 +17,21 @@ ENV C_INCLUDE_PATH=/usr/include/gdal
COPY requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt

# Install additional dependencies needed for validation
RUN pip install \
    geopandas \
    pandas \
    shapely \
    'fsspec>=2024.10.0' \
    'gcsfs>=2024.10.0' \
    pyarrow

# Copy the dataflow directory (which includes setup.py)
COPY dataflow /app/dataflow

# Install the package in development mode
WORKDIR /app/dataflow
RUN pip install -e .

# Set the working directory
WORKDIR /app
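
Once this image is pushed to a registry, Dataflow workers can be pointed at it through Beam's --sdk_container_image worker option. A minimal launch sketch; the project, region, bucket, and image names below are placeholders, not values from this commit:

# Hypothetical Dataflow launch options using the custom container built
# from Dockerfile.processing. All resource names are illustrative placeholders.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',                 # placeholder
    '--region=europe-west1',                    # placeholder
    '--temp_location=gs://my-temp-bucket/tmp',  # placeholder
    '--sdk_container_image=gcr.io/my-gcp-project/processing:latest',  # placeholder
])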
95 changes: 95 additions & 0 deletions backend/dataflow/validate_cadastral.py
@@ -0,0 +1,95 @@
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import logging

class ValidateGeometriesOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--dataset')
parser.add_argument('--input_bucket')
parser.add_argument('--output_bucket')

class ReadParquetDoFn(beam.DoFn):
def __init__(self, input_bucket):
self.input_bucket = input_bucket

def process(self, dataset):
import geopandas as gpd
path = f'gs://{self.input_bucket}/raw/{dataset}/current.parquet'
logging.info(f'Reading from {path}')
gdf = gpd.read_parquet(path)
yield {'dataset': dataset, 'data': gdf}

class ValidateCadastralDoFn(beam.DoFn):
def process(self, element):
from shapely.geometry import Polygon, MultiPolygon
gdf = element['data']

# Fix invalid geometries
invalid_mask = ~gdf.geometry.is_valid
if invalid_mask.any():
gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(
lambda geom: geom.buffer(0) if geom else None
)

# Cadastral-specific validation
gdf = gdf[
gdf.geometry.is_valid &
gdf.geometry.apply(lambda x: isinstance(x, (Polygon, MultiPolygon))) &
~gdf.geometry.is_empty &
            gdf.geometry.apply(lambda x: x is not None and 0.1 <= x.area <= 100_000_000)  # 0.1 m² to 100 km², assuming a projected CRS in metres
]

stats = {
'total_rows': len(element['data']),
'valid_geometries': len(gdf)
}

element['data'] = gdf
element['stats'] = stats
yield element

class WriteResultsDoFn(beam.DoFn):
def __init__(self, output_bucket):
self.output_bucket = output_bucket

def process(self, element):
import pandas as pd
dataset = element['dataset']
gdf = element['data']
stats = element['stats']

# Write parquet
output_path = f'gs://{self.output_bucket}/validated/{dataset}/current.parquet'
logging.info(f'Writing to {output_path}')
gdf.to_parquet(
output_path,
compression='zstd',
compression_level=3,
index=False,
row_group_size=100000
)

# Write stats
stats_path = f'gs://{self.output_bucket}/validated/{dataset}/validation_stats.csv'
pd.DataFrame([stats]).to_csv(stats_path, index=False)

yield element

def run(argv=None):
"""Build and run the pipeline."""
pipeline_options = PipelineOptions(argv)
options = pipeline_options.view_as(ValidateGeometriesOptions)

with beam.Pipeline(options=pipeline_options) as p:
(p
| 'Create Dataset' >> beam.Create([options.dataset])
| 'Read Parquet' >> beam.ParDo(ReadParquetDoFn(options.input_bucket))
| 'Validate Geometries' >> beam.ParDo(ValidateCadastralDoFn())
| 'Write Results' >> beam.ParDo(WriteResultsDoFn(options.output_bucket))
)

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
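
Before submitting to Dataflow, the pipeline can be smoke-tested locally with the DirectRunner. A sketch, assuming the module is on the import path and GCS credentials are available to gcsfs; the bucket names are placeholders:

# Hypothetical local smoke test for the cadastral pipeline.
from validate_cadastral import run

run([
    '--runner=DirectRunner',
    '--dataset=cadastral',
    '--input_bucket=my-input-bucket',    # placeholder
    '--output_bucket=my-output-bucket',  # placeholder
])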
95 changes: 95 additions & 0 deletions backend/dataflow/validate_water_projects.py
@@ -0,0 +1,95 @@
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import logging

class ValidateGeometriesOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--dataset')
parser.add_argument('--input_bucket')
parser.add_argument('--output_bucket')

class ReadParquetDoFn(beam.DoFn):
def __init__(self, input_bucket):
self.input_bucket = input_bucket

def process(self, dataset):
import geopandas as gpd
path = f'gs://{self.input_bucket}/raw/{dataset}/current.parquet'
logging.info(f'Reading from {path}')
gdf = gpd.read_parquet(path)
yield {'dataset': dataset, 'data': gdf}

class ValidateWaterProjectsDoFn(beam.DoFn):
def process(self, element):
from shapely.geometry import Polygon, MultiPolygon
gdf = element['data']

# Fix invalid geometries
invalid_mask = ~gdf.geometry.is_valid
if invalid_mask.any():
gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(
lambda geom: geom.buffer(0) if geom else None
)

# Water projects-specific validation
gdf = gdf[
gdf.geometry.is_valid &
gdf.geometry.apply(lambda x: isinstance(x, (Polygon, MultiPolygon))) &
~gdf.geometry.is_empty &
            gdf.geometry.apply(lambda x: x is not None and 1 <= x.area <= 10_000_000)  # 1 m² to 10 km², assuming a projected CRS in metres
]

stats = {
'total_rows': len(element['data']),
'valid_geometries': len(gdf)
}

element['data'] = gdf
element['stats'] = stats
yield element

class WriteResultsDoFn(beam.DoFn):
def __init__(self, output_bucket):
self.output_bucket = output_bucket

def process(self, element):
import pandas as pd
dataset = element['dataset']
gdf = element['data']
stats = element['stats']

# Write parquet
output_path = f'gs://{self.output_bucket}/validated/{dataset}/current.parquet'
logging.info(f'Writing to {output_path}')
gdf.to_parquet(
output_path,
compression='zstd',
compression_level=3,
index=False,
row_group_size=100000
)

# Write stats
stats_path = f'gs://{self.output_bucket}/validated/{dataset}/validation_stats.csv'
pd.DataFrame([stats]).to_csv(stats_path, index=False)

yield element

def run(argv=None):
"""Build and run the pipeline."""
pipeline_options = PipelineOptions(argv)
options = pipeline_options.view_as(ValidateGeometriesOptions)

with beam.Pipeline(options=pipeline_options) as p:
(p
| 'Create Dataset' >> beam.Create([options.dataset])
| 'Read Parquet' >> beam.ParDo(ReadParquetDoFn(options.input_bucket))
| 'Validate Geometries' >> beam.ParDo(ValidateWaterProjectsDoFn())
| 'Write Results' >> beam.ParDo(WriteResultsDoFn(options.output_bucket))
)

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
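
All three DoFns lean on the buffer(0) trick to repair invalid rings. A standalone illustration of what that step does to a self-intersecting polygon, independent of Beam:

# A self-intersecting "bowtie" ring is invalid; buffer(0) rebuilds it as a
# valid geometry (typically the two triangular lobes; the exact output
# depends on the GEOS version).
from shapely.geometry import Polygon

bowtie = Polygon([(0, 0), (2, 2), (2, 0), (0, 2)])
print(bowtie.is_valid)   # False: the ring crosses itself at (1, 1)

fixed = bowtie.buffer(0)
print(fixed.is_valid)    # True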
94 changes: 94 additions & 0 deletions backend/dataflow/validate_wetlands.py
@@ -0,0 +1,94 @@
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import logging

class ValidateGeometriesOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--dataset')
parser.add_argument('--input_bucket')
parser.add_argument('--output_bucket')

class ReadParquetDoFn(beam.DoFn):
def __init__(self, input_bucket):
self.input_bucket = input_bucket

def process(self, dataset):
import geopandas as gpd
path = f'gs://{self.input_bucket}/raw/{dataset}/current.parquet'
logging.info(f'Reading from {path}')
gdf = gpd.read_parquet(path)
yield {'dataset': dataset, 'data': gdf}

class ValidateWetlandsDoFn(beam.DoFn):
def process(self, element):
from shapely.geometry import Polygon, MultiPolygon
gdf = element['data']

# Fix invalid geometries
invalid_mask = ~gdf.geometry.is_valid
if invalid_mask.any():
gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(
lambda geom: geom.buffer(0) if geom else None
)

# Wetlands-specific validation
gdf = gdf[
gdf.geometry.is_valid &
gdf.geometry.apply(lambda x: isinstance(x, (Polygon, MultiPolygon))) &
~gdf.geometry.is_empty
]

stats = {
'total_rows': len(element['data']),
'valid_geometries': len(gdf)
}

element['data'] = gdf
element['stats'] = stats
yield element

class WriteResultsDoFn(beam.DoFn):
def __init__(self, output_bucket):
self.output_bucket = output_bucket

def process(self, element):
import pandas as pd
dataset = element['dataset']
gdf = element['data']
stats = element['stats']

# Write parquet
output_path = f'gs://{self.output_bucket}/validated/{dataset}/current.parquet'
logging.info(f'Writing to {output_path}')
gdf.to_parquet(
output_path,
compression='zstd',
compression_level=3,
index=False,
row_group_size=100000
)

# Write stats
stats_path = f'gs://{self.output_bucket}/validated/{dataset}/validation_stats.csv'
pd.DataFrame([stats]).to_csv(stats_path, index=False)

yield element

def run(argv=None):
"""Build and run the pipeline."""
pipeline_options = PipelineOptions(argv)
options = pipeline_options.view_as(ValidateGeometriesOptions)

with beam.Pipeline(options=pipeline_options) as p:
(p
| 'Create Dataset' >> beam.Create([options.dataset])
| 'Read Parquet' >> beam.ParDo(ReadParquetDoFn(options.input_bucket))
| 'Validate Geometries' >> beam.ParDo(ValidateWetlandsDoFn())
| 'Write Results' >> beam.ParDo(WriteResultsDoFn(options.output_bucket))
)

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
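
The three pipelines are identical except for the geometry filter, so a natural follow-up would be a single parameterized DoFn keyed by dataset. A refactoring sketch, not part of this commit; the rule table mirrors the per-file filters above:

# Sketch: one validator for all three datasets. None means no area check.
import apache_beam as beam

AREA_RULES = {
    'cadastral': (0.1, 100_000_000),    # 0.1 m² to 100 km²
    'water_projects': (1, 10_000_000),  # 1 m² to 10 km²
    'wetlands': None,
}

class ValidateGeometriesDoFn(beam.DoFn):
    def process(self, element):
        from shapely.geometry import Polygon, MultiPolygon
        gdf = element['data']
        rule = AREA_RULES[element['dataset']]

        # Repair invalid geometries in place, as the per-dataset DoFns do
        invalid = ~gdf.geometry.is_valid
        if invalid.any():
            gdf.loc[invalid, 'geometry'] = gdf.loc[invalid, 'geometry'].apply(
                lambda g: g.buffer(0) if g else None
            )

        # Shared structural filters, plus the optional per-dataset area rule
        mask = (
            gdf.geometry.is_valid
            & gdf.geometry.apply(lambda g: isinstance(g, (Polygon, MultiPolygon)))
            & ~gdf.geometry.is_empty
        )
        if rule is not None:
            lo, hi = rule
            mask &= gdf.geometry.apply(lambda g: g is not None and lo <= g.area <= hi)

        element['stats'] = {
            'total_rows': len(gdf),
            'valid_geometries': int(mask.sum()),
        }
        element['data'] = gdf[mask]
        yield element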
