fix(cadastral): Improve feature parsing and error handling
- Fix coordinate parsing in geometry handling
- Add better validation and error logging
- Improve sync process with cloud-friendly progress tracking
- Add backoff retry for storage operations
- Remove tqdm dependency for Cloud Run compatibility
martincollignon committed Dec 8, 2024
1 parent a70964e commit cb25fac
Showing 1 changed file with 76 additions and 7 deletions.
83 changes: 76 additions & 7 deletions backend/src/sources/parsers/cadastral.py
@@ -119,7 +119,7 @@ def _parse_geometry(self, geom_elem):

coords = [float(x) for x in pos_list.text.strip().split()]
pairs = [(coords[i], coords[i+1])
for i in range(0, len(coords), 3)]
for i in range(0, len(coords)-2, 2)]

if len(pairs) < 4:
continue
@@ -128,6 +128,8 @@ def _parse_geometry(self, geom_elem):
polygon = Polygon(pairs)
if polygon.is_valid:
polygons.append(polygon)
else:
logger.warning("Invalid polygon created")
except Exception as e:
logger.warning(f"Error creating polygon: {str(e)}")
continue
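
The stride change above is the core fix: the old step of 3 treated the flat `posList` as (x, y, z) triples, which misaligns the pairs when the WFS serves plain 2D coordinates. A minimal standalone sketch of the corrected pairing on a hypothetical closed ring (the sample values are illustrative, not from the dataset):

```python
from shapely.geometry import Polygon

# Flat 2D posList for a closed square ring: x1 y1 x2 y2 ...
# (the last point repeats the first, as GML rings do)
pos_list_text = "0 0 0 10 10 10 10 0 0 0"
coords = [float(x) for x in pos_list_text.strip().split()]

# A step of 2 walks (x, y) pairs; len(coords) - 2 stops before the
# closing duplicate, which Shapely re-adds when it closes the ring.
pairs = [(coords[i], coords[i + 1]) for i in range(0, len(coords) - 2, 2)]

polygon = Polygon(pairs)
assert polygon.is_valid and polygon.area == 100.0
```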
@@ -147,6 +149,11 @@ def _parse_feature(self, feature_elem):
try:
feature = {}

# Add validation of the feature element
if feature_elem is None:
logger.warning("Received None feature element")
return None

# Parse all mapped fields
for xml_field, (db_field, converter) in self.field_mapping.items():
elem = feature_elem.find(f'.//mat:{xml_field}', self.namespaces)
@@ -165,6 +172,14 @@ def _parse_feature(self, feature_elem):
geometry_wkt = self._parse_geometry(geom_elem)
if geometry_wkt:
feature['geometry'] = geometry_wkt
else:
logger.warning("Failed to parse geometry for feature")

# Add validation of required fields
if not feature.get('bfe_number'):
logger.warning("Missing required field: bfe_number")
if not feature.get('geometry'):
logger.warning("Missing required field: geometry")

return feature if feature.get('bfe_number') and feature.get('geometry') else None

@@ -186,8 +201,33 @@ async def _get_total_count(self, session):
response.raise_for_status()
text = await response.text()
root = ET.fromstring(text)
total_available = int(root.get('numberMatched', '0'))
logger.info(f"Total available features: {total_available:,}")

# Handle case where numberMatched might be '*'
number_matched = root.get('numberMatched', '0')
number_returned = root.get('numberReturned', '0')

logger.info(f"WFS response metadata - numberMatched: {number_matched}, numberReturned: {number_returned}")

if number_matched == '*':
# If server doesn't provide exact count, fetch a larger page to estimate
logger.warning("Server returned '*' for numberMatched, fetching sample to estimate...")
params['count'] = '1000'
async with session.get(self.config['url'], params=params) as sample_response:
sample_text = await sample_response.text()
sample_root = ET.fromstring(sample_text)
feature_count = len(sample_root.findall('.//mat:SamletFastEjendom_Gaeldende', self.namespaces))
# Estimate conservatively
return feature_count * 2000 # Adjust multiplier based on expected data size

if not number_matched.isdigit():
raise ValueError(f"Invalid numberMatched value: {number_matched}")

total_available = int(number_matched)

# Add sanity check for unreasonable numbers
if total_available > 5000000: # Adjust threshold as needed
logger.warning(f"Unusually high feature count: {total_available:,}. This may indicate an issue.")

return total_available

except Exception as e:
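
The `numberMatched` guard matters because some WFS servers return a placeholder rather than an exact count. A self-contained sketch of the same parsing logic against canned responses (the XML snippets are illustrative, not real server output):

```python
import xml.etree.ElementTree as ET

# Canned WFS FeatureCollection headers: one exact count, one indeterminate.
exact = '<FeatureCollection numberMatched="2345" numberReturned="100"/>'
unknown = '<FeatureCollection numberMatched="*" numberReturned="100"/>'

for text in (exact, unknown):
    root = ET.fromstring(text)
    number_matched = root.get('numberMatched', '0')
    if number_matched == '*':
        # Server declined to count; fall back to a sampled estimate.
        print("indeterminate count, estimating")
    elif number_matched.isdigit():
        print(f"exact count: {int(number_matched):,}")
    else:
        raise ValueError(f"Invalid numberMatched value: {number_matched}")
```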
@@ -233,19 +273,40 @@ async def _fetch_chunk(self, session, start_index, timeout=None):
content = await response.text()
root = ET.fromstring(content)

# Add validation of returned features count
number_returned = root.get('numberReturned', '0')
logger.info(f"WFS reports {number_returned} features returned in this chunk")

features = []
for feature_elem in root.findall('.//mat:SamletFastEjendom_Gaeldende', self.namespaces):
feature_elements = root.findall('.//mat:SamletFastEjendom_Gaeldende', self.namespaces)
logger.info(f"Found {len(feature_elements)} feature elements in XML")

for feature_elem in feature_elements:
feature = self._parse_feature(feature_elem)
if feature:
features.append(feature)

logger.info(f"Chunk {start_index}: parsed {len(features)} valid features")
valid_count = len(features)
logger.info(f"Chunk {start_index}: parsed {valid_count} valid features out of {len(feature_elements)} elements")

# Validate that we're getting reasonable numbers
if valid_count == 0 and len(feature_elements) > 0:
logger.warning(f"No valid features parsed from {len(feature_elements)} elements - possible parsing issue")
elif valid_count < len(feature_elements) * 0.5: # If we're losing more than 50% of features
logger.warning(f"Low feature parsing success rate: {valid_count}/{len(feature_elements)}")

return features

except Exception as e:
logger.error(f"Error fetching chunk at index {start_index}: {str(e)}")
raise

@backoff.on_exception(
backoff.expo,
Exception, # Consider narrowing this to specific storage exceptions
max_tries=3,
max_time=300
)
async def write_to_storage(self, features, dataset):
"""Write features to GeoParquet in Cloud Storage"""
if not features:
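
As the inline comment suggests, retrying on bare `Exception` will also retry non-transient failures. A sketch of a narrowed decorator, assuming the Google Cloud Storage client stack (`google.api_core.exceptions` is an assumption here; swap in whatever your storage client actually raises):

```python
import backoff
from google.api_core import exceptions as gcp_exceptions  # assumes google-api-core

# Retry only errors that plausibly clear up on their own; a PermissionDenied
# or NotFound would fail identically on every attempt.
RETRYABLE = (
    gcp_exceptions.TooManyRequests,     # HTTP 429
    gcp_exceptions.ServiceUnavailable,  # HTTP 503
    gcp_exceptions.DeadlineExceeded,    # HTTP 504
)

@backoff.on_exception(backoff.expo, RETRYABLE, max_tries=3, max_time=300)
async def write_to_storage(features, dataset):
    ...
```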
@@ -303,6 +364,7 @@ async def sync(self):

features_batch = []
total_processed = 0
failed_chunks = []

for start_index in range(0, total_features, self.page_size):
try:
@@ -311,20 +373,27 @@
features_batch.extend(chunk)
total_processed += len(chunk)

# Log progress every 10,000 features
if total_processed % 10000 == 0:
logger.info(f"Progress: {total_processed:,}/{total_features:,} features ({(total_processed/total_features)*100:.1f}%)")

# Write batch if it's large enough or it's the last batch
is_last_batch = (start_index + self.page_size) >= total_features
if len(features_batch) >= self.batch_size or is_last_batch:
logger.info(f"Writing batch of {len(features_batch):,} features (is_last_batch: {is_last_batch})")
self.is_sync_complete = is_last_batch
await self.write_to_storage(features_batch, 'cadastral')
features_batch = []
logger.info(f"Progress: {total_processed:,}/{total_features:,}")

except Exception as e:
logger.error(f"Error processing batch at {start_index}: {str(e)}")
failed_chunks.append(start_index)
continue

logger.info(f"Sync completed. Total processed: {total_processed:,}")
if failed_chunks:
logger.error(f"Failed to process chunks starting at indices: {failed_chunks}")

logger.info(f"Sync completed. Total processed: {total_processed:,} features")
return total_processed

except Exception as e:
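
Dropping tqdm trades the interactive progress bar for periodic log lines, which Cloud Run's stdout-based logging can actually surface. A minimal sketch of that pattern, factored into a hypothetical helper (not part of this commit):

```python
import logging

logger = logging.getLogger(__name__)

def log_progress(processed: int, total: int, every: int = 10_000) -> None:
    """Emit one log line per `every` features instead of a live progress bar."""
    if processed % every == 0 or processed == total:
        pct = (processed / total) * 100 if total else 0.0
        logger.info(f"Progress: {processed:,}/{total:,} features ({pct:.1f}%)")
```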
