Skip to content

Commit

Permalink
Reduce memory on multiprocess raster by writing to temp file
Browse files Browse the repository at this point in the history
  • Loading branch information
PSU3D0 committed May 17, 2024
1 parent 569a22a commit d8ef474
Showing 1 changed file with 80 additions and 42 deletions.
122 changes: 80 additions & 42 deletions docprompt/_pdfium.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from io import BytesIO
import os
import random
import tempfile
import pypdfium2 as pdfium
from contextlib import contextmanager
from threading import Lock
Expand Down Expand Up @@ -126,11 +128,6 @@ def _render_job(
return buffer.getvalue()


def _render_parallel_job(page_indice):
    """Render one page using the module-level ``ProcObjs`` arguments.

    ``page_indice`` is forwarded to ``_render_job`` as its first argument;
    the items of ``ProcObjs`` are unpacked as the remaining positional
    arguments. Returns whatever ``_render_job`` returns.

    NOTE(review): presumably ``ProcObjs`` is populated per worker by the
    pool initializer (``_render_parallel_init``) -- confirm.
    """
    # ProcObjs is only read here, so no ``global`` declaration is needed.
    shared_args = ProcObjs
    return _render_job(page_indice, *shared_args)


def _render_parallel_multi_doc_job(pdf_indice, page_indice):
global ProcObjsMultiDoc

Expand All @@ -139,6 +136,11 @@ def _render_parallel_multi_doc_job(pdf_indice, page_indice):
return pdf_indice, page_indice, _render_job(page_indice, pdf, *ProcObjsMultiDoc[1:])


def _render_parallel_job(page_indice):
    """Render one page using the module-level ``ProcObjs`` arguments.

    ``page_indice`` is forwarded to ``_render_job`` as its first argument;
    the items of ``ProcObjs`` are unpacked as the remaining positional
    arguments. Returns whatever ``_render_job`` returns.

    NOTE(review): presumably ``ProcObjs`` is populated per worker by the
    pool initializer (``_render_parallel_init``) -- confirm.
    """
    # ProcObjs is only read here, so no ``global`` declaration is needed.
    shared_args = ProcObjs
    return _render_job(page_indice, *shared_args)


def rasterize_page_with_pdfium(
fp: Union[PathLike, Path, bytes],
page_number: int,
Expand All @@ -160,6 +162,17 @@ def rasterize_page_with_pdfium(
)


@contextmanager
def potential_temporary_file(fp: Union[PathLike, Path, bytes]):
    """Yield a filesystem path for ``fp``, spilling raw bytes to a temp file.

    If ``fp`` is ``bytes``, its content is written to a temporary ``.pdf``
    file, the file is closed, and the file's path (a ``str``) is yielded.
    The file is removed when the ``with`` block exits, whether normally or
    through an exception. Any other value is yielded unchanged and nothing
    is created or deleted.

    Args:
        fp: Raw PDF bytes, or an existing path-like value.

    Yields:
        The temporary file's path when ``fp`` is ``bytes``, otherwise ``fp``.
    """
    if not isinstance(fp, bytes):
        yield fp
        return

    # delete=False + explicit close: a NamedTemporaryFile that is still open
    # cannot be reopened by name on Windows, and the whole point of this
    # helper is to hand the path to other code (e.g. spawned workers) that
    # opens the file again. Cleanup is therefore done manually below.
    temp_fp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
    try:
        with temp_fp:
            temp_fp.write(fp)
        yield temp_fp.name
    finally:
        try:
            os.unlink(temp_fp.name)
        except FileNotFoundError:
            # Already removed by the caller; nothing left to clean up.
            pass


def rasterize_pdf_with_pdfium(
fp: Union[PathLike, Path, bytes],
password: Optional[str] = None,
Expand All @@ -178,17 +191,28 @@ def rasterize_pdf_with_pdfium(

ctx = mp.get_context("spawn")

initargs = (None, fp, password, False, kwargs, return_mode, post_process_fn)
with potential_temporary_file(fp) as temp_fp:
initargs = (
None,
temp_fp,
password,
False,
kwargs,
return_mode,
post_process_fn,
)

with ft.ProcessPoolExecutor(
max_workers=max_workers,
initializer=_render_parallel_init,
initargs=initargs,
mp_context=ctx,
) as executor:
results = executor.map(_render_parallel_job, range(total_pages), chunksize=1)
with ft.ProcessPoolExecutor(
max_workers=max_workers,
initializer=_render_parallel_init,
initargs=initargs,
mp_context=ctx,
) as executor:
results = executor.map(
_render_parallel_job, range(total_pages), chunksize=1
)

return list(results)
return list(results)


def rasterize_pdfs_with_pdfium(
Expand Down Expand Up @@ -219,39 +243,53 @@ def rasterize_pdfs_with_pdfium(
page_counts.append(len(pdf))
total_to_process += len(pdf)

initargs = (
None,
fps,
passwords,
False,
kwargs,
return_mode,
post_process_fn,
)
writable_fps = []

with tempfile.TemporaryDirectory(prefix="docprompt_raster_tmp") as tempdir:
for i, fp in enumerate(fps):
if isinstance(fp, bytes):
temp_fp = os.path.join(
tempdir, f"{i}_{random.randint(10000, 50000)}.pdf"
)
with open(temp_fp, "wb") as f:
f.write(fp)
writable_fps.append(temp_fp)
else:
writable_fps.append(str(fp))

initargs = (
None,
writable_fps,
passwords,
False,
kwargs,
return_mode,
post_process_fn,
)

results = {}
results = {}

futures = []
futures = []

max_workers = min(mp.cpu_count(), total_to_process)
max_workers = min(mp.cpu_count(), total_to_process)

with tqdm.tqdm(total=total_to_process) as pbar:
with ft.ProcessPoolExecutor(
max_workers=max_workers,
initializer=_render_parallel_multi_doc_init,
initargs=initargs,
mp_context=ctx,
) as executor:
for i, page_count in enumerate(page_counts):
for j in range(page_count):
futures.append(
executor.submit(_render_parallel_multi_doc_job, i, j)
)
with tqdm.tqdm(total=total_to_process) as pbar:
with ft.ProcessPoolExecutor(
max_workers=max_workers,
initializer=_render_parallel_multi_doc_init,
initargs=initargs,
mp_context=ctx,
) as executor:
for i, page_count in enumerate(page_counts):
for j in range(page_count):
futures.append(
executor.submit(_render_parallel_multi_doc_job, i, j)
)

for future in ft.as_completed(futures):
pdf_indice, page_indice, result = future.result()
for future in ft.as_completed(futures):
pdf_indice, page_indice, result = future.result()

results.setdefault(pdf_indice, {})[page_indice + 1] = result
pbar.update(1)
results.setdefault(pdf_indice, {})[page_indice + 1] = result
pbar.update(1)

return results

0 comments on commit d8ef474

Please sign in to comment.