-
Notifications
You must be signed in to change notification settings - Fork 2
/
extract.py
482 lines (414 loc) · 19.9 KB
/
extract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
"""Contains the main extraction pipeline for stratigraphy."""
import logging
import math
import fitz
from stratigraphy.util import find_depth_columns
from stratigraphy.util.dataclasses import Line
from stratigraphy.util.depthcolumn import DepthColumn
from stratigraphy.util.find_description import (
get_description_blocks,
get_description_blocks_from_layer_identifier,
get_description_lines,
)
from stratigraphy.util.interval import BoundaryInterval, Interval
from stratigraphy.util.layer_identifier_column import (
LayerIdentifierColumn,
find_layer_identifier_column,
find_layer_identifier_column_entries,
)
from stratigraphy.util.line import TextLine
from stratigraphy.util.textblock import TextBlock, block_distance
from stratigraphy.util.util import (
parse_and_remove_empty_predictions,
x_overlap,
x_overlap_significant_smallest,
)
logger = logging.getLogger(__name__)
def _rect_to_list(rect: fitz.Rect) -> list[float]:
    """Return the corner coordinates of ``rect`` as ``[x0, y0, x1, y1]``."""
    return [rect.x0, rect.y0, rect.x1, rect.y1]


def process_page(
    lines: list[TextLine], geometric_lines: list[Line], language: str, **params: dict
) -> tuple[list[dict], list[dict]]:
    """Process a single page of a pdf.

    Finds all descriptions and depth intervals on the page and matches them.

    Layer identifier columns take precedence: depth columns are only searched for when no layer identifier
    column could be paired with a material description rectangle. If neither kind of column yields a pair,
    the material description is searched for without any column and returned without depth intervals.

    Args:
        lines (list[TextLine]): All the text lines on the page.
        geometric_lines (list[Line]): The geometric lines of the page.
        language (str): The language of the page.
        **params (dict): Additional parameters for the matching pipeline. The keys read here are
            "material_description", "depth_column_params", "block_line_ratio" and
            "left_line_length_threshold"; the full dict is also forwarded to match_columns.

    Returns:
        tuple[list[dict], list[dict]]: A pair ``(predictions, json_filtered_pairs)``. ``predictions`` is the
            output of parse_and_remove_empty_predictions applied to one dict per matched group (key
            "material_description", plus "depth_interval" when one was found). ``json_filtered_pairs``
            holds one dict per retained column / description pair with the keys "depth_column" and
            "material_description_rect".
    """
    # Detect layer identifier columns.
    layer_identifier_entries = find_layer_identifier_column_entries(lines)
    layer_identifier_columns = (
        find_layer_identifier_column(layer_identifier_entries) if layer_identifier_entries else []
    )

    pairs = []
    if layer_identifier_columns:
        for layer_identifier_column in layer_identifier_columns:
            material_description_rect = find_material_description_column(
                lines, layer_identifier_column, language, **params["material_description"]
            )
            if material_description_rect:
                pairs.append((layer_identifier_column, material_description_rect))

        # Obtain the best pair. In contrast to depth columns, there only ever is one layer index column per page.
        if pairs:
            pairs.sort(key=lambda pair: score_column_match(pair[0], pair[1]))

    words = [word for line in lines for word in line.words]

    # If there is a layer identifier column, then we use this directly.
    # Else, we search for depth columns. We could also think of some scoring mechanism to decide which one to use.
    if not pairs:
        depth_column_entries = find_depth_columns.depth_column_entries(words, include_splits=True)
        layer_depth_columns = find_depth_columns.find_layer_depth_columns(depth_column_entries, words)

        # Entries that are already part of a layer depth column must not be reused for a plain depth column.
        used_entry_rects = []
        for column in layer_depth_columns:
            for entry in column.entries:
                used_entry_rects.extend([entry.start.rect, entry.end.rect])

        depth_column_entries = [
            entry
            for entry in find_depth_columns.depth_column_entries(words, include_splits=False)
            if entry.rect not in used_entry_rects
        ]

        depth_columns: list[DepthColumn] = [
            *layer_depth_columns,
            *find_depth_columns.find_depth_columns(
                depth_column_entries, words, depth_column_params=params["depth_column_params"]
            ),
        ]

        for depth_column in depth_columns:
            material_description_rect = find_material_description_column(
                lines, depth_column, language, **params["material_description"]
            )
            if material_description_rect:
                pairs.append((depth_column, material_description_rect))

        # lowest score first
        pairs.sort(key=lambda pair: score_column_match(pair[0], pair[1], words))

    # Drop every pair whose description rectangle intersects the rectangle of a pair that comes later in the
    # list, so that of several overlapping candidates only the last one survives.
    filtered_pairs = [
        pair
        for index, pair in enumerate(pairs)
        if not any(pair[1].intersects(later_rect) for _later_column, later_rect in pairs[index + 1 :])
    ]

    groups = []  # list of matched depth intervals and text blocks
    # groups is of the form: [{"depth_interval": BoundaryInterval, "block": TextBlock}]
    if filtered_pairs:  # match depth column items with material description
        for depth_column, material_description_rect in filtered_pairs:
            description_lines = get_description_lines(lines, material_description_rect)
            if len(description_lines) > 1:
                groups.extend(
                    match_columns(
                        depth_column, description_lines, geometric_lines, material_description_rect, **params
                    )
                )
        json_filtered_pairs = [
            {
                "depth_column": depth_column.to_json(),
                "material_description_rect": _rect_to_list(material_description_rect),
            }
            for depth_column, material_description_rect in filtered_pairs
        ]
    else:
        json_filtered_pairs = []
        # Fallback when no depth column was found
        material_description_rect = find_material_description_column(
            lines, depth_column=None, language=language, **params["material_description"]
        )
        if material_description_rect:
            description_lines = get_description_lines(lines, material_description_rect)
            description_blocks = get_description_blocks(
                description_lines,
                geometric_lines,
                material_description_rect,
                params["block_line_ratio"],
                params["left_line_length_threshold"],
            )
            groups.extend([{"block": block} for block in description_blocks])
            json_filtered_pairs.append(
                {
                    "depth_column": None,
                    "material_description_rect": _rect_to_list(material_description_rect),
                }
            )

    predictions = []
    for group in groups:
        prediction = {"material_description": group["block"].to_json()}
        if "depth_interval" in group:
            prediction["depth_interval"] = group["depth_interval"].to_json()
        predictions.append(prediction)

    predictions = parse_and_remove_empty_predictions(predictions)
    return predictions, json_filtered_pairs
def score_column_match(
    depth_column: DepthColumn,
    material_description_rect: fitz.Rect,
    all_words: list[TextLine] | None = None,
    **params: dict,
) -> float:
    """Rate how well a depth column fits a material description rectangle.

    The score is the column height minus the summed misalignment (top edges, bottom edges, and the gap
    between the column's right edge and the description's left edge), multiplied by 0.8 for every noise
    word the column reports. Higher is better.

    Args:
        depth_column (DepthColumn): The depth column.
        material_description_rect (fitz.Rect): The material description rectangle.
        all_words (list[TextLine] | None, optional): Passed to ``depth_column.noise_count``. When it is
            None or empty, no noise penalty is applied. Defaults to None.
        **params (dict): Additional parameters for the matching pipeline. Kept for compatibility with the
            pipeline; not read here.

    Returns:
        float: The score of the match.
    """
    column_rect = depth_column.rect()

    top_offset = abs(column_rect.y0 - material_description_rect.y0)
    bottom_offset = abs(column_rect.y1 - material_description_rect.y1)
    side_offset = abs(column_rect.x1 - material_description_rect.x0)
    misalignment = top_offset + bottom_offset + side_offset

    column_height = column_rect.y1 - column_rect.y0

    if all_words:
        noise_count = depth_column.noise_count(all_words)
    else:
        noise_count = 0

    return (column_height - misalignment) * math.pow(0.8, noise_count)
def match_columns(
    depth_column: DepthColumn | LayerIdentifierColumn,
    description_lines: list[TextLine],
    geometric_lines: list[Line],
    material_description_rect: fitz.Rect,
    **params: dict,
) -> list:
    """Pair the entries of a column with the description lines next to it.

    For a DepthColumn, the groups reported by ``identify_groups`` are each passed through
    ``transform_groups`` and the results are flattened into one list. For a LayerIdentifierColumn, the
    description blocks are derived from the column entries and each block is paired with the depth interval
    the column reports for it, if any.

    Args:
        depth_column (DepthColumn | LayerIdentifierColumn): The depth column.
        description_lines (list[TextLine]): The description lines.
        geometric_lines (list[Line]): The geometric lines.
        material_description_rect (fitz.Rect): The material description rectangle.
        **params (dict): Additional parameters for the matching pipeline.

    Returns:
        list: Dicts with a "block" key and, where a depth interval is available, a "depth_interval" key.

    Raises:
        ValueError: If ``depth_column`` is neither a DepthColumn nor a LayerIdentifierColumn.
    """
    if isinstance(depth_column, DepthColumn):
        matched = []
        identified_groups = depth_column.identify_groups(
            description_lines, geometric_lines, material_description_rect, **params
        )
        for group in identified_groups:
            matched.extend(transform_groups(group["depth_intervals"], group["blocks"], **params))
        return matched

    if not isinstance(depth_column, LayerIdentifierColumn):
        raise ValueError(
            f"depth_column must be a DepthColumn or a LayerIdentifierColumn object. Got {type(depth_column)}."
        )

    def as_group(block):
        # Only attach a depth interval when the column could determine one for this block.
        depth_interval = depth_column.get_depth_interval(block)
        if depth_interval:
            return {"depth_interval": depth_interval, "block": block}
        return {"block": block}

    layer_blocks = get_description_blocks_from_layer_identifier(depth_column.entries, description_lines)
    return [as_group(block) for block in layer_blocks]
def transform_groups(
    depth_intervals: list[Interval], blocks: list[TextBlock], **params: dict
) -> list[dict[str, Interval | TextBlock]]:
    """Pair depth intervals with text blocks, adjusting their counts where they differ.

    - No depth interval: nothing is returned.
    - Exactly one depth interval: all blocks are concatenated into a single block for that interval.
    - More depth intervals than blocks: the blocks are split by ``split_blocks_by_textline_length``.
    - More blocks than depth intervals (possibly after splitting): the intervals are padded with
      ``BoundaryInterval(None, None)`` so that every block gets a partner.

    The intervals and blocks are then paired in order. If splitting could not produce enough blocks, the
    surplus depth intervals are dropped by the pairing.

    The list passed as ``depth_intervals`` is never modified; padding happens on a copy.

    Args:
        depth_intervals (List[Interval]): The depth intervals from the pdf.
        blocks (List[TextBlock]): Found textblocks from the pdf.
        **params (dict): Additional parameters for the matching pipeline. Not read here.

    Returns:
        List[Dict[str, Union[Interval, TextBlock]]]: Pairing of text blocks and depth intervals.
    """
    if len(depth_intervals) == 0:
        return []

    if len(depth_intervals) == 1:
        # Concatenate all text lines into one block; the line separation flag does not matter here.
        concatenated_block = TextBlock([line for block in blocks for line in block.lines])
        return [{"depth_interval": depth_intervals[0], "block": concatenated_block}]

    if len(blocks) < len(depth_intervals):
        blocks = split_blocks_by_textline_length(blocks, target_split_count=len(depth_intervals) - len(blocks))

    if len(blocks) > len(depth_intervals):
        # Create additional depth intervals with end & start value None to match the number of blocks.
        # A new list is built so that the caller's list is left untouched.
        missing_count = len(blocks) - len(depth_intervals)
        depth_intervals = [*depth_intervals, *(BoundaryInterval(None, None) for _ in range(missing_count))]

    return [
        {"depth_interval": depth_interval, "block": block}
        for depth_interval, block in zip(depth_intervals, blocks, strict=False)
    ]
def merge_blocks_by_vertical_spacing(blocks: list[TextBlock], target_merge_count: int) -> list[TextBlock]:
    """Merge textblocks without any geometric lines that separates them.

    Note: Deprecated. Currently not in use any more. Kept here until we are sure that it is not needed anymore.

    The logic looks at the distances between consecutive textblocks and merges neighbours whose distance is
    not larger than a cutoff. The cutoff is the ``target_merge_count``-th smallest distance, and at most
    ``target_merge_count`` merges are performed.

    Args:
        blocks (List[TextBlock]): Textblocks that are to be merged.
        target_merge_count (int): the number of merges that we'd like to happen (i.e. we'd like the total number of
                                  blocks to be reduced by this number). Values larger than the number of gaps
                                  between blocks are clamped to that number; values <= 0 result in no merge.

    Returns:
        List[TextBlock]: The merged textblocks. Empty for empty input. A trailing block without any lines is
            omitted.
    """
    if not blocks:
        return []

    # One distance per gap between neighbouring blocks, computed once and reused below.
    distances = [block_distance(upper, lower) for upper, lower in zip(blocks, blocks[1:])]

    # There cannot be more merges than gaps; without clamping the cutoff lookup would be out of range.
    merges_wanted = min(target_merge_count, len(distances))
    # merge all blocks that have a distance smaller than (or equal to) this
    cutoff = sorted(distances)[merges_wanted - 1] if merges_wanted > 0 else None

    merged_count = 0
    merged_blocks = []
    current_merged_block = blocks[0]
    for distance, new_block in zip(distances, blocks[1:]):
        if merged_count < merges_wanted and distance <= cutoff:
            current_merged_block = current_merged_block.concatenate(new_block)
            merged_count += 1
        else:
            merged_blocks.append(current_merged_block)
            current_merged_block = new_block

    if len(current_merged_block.lines):
        merged_blocks.append(current_merged_block)
    return merged_blocks
def split_blocks_by_textline_length(blocks: list[TextBlock], target_split_count: int) -> list[TextBlock]:
    """Split textblocks that are not separated by geometric lines, using the right edge of their text lines.

    The right edge (``rect.x1``) of every line that is not the last line of its block is collected. The
    ``target_split_count`` smallest of these values mark the split points: a block is cut directly after a
    line whose right edge is one of them, and each value is used for one cut only. The last line of a block
    never triggers a cut. If there are not more candidate lines than requested splits, every line becomes
    its own block.

    When a source block has ``is_terminated_by_line`` set, the flag is set on the last block produced so far
    once that source block has been processed. This is not done in the "every line is a block" case.

    Args:
        blocks (List[TextBlock]): Textblocks that are to be split.
        target_split_count (int): the number of splits that we'd like to happen (i.e. we'd like the total number of
                                  blocks to be increased by this number)

    Returns:
        List[TextBlock]: The split textblocks.
    """
    right_edges = [line.rect.x1 for block in blocks for line in block.lines[:-1]]
    right_edges.sort()

    if len(right_edges) <= target_split_count:
        # Not enough candidate lines to choose from: each line is a block.
        return [TextBlock([line]) for block in blocks for line in block.lines]

    # Every value in here still entitles one line with that right edge to end a block.
    open_cutoffs = right_edges[:target_split_count]
    result = []
    pending_lines = []
    for block in blocks:
        final_index = block.line_count - 1
        for line_index in range(block.line_count):
            line = block.lines[line_index]
            pending_lines.append(line)
            if line_index == final_index:
                continue  # the end of a block is handled below, never via a cutoff
            right_edge = line.rect.x1
            if right_edge in open_cutoffs:
                result.append(TextBlock(pending_lines))
                open_cutoffs.remove(right_edge)
                pending_lines = []

        if pending_lines:
            result.append(TextBlock(pending_lines))
            pending_lines = []

        # If block was terminated by a line, populate the flag to the last element of the result.
        if block.is_terminated_by_line:
            result[-1].is_terminated_by_line = True

    return result
def find_material_description_column(
    lines: list[TextLine], depth_column: DepthColumn | LayerIdentifierColumn | None, language: str, **params: dict
) -> fitz.Rect | None:
    """Find the material description column, optionally relative to a given column.

    Steps:
      1. Restrict the lines to a vertical range: when a column is given, only lines that start below the
         lowest line located above the column (and horizontally overlapping it) and above the column's
         bottom edge are candidates. Without a column, all lines are candidates.
      2. Greedily cluster the candidates that are recognised as descriptions by horizontal overlap.
      3. Turn every cluster into a rectangle and extend it downwards over directly following lines.
      4. Return the rectangle that scores best against the column, or the first one if there is no column.

    Args:
        lines (list[TextLine]): The text lines of the page.
        depth_column (DepthColumn | LayerIdentifierColumn | None): The column to search next to, or None to
            search the entire page.
        language (str): The language of the page. ``params[language]`` is handed to ``TextLine.is_description``.
        **params (dict): Additional parameters for the matching pipeline.

    Returns:
        fitz.Rect | None: The material description column, or None if no candidate was found.
    """
    # Heuristic constants. Their origin is undocumented; consider exposing them as parameters.
    overlap_ratio = 0.5  # passed to x_overlap_significant_smallest when clustering
    coverage_x0_fraction = 0.4  # lines starting in the right-most 40% of a coverage are dropped from it
    x0_left_tolerance = 0.01  # fraction of the line width a good line may start left of a cluster line
    x0_right_tolerance = 0.2  # fraction of the line width a good line may start right of a cluster line
    below_x0_margin = 5  # how far left of the rectangle a following line may start
    below_y_gap = 10  # maximal vertical gap to a following line

    if not lines:
        return None  # nothing on the page, hence no candidates

    if depth_column:
        depth_rect = depth_column.rect()  # computed once instead of once per line
        y0_above_depth_column = [
            line.rect.y0 for line in lines if x_overlap(line.rect, depth_rect) and line.rect.y0 < depth_rect.y0
        ]
        min_y0 = max(y0_above_depth_column, default=-1)
        candidate_description = [line for line in lines if min_y0 < line.rect.y0 < depth_rect.y1]
    else:
        candidate_description = list(lines)

    if not candidate_description:
        return None

    is_description = [line for line in candidate_description if line.is_description(params[language])]

    def filter_coverage(coverage):
        """Drop the lines of a coverage that start too far to the right of its horizontal extent."""
        if not coverage:
            return []
        min_x0 = min(line.rect.x0 for line in coverage)
        max_x1 = max(line.rect.x1 for line in coverage)
        x0_threshold = max_x1 - coverage_x0_fraction * (max_x1 - min_x0)
        return [line for line in coverage if line.rect.x0 < x0_threshold]

    # Repeatedly take the largest group of mutually overlapping description lines as the next cluster.
    description_clusters = []
    while is_description:
        coverage_by_generating_line = [
            filter_coverage(
                [
                    other
                    for other in is_description
                    if x_overlap_significant_smallest(line.rect, other.rect, overlap_ratio)
                ]
            )
            for line in is_description
        ]
        max_coverage = max(coverage_by_generating_line, key=len)
        if not max_coverage:
            # No remaining line can be clustered. Without this exit the loop would never make progress.
            break
        description_clusters.append(max_coverage)
        is_description = [line for line in is_description if line not in max_coverage]

    candidate_rects = []
    for cluster in description_clusters:
        best_y0 = min(line.rect.y0 for line in cluster)
        best_y1 = max(line.rect.y1 for line in cluster)
        min_description_x0 = min(line.rect.x0 - x0_left_tolerance * line.rect.width for line in cluster)
        max_description_x0 = max(line.rect.x0 + x0_right_tolerance * line.rect.width for line in cluster)

        # All candidate lines (description or not) inside the cluster's vertical range that start at roughly
        # the same x position as the cluster lines.
        good_lines = [
            line
            for line in candidate_description
            if line.rect.y0 >= best_y0
            and line.rect.y1 <= best_y1
            and min_description_x0 < line.rect.x0 < max_description_x0
        ]
        if not good_lines:
            continue  # degenerate cluster; there is nothing to build a rectangle from

        best_x0 = min(line.rect.x0 for line in good_lines)
        best_x1 = max(line.rect.x1 for line in good_lines)

        # Expand to include entire last block: keep appending the next line that starts within the left half
        # of the rectangle, closely below its current bottom edge.
        while True:
            line_below = next(
                (
                    line
                    for line in lines
                    if best_x0 - below_x0_margin < line.rect.x0 < (best_x0 + best_x1) / 2
                    and line.rect.y0 < best_y1 + below_y_gap
                    and line.rect.y1 > best_y1
                ),
                None,
            )
            if not line_below:
                break
            best_x0 = min(best_x0, line_below.rect.x0)
            best_x1 = max(best_x1, line_below.rect.x1)
            best_y1 = line_below.rect.y1

        candidate_rects.append(fitz.Rect(best_x0, best_y0, best_x1, best_y1))

    if not candidate_rects:
        return None

    if depth_column:
        return max(candidate_rects, key=lambda rect: score_column_match(depth_column, rect))
    return candidate_rects[0]