diff --git a/backend/dataflow/validate_geometries.py b/backend/dataflow/validate_geometries.py
index b706b19c..2aec74ef 100644
--- a/backend/dataflow/validate_geometries.py
+++ b/backend/dataflow/validate_geometries.py
@@ -1,7 +1,6 @@
 # backend/dataflow/validate_geometries.py
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
-import logging
 
 class ValidateGeometriesOptions(PipelineOptions):
     @classmethod
@@ -10,38 +9,25 @@ def _add_argparse_args(cls, parser):
         parser.add_argument('--input_bucket')
         parser.add_argument('--output_bucket')
 
+def run(argv=None):
+    """Build and run the pipeline."""
+    options = PipelineOptions(argv)
+
+    with beam.Pipeline(options=options) as p:
+        (p
+         | 'Create Dataset' >> beam.Create([options.view_as(ValidateGeometriesOptions).dataset])
+         | 'Read Data' >> beam.Map(lambda dataset: read_dataset(dataset,
+             options.view_as(ValidateGeometriesOptions).input_bucket))
+         | 'Validate and Optimize' >> beam.ParDo(ValidateAndOptimize())
+         | 'Write Results' >> beam.Map(lambda element: write_outputs(element,
+             options.view_as(ValidateGeometriesOptions).output_bucket))
+        )
+
 def read_dataset(dataset, input_bucket):
-    # Import here to ensure it's only imported in workers
     import geopandas as gpd
-
-    gdf = gpd.read_parquet(
-        f'gs://{input_bucket}/raw/{dataset}/current.parquet'
-    )
+    gdf = gpd.read_parquet(f'gs://{input_bucket}/raw/{dataset}/current.parquet')
     return {'dataset': dataset, 'data': gdf}
 
-def write_outputs(element, output_bucket):
-    # Import here to ensure it's available on workers
-    import pandas as pd
-
-    dataset = element['dataset']
-    gdf = element['data']
-    stats = element['stats']
-
-    # Write optimized GeoParquet
-    gdf.to_parquet(
-        f'gs://{output_bucket}/validated/{dataset}/current.parquet',
-        compression='zstd',
-        compression_level=3,
-        index=False,
-        row_group_size=100000
-    )
-
-    # Write validation stats
-    pd.DataFrame([stats]).to_csv(
-        f'gs://{output_bucket}/validated/{dataset}/validation_stats.csv',
-        index=False
-    )
-
 class ValidateAndOptimize(beam.DoFn):
     def process(self, element):
         # Import here to ensure it's available on workers
@@ -94,18 +80,30 @@ def process(self, element):
         yield {'dataset': dataset, 'data': gdf, 'stats': stats}
 
-def run():
-    pipeline_options = PipelineOptions()
-    options = pipeline_options.view_as(ValidateGeometriesOptions)
-
-    with beam.Pipeline(options=pipeline_options) as p:
-        (p
-            | 'Create Dataset' >> beam.Create([options.dataset])
-            | 'Read Data' >> beam.Map(lambda dataset: read_dataset(dataset, options.input_bucket))
-            | 'Validate and Optimize' >> beam.ParDo(ValidateAndOptimize())
-            | 'Write Results' >> beam.Map(lambda element: write_outputs(element, options.output_bucket))
-        )
+def write_outputs(element, output_bucket):
+    # Import here to ensure it's available on workers
+    import pandas as pd
+
+    dataset = element['dataset']
+    gdf = element['data']
+    stats = element['stats']
+
+    # Write optimized GeoParquet
+    gdf.to_parquet(
+        f'gs://{output_bucket}/validated/{dataset}/current.parquet',
+        compression='zstd',
+        compression_level=3,
+        index=False,
+        row_group_size=100000
+    )
+
+    # Write validation stats
+    pd.DataFrame([stats]).to_csv(
+        f'gs://{output_bucket}/validated/{dataset}/validation_stats.csv',
+        index=False
+    )
 
 
 if __name__ == '__main__':
+    import logging
     logging.getLogger().setLevel(logging.INFO)
     run()
\ No newline at end of file
diff --git a/dags/data_processing.py b/dags/data_processing.py
index 76869852..1fd7adc1 100644
--- a/dags/data_processing.py
+++ b/dags/data_processing.py
@@ -38,8 +38,7 @@
         'disk_size_gb': 100,
         'region': 'europe-west1',
         'temp_location': 'gs://landbrugsdata-processing/temp',
-        'sdk_container_image': 'gcr.io/landbrugsdata-1/dataflow-processing:latest',
-        'setup_file': 'gs://landbrugsdata-processing/dataflow/setup.py'
+        'sdk_container_image': 'gcr.io/landbrugsdata-1/dataflow-processing:latest'
     },
     location='europe-west1',
     wait_until_finished=True,