
Commit
Improve file processing performance, closes #59
davidmezzetti committed Jan 6, 2025
1 parent 1a727f0 commit 32e0f30
Showing 2 changed files with 47 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/python/paperetl/file/__main__.py
@@ -13,4 +13,5 @@
         sys.argv[2],
         sys.argv[3] if len(sys.argv) > 3 else None,
         sys.argv[4] == "True" if len(sys.argv) > 4 else False,
+        int(sys.argv[5]) if len(sys.argv) > 5 else 32,
     )
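For context, the new fifth positional argument is the batch size, defaulting to 32 via the fallback above. A hypothetical invocation with placeholder input, output and config paths (replace=True), batching 64 articles per serialized file:

    python -m paperetl.file paperetl/data paperetl/models ./config True 64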
53 changes: 46 additions & 7 deletions src/python/paperetl/file/execute.py
@@ -4,6 +4,8 @@

 import gzip
 import os
+import pickle
+import tempfile

 from multiprocessing import Process, Queue

@@ -70,27 +72,58 @@ def parse(path, source, extension, compress, config):
         yield from CSV.parse(stream, source)

     @staticmethod
-    def process(inputs, outputs):
+    def process(inputs, outputs, batchsize):
         """
         Main worker process loop. Processes file paths stored in inputs and writes articles
         to outputs. Writes a final message upon completion.

         Args:
             inputs: inputs queue
             outputs: outputs queue
+            batchsize: batch size
         """

+        batch = []
         try:
             # Process until inputs queue is exhausted
             while not inputs.empty():
                 params = inputs.get()

                 # Parse file and save successfully parsed (not None) results
                 for result in Execute.parse(*params):
-                    outputs.put(result)
+                    if result:
+                        batch.append(result)
+
+                        if len(batch) == batchsize:
+                            outputs.put(Execute.serialize(batch))
+                            batch = []

         finally:
+            # Final batch
+            if batch:
+                outputs.put(Execute.serialize(batch))
+
             # Write message that process is complete
             outputs.put(Execute.COMPLETE)

+    @staticmethod
+    def serialize(batch):
+        """
+        Saves a batch of data to a temporary file for later processing.
+
+        Args:
+            batch: batch to save
+
+        Returns:
+            temporary file name
+        """
+
+        with tempfile.NamedTemporaryFile(mode="wb", delete=False) as output:
+            # Serialize batch to temporary file
+            pickle.dump(batch, output)
+
+        # Temporary file name
+        return output.name
+
     @staticmethod
     def scan(indir, config, inputs):
         """
@@ -150,7 +183,12 @@ def save(processes, outputs, db):

             # Save article, this method will skip duplicates based on entry date
             elif result:
-                db.save(result)
+                with open(result, "rb") as f:
+                    for x in pickle.load(f):
+                        db.save(x)
+
+                # Delete temporary file
+                os.remove(result)

     @staticmethod
     def close(processes, inputs, outputs):
@@ -173,7 +211,7 @@ def close(processes, inputs, outputs):
         outputs.close()

     @staticmethod
-    def run(indir, url, config=None, replace=False):
+    def run(indir, url, config=None, replace=False, batchsize=32):
         """
         Main execution method.

@@ -182,23 +220,24 @@
             indir: input directory
             url: database url
             config: path to config directory, if any
             replace: if true, a new database will be created, overwriting any existing database
+            batchsize: batch size
         """

         processes, inputs, outputs = None, None, None
         try:
             # Build database connection
             db = Factory.create(url, replace)

-            # Create queues, limit size of output queue
-            inputs, outputs = Queue(), Queue(30000)
+            # Create queues
+            inputs, outputs = Queue(), Queue()

             # Scan input directory and add files to inputs queue
             total = Execute.scan(indir, config, inputs)

             # Start worker processes
             processes = []
             for _ in range(min(total, os.cpu_count())):
-                process = Process(target=Execute.process, args=(inputs, outputs))
+                process = Process(target=Execute.process, args=(inputs, outputs, batchsize))
                 process.start()
                 processes.append(process)
