Skip to content

Commit

Permalink
refactor: simplify Dataflow code following Beam examples
Browse files Browse the repository at this point in the history
- Restructure pipeline following official Beam patterns
- Remove setup.py dependency
- Simplify DAG configuration
  • Loading branch information
martincollignon committed Dec 1, 2024
1 parent 8301cba commit 8be02c7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 42 deletions.
78 changes: 38 additions & 40 deletions backend/dataflow/validate_geometries.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# backend/dataflow/validate_geometries.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import logging

class ValidateGeometriesOptions(PipelineOptions):
@classmethod
Expand All @@ -10,38 +9,25 @@ def _add_argparse_args(cls, parser):
parser.add_argument('--input_bucket')
parser.add_argument('--output_bucket')

def run(argv=None):
    """Build and run the geometry-validation pipeline.

    Args:
        argv: Optional list of command-line arguments passed through to
            PipelineOptions (defaults to sys.argv when None).
    """
    options = PipelineOptions(argv)

    with beam.Pipeline(options=options) as p:
        (p
         # Seed the pipeline with a single-element PCollection holding the
         # dataset name supplied via the --dataset option.
         | 'Create Dataset' >> beam.Create([options.view_as(ValidateGeometriesOptions).dataset])
         # Load the raw GeoParquet for that dataset from the input bucket.
         | 'Read Data' >> beam.Map(lambda dataset: read_dataset(dataset,
            options.view_as(ValidateGeometriesOptions).input_bucket))
         | 'Validate and Optimize' >> beam.ParDo(ValidateAndOptimize())
         # Persist the validated GeoParquet plus its stats to the output bucket.
         | 'Write Results' >> beam.Map(lambda element: write_outputs(element,
            options.view_as(ValidateGeometriesOptions).output_bucket))
        )

def read_dataset(dataset, input_bucket):
    """Load one dataset's raw GeoParquet file from GCS.

    Args:
        dataset: Dataset name; selects the `raw/<dataset>/current.parquet`
            object in the bucket.
        input_bucket: GCS bucket name holding the raw data.

    Returns:
        Dict with the dataset name under 'dataset' and the loaded frame
        under 'data'.
    """
    # Import here to ensure it's only imported in workers
    import geopandas as gpd

    # Read the parquet exactly once; a leftover duplicate call previously
    # fetched the same GCS object twice and threw the first result away.
    gdf = gpd.read_parquet(f'gs://{input_bucket}/raw/{dataset}/current.parquet')
    return {'dataset': dataset, 'data': gdf}

def write_outputs(element, output_bucket):
    """Write a validated dataset and its validation stats to GCS.

    Args:
        element: Dict carrying 'dataset' (name), 'data' (a frame supporting
            to_parquet — presumably a GeoDataFrame from read_dataset; confirm
            against ValidateAndOptimize), and 'stats' (validation summary).
        output_bucket: GCS bucket name that receives the outputs.
    """
    # Import here to ensure it's available on workers
    import pandas as pd

    dataset = element['dataset']
    gdf = element['data']
    stats = element['stats']

    # Write optimized GeoParquet
    gdf.to_parquet(
        f'gs://{output_bucket}/validated/{dataset}/current.parquet',
        compression='zstd',  # zstd level 3: compact output without slow writes
        compression_level=3,
        index=False,
        row_group_size=100000  # bounded row groups keep reads seekable
    )

    # Write validation stats
    pd.DataFrame([stats]).to_csv(
        f'gs://{output_bucket}/validated/{dataset}/validation_stats.csv',
        index=False
    )

class ValidateAndOptimize(beam.DoFn):
def process(self, element):
# Import here to ensure it's available on workers
Expand Down Expand Up @@ -94,18 +80,30 @@ def process(self, element):

yield {'dataset': dataset, 'data': gdf, 'stats': stats}

def run():
    """Assemble and execute the geometry-validation Dataflow pipeline."""
    pipeline_options = PipelineOptions()
    options = pipeline_options.view_as(ValidateGeometriesOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        # One-element seed PCollection carrying the dataset name.
        seed = p | 'Create Dataset' >> beam.Create([options.dataset])
        # Pull the raw GeoParquet for the dataset out of the input bucket.
        raw = seed | 'Read Data' >> beam.Map(
            lambda dataset: read_dataset(dataset, options.input_bucket))
        validated = raw | 'Validate and Optimize' >> beam.ParDo(ValidateAndOptimize())
        # Persist validated data plus stats; result of the final Map is unused.
        validated | 'Write Results' >> beam.Map(
            lambda element: write_outputs(element, options.output_bucket))
def write_outputs(element, output_bucket):
    """Persist a validated dataset and its validation stats to GCS.

    Args:
        element: Dict with 'dataset' (name), 'data' (frame supporting
            to_parquet), and 'stats' (validation summary) keys.
        output_bucket: GCS bucket name that receives both outputs.
    """
    # Import here to ensure it's available on workers
    import pandas as pd

    dataset = element['dataset']
    gdf = element['data']
    stats = element['stats']

    # Destination objects live side by side under validated/<dataset>/.
    parquet_path = f'gs://{output_bucket}/validated/{dataset}/current.parquet'
    stats_path = f'gs://{output_bucket}/validated/{dataset}/validation_stats.csv'

    # Optimized GeoParquet: zstd level 3 with bounded row groups.
    gdf.to_parquet(
        parquet_path,
        compression='zstd',
        compression_level=3,
        index=False,
        row_group_size=100000,
    )

    # One-row CSV with the validation statistics.
    pd.DataFrame([stats]).to_csv(stats_path, index=False)

if __name__ == '__main__':
    # Local import: logging is only needed when launched as a script.
    import logging
    # Raise root logger to INFO so pipeline progress messages are visible.
    logging.getLogger().setLevel(logging.INFO)
    run()
3 changes: 1 addition & 2 deletions dags/data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
'disk_size_gb': 100,
'region': 'europe-west1',
'temp_location': 'gs://landbrugsdata-processing/temp',
'sdk_container_image': 'gcr.io/landbrugsdata-1/dataflow-processing:latest',
'setup_file': 'gs://landbrugsdata-processing/dataflow/setup.py'
'sdk_container_image': 'gcr.io/landbrugsdata-1/dataflow-processing:latest'
},
location='europe-west1',
wait_until_finished=True,
Expand Down

0 comments on commit 8be02c7

Please sign in to comment.