Merge pull request #168 from martincollignon/feature/optimize-grid-merge
Feature/optimize grid merge
martincollignon authored Dec 10, 2024
2 parents 3ee6918 + 389d603 commit a3f2ba9
Showing 1 changed file with 38 additions and 14 deletions.
52 changes: 38 additions & 14 deletions backend/src/sources/parsers/wetlands.py
@@ -11,9 +11,14 @@
 import geopandas as gpd
 import os
 from ..utils.geometry_validator import validate_and_transform_geometries
-from shapely.ops import unary_union
+from shapely.ops import unary_union, polygonize
+from shapely.validation import make_valid, explain_validity
+from shapely.geometry import MultiPolygon
 import time
 import psutil
+from shapely.geometry import LineString
+from shapely.geometry.polygon import orient
+from shapely.ops import linemerge
 
 logger = logging.getLogger(__name__)

@@ -162,13 +167,10 @@ async def write_to_storage(self, features, dataset):
         df = pd.DataFrame([f['properties'] for f in features])
         geometries = [Polygon(f['geometry']['coordinates'][0]) for f in features]
 
-        # Create GeoDataFrame with original CRS
+        # Create GeoDataFrame with original CRS - keep in 25832 for grid operations
         gdf = gpd.GeoDataFrame(df, geometry=geometries, crs="EPSG:25832")
 
-        # Validate and transform geometries
-        gdf = validate_and_transform_geometries(gdf, 'wetlands')
-
-        # Handle working/final files
+        # Handle working/final files (still in original CRS)
         temp_working = f"/tmp/{dataset}_working.parquet"
         working_blob = self.bucket.blob(f'raw/{dataset}/working.parquet')
 
@@ -179,7 +181,7 @@ async def write_to_storage(self, features, dataset):
            combined_gdf = pd.concat([existing_gdf, gdf], ignore_index=True)
         else:
            combined_gdf = gdf
-
+
         # Write working file
         combined_gdf.to_parquet(temp_working)
         working_blob.upload_from_filename(temp_working)
@@ -189,21 +191,43 @@ async def write_to_storage(self, features, dataset):
         if hasattr(self, 'is_sync_complete') and self.is_sync_complete:
             logger.info(f"Sync complete - writing final files")
 
-            # Write regular final file
+            # Write regular final file (transform this one)
+            final_gdf = validate_and_transform_geometries(combined_gdf.copy(), 'wetlands')
             final_blob = self.bucket.blob(f'raw/{dataset}/current.parquet')
+            final_gdf.to_parquet(temp_working)
             final_blob.upload_from_filename(temp_working)
 
-            logger.info(f"Starting union of {len(combined_gdf):,} adjacent geometries...")
-            logger.info(f"Memory usage before union: {psutil.Process().memory_info().rss / 1024 / 1024:.2f} MB")
+            # Do grid operations in original CRS
+            logger.info(f"Starting efficient merge of {len(combined_gdf):,} grid cells in EPSG:25832...")
+            logger.info(f"Memory usage: {psutil.Process().memory_info().rss / 1024 / 1024:.2f} MB")
             start_time = time.time()
 
-            # Just use unary_union - it's already optimized for this
-            dissolved = unary_union(combined_gdf.geometry.values)
+            # Extract boundaries
+            logger.info("Extracting boundaries...")
+            boundary_start = time.time()
+            boundaries = combined_gdf.geometry.boundary
+            logger.info(f"Boundaries extracted in {time.time() - boundary_start:.2f} seconds")
+
+            # Merge lines
+            logger.info("Merging boundaries...")
+            merge_start = time.time()
+            merged = linemerge(boundaries)
+            logger.info(f"Boundaries merged in {time.time() - merge_start:.2f} seconds")
+
+            # Create polygons
+            logger.info("Creating polygons from merged boundaries...")
+            poly_start = time.time()
+            final_poly = unary_union(list(polygonize(merged)))
+            logger.info(f"Polygons created in {time.time() - poly_start:.2f} seconds")
 
             end_time = time.time()
-            logger.info(f"Union completed in {(end_time - start_time) / 60:.2f} minutes")
+            logger.info(f"Grid operations completed in {(end_time - start_time) / 60:.2f} minutes")
 
-            dissolved_gdf = gpd.GeoDataFrame(geometry=[dissolved], crs=combined_gdf.crs)
+            # Create final GeoDataFrame and transform to BigQuery-compatible CRS
+            dissolved_gdf = gpd.GeoDataFrame(geometry=[final_poly], crs="EPSG:25832")
+            logger.info("Transforming final geometry to BigQuery-compatible CRS...")
+            dissolved_gdf = validate_and_transform_geometries(dissolved_gdf, 'wetlands')
+            logger.info(f"Final CRS: {dissolved_gdf.crs}")
 
             # Write dissolved version
             temp_dissolved = f"/tmp/{dataset}_dissolved.parquet"
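The new merge path replaces one big unary_union over all cell polygons with a line-based dissolve: take each cell's boundary, merge the linework with linemerge, rebuild faces with polygonize, then union the rebuilt faces. A minimal sketch of the same idea on a hypothetical 2x2 grid of unit squares (the toy grid and variable names are illustrative, not from the repository; unlike the committed code, this sketch runs unary_union over the boundaries first so the linework is explicitly noded before polygonizing):

import geopandas as gpd
from shapely.geometry import box
from shapely.ops import linemerge, polygonize, unary_union

# Four adjacent cells standing in for the wetlands grid (EPSG:25832, metres)
cells = gpd.GeoSeries(
    [box(x, y, x + 1, y + 1) for x in (0, 1) for y in (0, 1)],
    crs="EPSG:25832",
)

boundaries = cells.boundary            # one closed ring per grid cell
noded = unary_union(list(boundaries))  # node the linework, drop duplicated shared edges
merged = linemerge(noded)              # stitch segments into as few LineStrings as possible
faces = list(polygonize(merged))       # rebuild the polygons enclosed by the merged linework
dissolved = unary_union(faces)         # contiguous faces collapse into one polygon

print(len(faces), dissolved.area)      # four rebuilt faces dissolve to one 2x2 square, area 4.0

Whether this beats a plain unary_union depends on the data; for large collections of axis-aligned grid cells, working on boundaries and polygonizing once can avoid repeated polygon-on-polygon overlay work.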

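On CRS handling: the grid work stays in EPSG:25832 (a projected CRS in metres, so planar boundary operations behave sensibly) and only the final outputs are transformed, since BigQuery's GEOGRAPHY type expects WGS84 longitude/latitude. validate_and_transform_geometries is project code whose body is not part of this diff; a hypothetical stand-in for that last step might look like:

import geopandas as gpd
from shapely.validation import make_valid

def to_bigquery_crs(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # Illustrative only - the repository's validate_and_transform_geometries()
    # may do more. Repair invalid geometries, then reproject to EPSG:4326,
    # the lon/lat CRS that BigQuery GEOGRAPHY columns expect.
    out = gdf.copy()
    out["geometry"] = out.geometry.apply(make_valid)
    return out.to_crs("EPSG:4326")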