-
Notifications
You must be signed in to change notification settings - Fork 2
/
duplicate_detection.py
146 lines (124 loc) · 7.61 KB
/
duplicate_detection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""This module contains functionality for detecting duplicate layers across pdf pages."""
import logging
import cv2
import fitz
import Levenshtein
import numpy as np
from stratigraphy.util.plot_utils import convert_page_to_opencv_img
logger = logging.getLogger(__name__)
def remove_duplicate_layers(
    previous_page: fitz.Page,
    current_page: fitz.Page,
    previous_layers: list[dict],
    current_layers: list[dict],
    img_template_probability_threshold: float,
) -> list[dict]:
    """Remove duplicate layers from the current page based on the layers of the previous page.

    Each layer of the current page (top to bottom) is checked for a counterpart on the previous
    page. Once three consecutive layers are found to be non-duplicates, we assume the page overlap
    has ended and stop searching. When a duplicate is found, every layer up to and including it is
    discarded.

    Layers that carry a depth interval are compared by depth interval plus material description
    text; layers without one fall back to image template matching against the previous page.

    Args:
        previous_page (fitz.Page): The previous page.
        current_page (fitz.Page): The current page containing the layers to check for duplicates.
        previous_layers (list[dict]): The layers of the previous page.
        current_layers (list[dict]): The layers of the current page.
        img_template_probability_threshold (float): The threshold for the template matching probability

    Returns:
        list[dict]: The layers of the current page without duplicates.
    """
    # Order layers by their vertical position on the page (rect[1] is the top y-coordinate).
    layers_by_position = sorted(current_layers, key=lambda entry: entry["material_description"]["rect"][1])

    cutoff_index = 0  # every layer before this index is a duplicate and will be dropped
    non_duplicate_streak = 0

    for index, candidate in enumerate(layers_by_position):
        # Three consecutive non-duplicates -> no further overlap with the previous page.
        if non_duplicate_streak >= 3:
            break

        # Decide whether this layer already appears on the previous page, either via
        # depth interval + description comparison or via visual template matching.
        if "depth_interval" not in candidate:
            # No depth column available: fall back to template matching.
            is_duplicate = check_duplicate_layer_by_template_matching(
                previous_page, current_page, candidate, img_template_probability_threshold
            )
        else:
            is_duplicate = False
            candidate_description = candidate["material_description"]
            candidate_interval = candidate["depth_interval"]
            # Interval start values may be None; end values are always explicitly set.
            candidate_start = (
                candidate_interval["start"]["value"] if candidate_interval["start"] is not None else None
            )
            for reference in previous_layers:
                if "depth_interval" not in reference:
                    # Edge case: a previous-page layer without an assigned depth interval cannot
                    # be compared. We assume that when the current page has a depth column, the
                    # previous page has one too (the column should span both overlapping pages),
                    # so simply skip this reference layer.
                    continue
                reference_interval = reference["depth_interval"]
                reference_start = (
                    reference_interval["start"]["value"] if reference_interval["start"] is not None else None
                )
                # A duplicate needs near-identical description text AND matching depth bounds.
                if (
                    Levenshtein.ratio(candidate_description["text"], reference["material_description"]["text"])
                    > 0.9
                    and candidate_start == reference_start
                    and candidate_interval["end"].get("value") == reference_interval["end"].get("value")
                ):
                    is_duplicate = True
                    logger.info("Removing duplicate layer.")
                    break

        if is_duplicate:
            cutoff_index = index + 1  # all layers up to and including this one are duplicates
            non_duplicate_streak = 0
        else:
            non_duplicate_streak += 1

    return layers_by_position[cutoff_index:]
def check_duplicate_layer_by_template_matching(
    previous_page: fitz.Page, current_page: fitz.Page, current_layer: dict, img_template_probability_threshold: float
) -> bool:
    """Check if the current layer is a duplicate of a layer on the previous page by using template matching.

    An image crop of the layer is cut out of the current page and searched for on the previous
    page via OpenCV template matching; a high enough match probability means the layer is also
    present on the previous page, i.e. a duplicate.

    Args:
        previous_page (fitz.Page): The previous page.
        current_page (fitz.Page): The current page.
        current_layer (dict): The current layer that is checked for a duplicate.
        img_template_probability_threshold (float): The threshold for the template matching probability
                                                    to consider a layer a duplicate.

    Returns:
        bool: True if the layer is a duplicate, False otherwise.
    """
    zoom = 3  # render scale applied to both pages; pixel coordinates below are scaled by it
    current_img = convert_page_to_opencv_img(current_page, scale_factor=zoom, color_mode=cv2.COLOR_BGR2GRAY)
    previous_img = convert_page_to_opencv_img(previous_page, scale_factor=zoom, color_mode=cv2.COLOR_BGR2GRAY)

    [x0, y_top, x1, y_bottom] = current_layer["material_description"]["rect"]
    # Horizontal bounds: 0.2 / 0.8 of the page width are magic numbers that work well; the
    # right edge is additionally clamped so the template stays within the previous page.
    left = int(zoom * min(x0, current_page.rect.width * 0.2))
    right = int(zoom * min(max(x1, current_page.rect.width * 0.8), previous_page.rect.width - 1))
    # top/bottom delimit the crop compared against the previous page to detect overlap.
    # 5 extra pixels below add context, because the material_description bounding box is very
    # tight around the text; both are clamped so the template stays smaller than either page.
    # top must not be moved further up, otherwise overlap belonging to the previous layer
    # would leak into the crop and too many layers would be removed.
    top = int(zoom * max(y_top, 0))
    bottom = int(zoom * min(y_bottom + 5, previous_page.rect.height - 1, current_page.rect.height - 1))

    template = current_img[top:bottom, left:right]
    try:
        best_match_probability = np.max(cv2.matchTemplate(previous_img, template, cv2.TM_CCOEFF_NORMED))
    except cv2.error:
        # matchTemplate occasionally raises strange correlation errors; they affect only a few
        # documents across the whole dataset, so treat the layer as non-duplicate and move on.
        logger.warning("Error in template matching. Skipping layer.")
        return False
    return best_match_probability > img_template_probability_threshold