Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close LGVISIUM-63: Extraction of the groundwater logo using computer vision #83

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Binary file added Screenshot 2024-09-16 at 19.04.42_template.npy
Binary file not shown.
10 changes: 9 additions & 1 deletion config/matching_params.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ coordinate_keys:
- coordonnées
- coordonn

coordinate_fp_keys:


groundwater_fp_keys:
- Wasserstau
- Grundwasser-
- Grundwasserfassung

groundwater_keys:
# German
Expand All @@ -132,7 +139,6 @@ groundwater_keys:
- W SP
- Gr.W.spiegel
- GrW Sp
- Wsp.
- Wsp
- GW-Spiegel
- Grundwasser
Expand Down Expand Up @@ -170,3 +176,5 @@ elevation_keys:
- Ansatzhöhe
- Terrainkote

elevation_fp_keys:

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies = [
"opencv-python-headless",
"quads>=1.1.0",
"numpy<2",
"scikit-image==0.24.0"
]

[project.optional-dependencies]
Expand Down
46 changes: 38 additions & 8 deletions src/stratigraphy/data_extractor/data_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class DataExtractor(ABC):

doc: fitz.Document = None
feature_keys: list[str] = None
feature_fp_keys: list[str] = None
feature_name: str = None

# How much to the left of a key do we look for the feature information, as a multiple of the key line width
Expand All @@ -48,6 +49,8 @@ class DataExtractor(ABC):
search_right_factor: float = 0
# How much below a key do we look for the feature information, as a multiple of the key line height
search_below_factor: float = 0
# How much above a key do we look for the feature information, as a multiple of the key line height
search_above_factor: float = 0

preprocess_replacements: dict[str, str] = {}

Expand All @@ -63,6 +66,11 @@ def __init__(self, document: fitz.Document):

self.doc = document
self.feature_keys = read_params("matching_params.yml")[f"{self.feature_name}_keys"]
self.feature_fp_keys = (
read_params("matching_params.yml")[f"{self.feature_name}_fp_keys"]
if read_params("matching_params.yml")[f"{self.feature_name}_fp_keys"]
else []
)

def preprocess(self, value: str) -> str:
for old, new in self.preprocess_replacements.items():
Expand Down Expand Up @@ -105,7 +113,15 @@ def find_feature_key(self, lines: list[TextLine], allowed_error_rate: float = 0.
for line in lines:
match = pattern.search(line.text)
if match:
matches.add(line)
# Make sure the key is not in the false positive list
is_fp_key = False
for fp_key in self.feature_fp_keys:
if fp_key in line.text:
is_fp_key = True
break

if not is_fp_key:
matches.add(line)

return list(matches)

Expand All @@ -122,14 +138,28 @@ def get_lines_near_key(self, lines, key_line: TextLine) -> list[TextLine]:
list[TextLine]: The lines close to the key.
"""
key_rect = key_line.rect
elevation_search_rect = fitz.Rect(
key_rect.x0 - self.search_left_factor * key_rect.width,
key_rect.y0,
key_rect.x1 + self.search_right_factor * key_rect.width,
key_rect.y1 + self.search_below_factor * key_rect.height,
)
feature_lines = [line for line in lines if line.rect.intersects(elevation_search_rect)]
feature_lines = self.get_lines_near_rect(lines, key_rect)

# makes sure the line with the key is included first in the extracted information and the duplicate removed
feature_lines.insert(0, key_line)
return list(dict.fromkeys(feature_lines))

def get_lines_near_rect(self, lines, rect: fitz.Rect) -> list[TextLine]:
"""Find the lines of the text that are close to a given rectangle.

Args:
lines (list[TextLine]): Arbitrary text lines to search in.
rect (fitz.Rect): The rectangle to search around.

Returns:
list[TextLine]: The lines close to the rectangle.
"""
search_rect = fitz.Rect(
rect.x0 - self.search_left_factor * rect.width,
rect.y0 - self.search_above_factor * rect.height,
rect.x1 + self.search_right_factor * rect.width,
rect.y1 + self.search_below_factor * rect.height,
)
feature_lines = [line for line in lines if line.rect.intersects(search_rect)]

return feature_lines
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
209 changes: 203 additions & 6 deletions src/stratigraphy/groundwater/groundwater_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import abc
import logging
import math
import os
from dataclasses import dataclass
from datetime import date as dt
from datetime import datetime
from pathlib import Path

import fitz
import numpy as np
import skimage as ski
from stratigraphy.data_extractor.data_extractor import DataExtractor, ExtractedFeature
from stratigraphy.groundwater.utility import extract_date, extract_depth, extract_elevation
from stratigraphy.lines.line import TextLine
Expand Down Expand Up @@ -137,9 +141,11 @@ class GroundwaterLevelExtractor(DataExtractor):
feature_name = "groundwater"

# look for elevation values to the left, right and/or immediately below the key
search_left_factor: float = 2
search_left_factor: float = 3 # NOTE: check files 267125334-bp.pdf, 267125338-bp.pdf, and 267125339-bp.pdf if this
# value is too high, as it might lead to false positives
search_right_factor: float = 10
search_below_factor: float = 4
search_above_factor: float = 4

preprocess_replacements = {",": ".", "'": ".", "o": "0", "\n": " ", "ü": "u"}

Expand Down Expand Up @@ -210,7 +216,6 @@ def get_groundwater_info_from_lines(self, lines: list[TextLine], page: int) -> G

elevation = extract_elevation(text)

# Pattern for matching depth (e.g., "1,48 m u.T.")
matched_lines_rect.append(line.rect)
else:
# Pattern for matching date
Expand All @@ -219,6 +224,12 @@ def get_groundwater_info_from_lines(self, lines: list[TextLine], page: int) -> G
if extracted_date_str:
text = text.replace(extracted_date_str, "").strip()
date = extracted_date
matched_lines_rect.append(line.rect)
else:
# in case several dates are present, we skip the other dates
extracted_date, extracted_date_str = extract_date(text)
if extracted_date_str:
continue

# Pattern for matching depth (e.g., "1,48 m u.T.")
if not depth:
Expand Down Expand Up @@ -269,6 +280,184 @@ def get_groundwater_info_from_lines(self, lines: list[TextLine], page: int) -> G
else:
raise ValueError("Could not extract all required information from the lines provided.")

def load_templates(self) -> list[np.ndarray]:
"""Load the templates for the groundwater information.

Returns:
list[np.ndarray]: the loaded templates
"""
templates = []
template_dir = os.path.join(os.path.dirname(__file__), "assets")
for template in os.listdir(template_dir):
if template.endswith(".npy"): # and template.startswith("700246002-bp_page1_template"):
templates.append(np.load(os.path.join(template_dir, template)))
return templates

def get_groundwater_from_illustration(
self, lines: list[TextLine], page_number: int, terrain_elevation: Elevation | None
) -> list[GroundwaterInformationOnPage]:
"""Extracts the groundwater information from an illustration.

Args:
lines (list[TextLine]): the lines of text to extract the groundwater information from
page_number (int): the page number (1-based) of the PDF document
terrain_elevation (Elevation | None): The elevation of the terrain.

Returns:
list[GroundwaterInformationOnPage]: the extracted groundwater information
"""
extracted_groundwater_list = []
confidence_list = []

# convert the doc to an image
page = self.doc.load_page(page_number - 1)
filename = Path(self.doc.name).stem
png_filename = f"{filename}-{page_number + 1}.png"
png_path = f"/tmp/{png_filename}" # Local path to save the PNG
fitz.utils.get_pixmap(page, matrix=fitz.Matrix(2, 2), clip=page.rect).save(png_path)

# load the image
img = ski.io.imread(png_path)
N_BEST_MATCHES = 5
TEMPLATE_MATCH_THRESHOLD = 0.66

# extract the groundwater information from the image
for template in self.load_templates():
# Compute the match of the template and the image (correlation coef)
result = ski.feature.match_template(img, template)

for _ in range(N_BEST_MATCHES):
ij = np.unravel_index(np.argmax(result), result.shape)
confidence = np.max(result) # TODO - use confidence to filter out bad matches
if confidence < TEMPLATE_MATCH_THRESHOLD:
# skip this template if the confidence is too low to avoid false positives
continue
top_left = (ij[1], ij[0])
illustration_rect = fitz.Rect(
top_left[0], top_left[1], top_left[0] + template.shape[1], top_left[1] + template.shape[0]
)

# remove the matched area from the result to avoid finding the same area again
x_area_to_remove = int(0.75 * template.shape[1])
y_area_to_remove = int(0.75 * template.shape[0])
result[
int(illustration_rect.y0) - y_area_to_remove : int(illustration_rect.y1) + y_area_to_remove,
int(illustration_rect.x0) - x_area_to_remove : int(illustration_rect.x1) + x_area_to_remove,
] = float("-inf")

# convert the illustration_rect to the coordinate system of the PDF
horizontal_scaling = page.rect.width / img.shape[1]
vertical_scaling = page.rect.height / img.shape[0]
pdf_illustration_rect = fitz.Rect(
illustration_rect.x0 * horizontal_scaling,
illustration_rect.y0 * vertical_scaling,
illustration_rect.x1 * horizontal_scaling,
illustration_rect.y1 * vertical_scaling,
)

# extract the groundwater information from the image using the text
groundwater_info_lines = self.get_lines_near_rect(lines, pdf_illustration_rect)

# sort the lines by their proximity to the key line center, compute the distance to the key line center
def distance_to_key_center(line_rect: fitz.Rect, illustration_rect: fitz.Rect) -> float:
key_center_x = (illustration_rect.x0 + illustration_rect.x1) / 2
key_center_y = (illustration_rect.y0 + illustration_rect.y1) / 2
line_center_x = (line_rect.x0 + line_rect.x1) / 2
line_center_y = (line_rect.y0 + line_rect.y1) / 2
return math.sqrt((line_center_x - key_center_x) ** 2 + (line_center_y - key_center_y) ** 2)

groundwater_info_lines.sort(key=lambda line: distance_to_key_center(line.rect, pdf_illustration_rect))
try:
extracted_gw = self.get_groundwater_info_from_lines(groundwater_info_lines, page_number)
if extracted_gw.groundwater.depth or extracted_gw.groundwater.elevation:
# Fill in the depth and elevation if they are not already filled in based on the terrain
if terrain_elevation:
if not extracted_gw.groundwater.depth and extracted_gw.groundwater.elevation:
extracted_gw.groundwater.depth = round(
terrain_elevation.elevation - extracted_gw.groundwater.elevation, 2
)
if not extracted_gw.groundwater.elevation and extracted_gw.groundwater.depth:
extracted_gw.groundwater.elevation = round(
terrain_elevation.elevation - extracted_gw.groundwater.depth, 2
)

# Make a sanity check to see if elevation and depth make sense (i.e., they add up:
# elevation + depth = terrain elevation)
if extracted_gw.groundwater.elevation and extracted_gw.groundwater.depth:
extract_terrain_elevation = round(
extracted_gw.groundwater.elevation + extracted_gw.groundwater.depth, 2
)
if extract_terrain_elevation != terrain_elevation.elevation:
# If the extracted elevation and depth do not match the terrain elevation, we try
# to remove one of the items from the match and see if we can find a better match.
logger.warning(
"The extracted elevation and depth do not match the terrain elevation."
)
logger.warning(
"Elevation: %s, Depth: %s, Terrain Elevation: %s",
extracted_gw.groundwater.elevation,
extracted_gw.groundwater.depth,
terrain_elevation.elevation,
)

# re-run the extraction and see if we can find a better match by removing one
# item from the current match
groundwater_info_lines_without_depth = [
line
for line in groundwater_info_lines
if str(extracted_gw.groundwater.depth) not in line.text
]
groundwater_info_lines_without_elevation = [
line
for line in groundwater_info_lines
if str(extracted_gw.groundwater.elevation) not in line.text
]
extracted_gw = self.get_groundwater_info_from_lines(
groundwater_info_lines_without_depth, page_number
)

if not extracted_gw.groundwater.depth:
extracted_gw = self.get_groundwater_info_from_lines(
groundwater_info_lines_without_elevation, page_number
)

if extracted_gw.groundwater.elevation and extracted_gw.groundwater.depth:
extract_terrain_elevation = round(
extracted_gw.groundwater.elevation + extracted_gw.groundwater.depth, 2
)

if extract_terrain_elevation != terrain_elevation.elevation:
logger.warning(
"The extracted elevation and depth do not match the terrain elevation."
)
logger.warning(
"Elevation: %s, Depth: %s, Terrain Elevation: %s",
extracted_gw.groundwater.elevation,
extracted_gw.groundwater.depth,
terrain_elevation.elevation,
)
continue

# Only if the groundwater information is not already in the list
if extracted_gw not in extracted_groundwater_list and extracted_gw.groundwater.date:
extracted_groundwater_list.append(extracted_gw)
confidence_list.append(confidence)

# Remove the extracted groundwater information from the lines to avoid double extraction
for line in groundwater_info_lines:
# if the rectangle of the line is in contact with the rectangle of the extracted
# groundwater information, remove the line
if line.rect.intersects(extracted_gw.rect):
lines.remove(line)

except ValueError as error:
logger.warning("ValueError: %s", error)
continue

# TODO: Maybe we could stop the search if we found a good match with one of the templates

return extracted_groundwater_list, confidence_list

def extract_groundwater(self, terrain_elevation: Elevation | None) -> list[GroundwaterInformationOnPage]:
"""Extracts the groundwater information from a borehole profile.

Expand All @@ -287,10 +476,18 @@ def extract_groundwater(self, terrain_elevation: Elevation | None) -> list[Groun
lines = extract_text_lines(page)
page_number = page.number + 1 # page.number is 0-based

found_groundwater = (
self.get_groundwater_near_key(lines, page_number)
# or XXXX # Add other techniques here
)
found_groundwater = self.get_groundwater_near_key(lines, page_number)
if not found_groundwater:
logger.info("No groundwater found near the key on page %s.", page_number)
found_groundwater, confidence_list = self.get_groundwater_from_illustration(
lines, page_number, terrain_elevation
)
logger.info("Confidence list: %s", confidence_list)
print("Confidence list: %s", confidence_list)
logger.info("Found groundwater from illustration on page %s: %s", page_number, found_groundwater)
print("Found groundwater from illustration on page %s: %s", page_number, found_groundwater)
if not found_groundwater:
logger.info("No groundwater found in the illustration on page %s.", page_number)

if terrain_elevation:
# If the elevation is provided, calculate the depth of the groundwater
Expand Down
15 changes: 9 additions & 6 deletions src/stratigraphy/groundwater/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ def extract_depth(text: str, max_depth: int) -> float | None:
for pattern in depth_patterns:
depth_match = regex.search(pattern, corrected_text)
if depth_match:
depth = float(depth_match.group(1).replace(",", "."))
if depth > max_depth:
# If the extracted depth is greater than the max depth, set it to None and continue searching.
depth = None
else:
break
try:
depth = float(depth_match.group(1).replace(",", "."))
if depth > max_depth:
# If the extracted depth is greater than the max depth, set it to None and continue searching.
depth = None
else:
break
except ValueError:
continue
return depth


Expand Down
Loading
Loading