From 32e0f307d90a0b488c958bc87f9ba622db2eafca Mon Sep 17 00:00:00 2001
From: davidmezzetti <561939+davidmezzetti@users.noreply.github.com>
Date: Mon, 6 Jan 2025 08:31:34 -0500
Subject: [PATCH] Improve file processing performance, closes #59

---
 src/python/paperetl/file/__main__.py |  1 +
 src/python/paperetl/file/execute.py  | 53 ++++++++++++++++++++++++----
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/src/python/paperetl/file/__main__.py b/src/python/paperetl/file/__main__.py
index 59cde5b..3ec2298 100644
--- a/src/python/paperetl/file/__main__.py
+++ b/src/python/paperetl/file/__main__.py
@@ -13,4 +13,5 @@
         sys.argv[2],
         sys.argv[3] if len(sys.argv) > 3 else None,
         sys.argv[4] == "True" if len(sys.argv) > 4 else False,
+        int(sys.argv[5]) if len(sys.argv) > 5 else 32,
     )
diff --git a/src/python/paperetl/file/execute.py b/src/python/paperetl/file/execute.py
index 08d4573..07bb2ed 100644
--- a/src/python/paperetl/file/execute.py
+++ b/src/python/paperetl/file/execute.py
@@ -4,6 +4,8 @@
 
 import gzip
 import os
+import pickle
+import tempfile
 
 from multiprocessing import Process, Queue
 
@@ -70,7 +72,7 @@ def parse(path, source, extension, compress, config):
             yield from CSV.parse(stream, source)
 
     @staticmethod
-    def process(inputs, outputs):
+    def process(inputs, outputs, batchsize):
         """
         Main worker process loop. Processes file paths stored in inputs and writes articles to outputs.
         Writes a final message upon completion.
@@ -78,19 +80,50 @@
         Args:
             inputs: inputs queue
             outputs: outputs queue
+            batchsize: batch size
         """
 
+        batch = []
         try:
             # Process until inputs queue is exhausted
             while not inputs.empty():
                 params = inputs.get()
 
+                # Parse file and save successfully parsed (not None) results
                 for result in Execute.parse(*params):
-                    outputs.put(result)
+                    if result:
+                        batch.append(result)
+                        if len(batch) == batchsize:
+                            outputs.put(Execute.serialize(batch))
+                            batch = []
+
         finally:
+            # Final batch
+            if batch:
+                outputs.put(Execute.serialize(batch))
+
             # Write message that process is complete
             outputs.put(Execute.COMPLETE)
 
+    @staticmethod
+    def serialize(batch):
+        """
+        Saves a batch of data to a temporary file for later processing.
+
+        Args:
+            batch: batch to save
+
+        Returns:
+            temporary file name
+        """
+
+        with tempfile.NamedTemporaryFile(mode="wb", delete=False) as output:
+            # Serialize batch to temporary file
+            pickle.dump(batch, output)
+
+            # Temporary file name
+            return output.name
+
     @staticmethod
     def scan(indir, config, inputs):
         """
@@ -150,7 +183,12 @@ def save(processes, outputs, db):
             # Save article, this method will skip duplicates based on entry date
             elif result:
-                db.save(result)
+                with open(result, "rb") as f:
+                    for x in pickle.load(f):
+                        db.save(x)
+
+                # Delete temporary file
+                os.remove(result)
 
     @staticmethod
     def close(processes, inputs, outputs):
         """
@@ -173,7 +211,7 @@
         outputs.close()
 
     @staticmethod
-    def run(indir, url, config=None, replace=False):
+    def run(indir, url, config=None, replace=False, batchsize=32):
         """
         Main execution method.
 
@@ -182,6 +220,7 @@ def run(indir, url, config=None, replace=False):
             url: database url
             config: path to config directory, if any
             replace: if true, a new database will be created, overwriting any existing database
+            batchsize: batch size
         """
 
         processes, inputs, outputs = None, None, None
@@ -189,8 +228,8 @@
         # Build database connection
         db = Factory.create(url, replace)
 
-        # Create queues, limit size of output queue
-        inputs, outputs = Queue(), Queue(30000)
+        # Create queues
+        inputs, outputs = Queue(), Queue()
 
         # Scan input directory and add files to inputs queue
         total = Execute.scan(indir, config, inputs)
@@ -198,7 +237,7 @@
 
         # Start worker processes
         processes = []
        for _ in range(min(total, os.cpu_count())):
-            process = Process(target=Execute.process, args=(inputs, outputs))
+            process = Process(target=Execute.process, args=(inputs, outputs, batchsize))
             process.start()
             processes.append(process)
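
Note on the pattern above: instead of putting each parsed article on the multiprocessing outputs queue individually, workers now buffer results into batches, pickle each full batch to a temporary file (Execute.serialize) and enqueue just the file name; the save loop loads each batch, writes the articles to the database and removes the temporary file. Below is a minimal, single-process sketch of that handoff for reference, not paperetl code: a plain list stands in for the multiprocessing queues, and produce, consume, serialize and COMPLETE here are local stand-ins for the patched methods.

import os
import pickle
import tempfile

# Sentinel marking the end of a worker's output (stand-in for Execute.COMPLETE)
COMPLETE = 1

def serialize(batch):
    # Pickle the batch to a temporary file and return the file name
    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as output:
        pickle.dump(batch, output)
        return output.name

def produce(results, outputs, batchsize):
    # Worker side: buffer results, flush a temp-file reference per full batch
    batch = []
    try:
        for result in results:
            if result:
                batch.append(result)
                if len(batch) == batchsize:
                    outputs.append(serialize(batch))
                    batch = []
    finally:
        # Flush the final partial batch, then signal completion
        if batch:
            outputs.append(serialize(batch))
        outputs.append(COMPLETE)

def consume(outputs, save):
    # Parent side: load each batch, save its items, delete the temp file
    for result in outputs:
        if result == COMPLETE:
            break
        with open(result, "rb") as f:
            for item in pickle.load(f):
                save(item)
        os.remove(result)

if __name__ == "__main__":
    queue = []
    produce(range(1, 10), queue, batchsize=4)
    consume(queue, save=print)

With the __main__.py change, the batch size is exposed as an optional fifth positional argument defaulting to 32, i.e. python -m paperetl.file <indir> <url> <config> <replace> <batchsize>, following the sys.argv handling shown above.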