Gb/pdf edits #5

Merged: 2 commits, Nov 2, 2023
69 changes: 48 additions & 21 deletions elm/pdf.py
@@ -6,6 +6,7 @@
 import subprocess
 import numpy as np
 import requests
+import tempfile
 import copy
 from PyPDF2 import PdfReader
 import logging
@@ -224,16 +225,14 @@ def is_double_col(self, separator=' '):
             n_cols[i] = len(columns)
         return np.median(n_cols) >= 2
 
-    def clean_poppler(self, fp_out, layout=True):
+    def clean_poppler(self, layout=True):
         """Clean the pdf using the poppler pdftotxt utility
 
         Requires the `pdftotext` command line utility from this software:
         https://poppler.freedesktop.org/
 
         Parameters
         ----------
-        fp_out : str
-            Filepath to output .txt file
         layout : bool
             Layout flag for poppler pdftotxt utility: "maintain original
             physical layout". Layout=True works well for single column text,
@@ -246,21 +245,24 @@ def clean_poppler(self, fp_out, layout=True):
             Joined cleaned pages
         """
 
-        args = ['pdftotext', f"{self.fp}", f"{fp_out}"]
-        if layout:
-            args.insert(1, '-layout')
+        with tempfile.TemporaryDirectory() as td:
+            fp_out = os.path.join(td, 'poppler_out.txt')
+            args = ['pdftotext', f"{self.fp}", f"{fp_out}"]
+            if layout:
+                args.insert(1, '-layout')
 
-        if not os.path.exists(os.path.dirname(fp_out)):
-            os.makedirs(os.path.dirname(fp_out), exist_ok=True)
+            if not os.path.exists(os.path.dirname(fp_out)):
+                os.makedirs(os.path.dirname(fp_out), exist_ok=True)
 
-        stdout = subprocess.run(args, check=True, stdout=subprocess.PIPE)
-        if stdout.returncode == 0:
-            logger.info(f'Saved to disk: {fp_out}')
-        else:
-            raise RuntimeError(stdout)
+            stdout = subprocess.run(args, check=True, stdout=subprocess.PIPE)
+            if stdout.returncode != 0:
+                msg = ('Poppler raised return code {}: {}'
+                       .format(stdout.returncode, stdout))
+                logger.exception(msg)
+                raise RuntimeError(msg)
 
-        with open(fp_out, 'r') as f:
-            clean_txt = f.read()
+            with open(fp_out, 'r') as f:
+                clean_txt = f.read()
 
         # break on poppler page break
         self.pages = clean_txt.split('\x0c')
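
For reviewers who want to try the new flow in isolation: the change swaps the caller-supplied output path for a temporary file that lives only for the duration of the pdftotext call. A minimal standalone sketch of that pattern, assuming only that `pdftotext` is on the PATH (the `pdf_to_text` name and the `sample.pdf` input are hypothetical, not part of this PR):

import os
import subprocess
import tempfile

def pdf_to_text(fp_pdf, layout=True):
    """Extract text from fp_pdf via poppler pdftotext, using a throwaway output file."""
    with tempfile.TemporaryDirectory() as td:
        fp_out = os.path.join(td, 'poppler_out.txt')
        args = ['pdftotext', fp_pdf, fp_out]
        if layout:
            args.insert(1, '-layout')
        # check=True makes subprocess.run raise CalledProcessError on a
        # non-zero return code, so failures surface even without a manual check
        subprocess.run(args, check=True, stdout=subprocess.PIPE)
        with open(fp_out, 'r') as f:
            return f.read()

# pages = pdf_to_text('sample.pdf').split('\x0c')  # poppler separates pages with form feeds

Note that with check=True, subprocess.run raises before the returncode branch in the diff can fire, so that branch is effectively a belt-and-suspenders guard.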
@@ -333,6 +335,35 @@ def combine_pages(pages):
         full = full.replace('•', '-')
         return full
 
+    def _get_nominal_headers(self, split_on, iheaders):
+        """Get nominal headers from a standard page. Aim for a "typical" page
+        that is likely to have a normal header, not the first or last.
+
+        Parameters
+        ----------
+        split_on : str
+            Chars to split lines of a page on
+        iheaders : list | tuple
+            Integer indices to look for headers after splitting a page into
+            lines based on split_on. This needs to go from the start of the
+            page to the end.
+
+        Returns
+        -------
+        headers : list
+            List of headers where each entry is a string header
+        """
+
+        headers = [None] * len(iheaders)
+        page_lens = np.array([len(p) for p in self.pages])
+        median_len = np.median(page_lens)
+        ipage = np.argmin(np.abs(page_lens - median_len))
+        page = self.pages[ipage]
+        for i, ih in enumerate(iheaders):
+            headers[i] = page.split(split_on)[ih]
+
+        return headers
+
     def clean_headers(self, char_thresh=0.6, page_thresh=0.8, split_on='\n',
                       iheaders=(0, 1, -2, -1)):
         """Clean headers/footers that are duplicated across pages
@@ -358,13 +389,9 @@ def clean_headers(self, char_thresh=0.6, page_thresh=0.8, split_on='\n',
             Clean text with all pages joined
         """
         logger.info('Cleaning headers')
-        headers = [None] * len(iheaders)
+        headers = self._get_nominal_headers(split_on, iheaders)
         tests = np.zeros((len(self.pages), len(headers)))
 
-        page = self.pages[-1]
-        for i, ih in enumerate(iheaders):
-            headers[i] = page.split(split_on)[ih]
-
         for ip, page in enumerate(self.pages):
             for ih, header in zip(iheaders, headers):
                 pheader = ''
@@ -399,7 +426,7 @@ def clean_headers(self, char_thresh=0.6, page_thresh=0.8, split_on='\n',
         for ip, page in enumerate(self.pages):
             page = page.split(split_on)
             for i, iheader in enumerate(iheaders):
-                if tests[i]:
+                if tests[i] and len(page) > np.abs(iheader):
                     _ = page.pop(iheader)
 
             page = split_on.join(page)
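
The one-line change in this last hunk adds a bounds check before popping: a page with fewer lines than the header/footer index being removed could otherwise raise IndexError. A toy illustration of the guard in isolation (values are made up, and the tests[i] condition from the real code is omitted here):

import numpy as np

lines = ['Report Title', 'body text']   # a short 2-line page
for iheader in (0, 1, -2, -1):          # same default indices as clean_headers
    if len(lines) > np.abs(iheader):    # the new bounds check
        lines.pop(iheader)
# only the first pop fires; without the check, pop(1) on the
# remaining 1-line page would raise IndexError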