Optimize wetlands grid cell merging
- Keep original CRS (EPSG:25832) until final step
- Use boundary-based merging for grid cells (sketched below)
- Remove unnecessary geometry validation for grids
- Add detailed timing logs for each operation
- Transform to BigQuery-compatible CRS only at end
- Improve memory usage and performance
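
The boundary-based approach replaces a per-geometry make_valid pass and a direct polygon union with a cheaper pipeline over the cell outlines. A minimal sketch of the idea on two hypothetical grid cells (note: this sketch nodes the boundaries with unary_union before polygonizing, a common safeguard since polygonize expects noded linework; the commit itself stitches them with linemerge):

from shapely.geometry import box
from shapely.ops import polygonize, unary_union

# Two hypothetical 10 m grid cells sharing an edge (projected coords, as in EPSG:25832)
cells = [box(0, 0, 10, 10), box(10, 0, 20, 10)]

# Union the cell boundaries: this nodes the linework and collapses
# the shared edge into a single segment
outline = unary_union([cell.boundary for cell in cells])

# Rebuild the faces of the planar subdivision, then union the faces
dissolved = unary_union(list(polygonize(outline)))

# For exactly snapped grid cells this matches a plain polygon union
assert dissolved.equals(unary_union(cells))

For snapped grid cells the expensive validity repairs are unnecessary, which is what the removed per-geometry fix loop in the diff below was spending its time on.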
martincollignon committed Dec 10, 2024
1 parent e20b970 commit 389d603
Showing 1 changed file with 33 additions and 40 deletions.
73 changes: 33 additions & 40 deletions backend/src/sources/parsers/wetlands.py
@@ -16,6 +16,9 @@
from shapely.geometry import MultiPolygon
import time
import psutil
from shapely.geometry import LineString
from shapely.geometry.polygon import orient
from shapely.ops import linemerge, polygonize

logger = logging.getLogger(__name__)

@@ -164,13 +167,10 @@ async def write_to_storage(self, features, dataset):
df = pd.DataFrame([f['properties'] for f in features])
geometries = [Polygon(f['geometry']['coordinates'][0]) for f in features]

# Create GeoDataFrame with original CRS
# Create GeoDataFrame with original CRS - keep in 25832 for grid operations
gdf = gpd.GeoDataFrame(df, geometry=geometries, crs="EPSG:25832")

# Validate and transform geometries
gdf = validate_and_transform_geometries(gdf, 'wetlands')

# Handle working/final files
# Handle working/final files (still in original CRS)
temp_working = f"/tmp/{dataset}_working.parquet"
working_blob = self.bucket.blob(f'raw/{dataset}/working.parquet')

@@ -181,7 +181,7 @@ async def write_to_storage(self, features, dataset):
combined_gdf = pd.concat([existing_gdf, gdf], ignore_index=True)
else:
combined_gdf = gdf

# Write working file
combined_gdf.to_parquet(temp_working)
working_blob.upload_from_filename(temp_working)
@@ -191,50 +191,43 @@ async def write_to_storage(self, features, dataset):
if hasattr(self, 'is_sync_complete') and self.is_sync_complete:
logger.info(f"Sync complete - writing final files")

# Write regular final file
# Write regular final file (transform this one)
final_gdf = validate_and_transform_geometries(combined_gdf.copy(), 'wetlands')
final_blob = self.bucket.blob(f'raw/{dataset}/current.parquet')
final_gdf.to_parquet(temp_working)
final_blob.upload_from_filename(temp_working)

logger.info(f"Starting comprehensive geometry cleanup and union of {len(combined_gdf):,} geometries...")
logger.info(f"Memory usage before operations: {psutil.Process().memory_info().rss / 1024 / 1024:.2f} MB")
# Do grid operations in original CRS
logger.info(f"Starting efficient merge of {len(combined_gdf):,} grid cells in EPSG:25832...")
logger.info(f"Memory usage: {psutil.Process().memory_info().rss / 1024 / 1024:.2f} MB")
start_time = time.time()

# Comprehensive geometry fixes
logger.info("Starting comprehensive geometry fixes...")
fixed_geometries = []
for idx, geom in enumerate(combined_gdf.geometry.values):
# Check validity
if not geom.is_valid:
logger.info(f"Invalid geometry found: {explain_validity(geom)}")

# Make valid
fixed = make_valid(geom)

# Handle potential multi-polygons
if isinstance(fixed, MultiPolygon):
# Take largest polygon if we get a multi
largest = max(fixed.geoms, key=lambda x: x.area)
fixed_geometries.append(largest)
else:
fixed_geometries.append(fixed)
else:
fixed_geometries.append(geom)

# Log progress
if idx % 10000 == 0:
logger.info(f"Fixed {idx:,} geometries...")
# Extract boundaries
logger.info("Extracting boundaries...")
boundary_start = time.time()
boundaries = combined_gdf.geometry.boundary
logger.info(f"Boundaries extracted in {time.time() - boundary_start:.2f} seconds")

logger.info("Geometry fixes complete")
logger.info(f"Fixed {len(fixed_geometries):,} geometries")
# Merge lines
logger.info("Merging boundaries...")
merge_start = time.time()
merged = linemerge(boundaries)
logger.info(f"Boundaries merged in {time.time() - merge_start:.2f} seconds")

# Then do the union
logger.info("Performing union...")
dissolved = unary_union(fixed_geometries)
# Create polygons
logger.info("Creating polygons from merged boundaries...")
poly_start = time.time()
final_poly = unary_union(list(polygonize(merged)))
logger.info(f"Polygons created in {time.time() - poly_start:.2f} seconds")

end_time = time.time()
logger.info(f"All operations completed in {(end_time - start_time) / 60:.2f} minutes")
logger.info(f"Grid operations completed in {(end_time - start_time) / 60:.2f} minutes")

dissolved_gdf = gpd.GeoDataFrame(geometry=[dissolved], crs=combined_gdf.crs)
# Create final GeoDataFrame and transform to BigQuery-compatible CRS
dissolved_gdf = gpd.GeoDataFrame(geometry=[final_poly], crs="EPSG:25832")
logger.info("Transforming final geometry to BigQuery-compatible CRS...")
dissolved_gdf = validate_and_transform_geometries(dissolved_gdf, 'wetlands')
logger.info(f"Final CRS: {dissolved_gdf.crs}")

# Write dissolved version
temp_dissolved = f"/tmp/{dataset}_dissolved.parquet"
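
The validate_and_transform_geometries helper is defined outside this diff, so its exact behavior is not shown. A hedged sketch of what the final BigQuery-compatibility step plausibly involves, given the EPSG:25832 starting point and the orient import added above (assumptions, not the helper's actual implementation):

import geopandas as gpd
from shapely.geometry.polygon import orient

def to_bigquery_crs(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # BigQuery GEOGRAPHY expects WGS84 longitude/latitude, so leave the
    # projected CRS (EPSG:25832) only at this final step
    gdf = gdf.to_crs("EPSG:4326")
    # Normalize exterior rings to counter-clockwise; consistent ring
    # orientation avoids ambiguity when BigQuery parses polygons
    gdf["geometry"] = gdf.geometry.apply(
        lambda geom: orient(geom, sign=1.0) if geom.geom_type == "Polygon" else geom
    )
    return gdf

Deferring this reprojection until after the merge keeps the grid operations in a metric CRS, where cell coordinates snap exactly and intermediate geometries stay cheap to compare.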

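The diff repeats a start-timestamp/elapsed-log pattern for each stage. One way to condense it, sketched here as a hypothetical helper rather than anything present in the codebase:

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def timed(label: str):
    # Log wall-clock time for a block, mirroring the per-operation
    # timing logs added in this commit
    start = time.time()
    yield
    logger.info(f"{label} completed in {time.time() - start:.2f} seconds")

# Hypothetical usage:
# with timed("Extracting boundaries"):
#     boundaries = combined_gdf.geometry.boundary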