Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disable multiprocessing if unavailable #39

Merged
merged 2 commits into from
Dec 3, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions smart_open/smart_open_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"""

import logging
import multiprocessing.pool
import os
import subprocess
import sys
Expand All @@ -37,6 +36,18 @@

logger = logging.getLogger(__name__)

# Multiprocessing is unavailable in App Engine (and possibly other sandboxes).
# The only method currently relying on it is s3_iter_bucket, which is instructed
# not to use it by the NO_MULTIPROCESSING flag.
try:
import multiprocessing.pool
except ImportError:
logger.warning("multiprocessing could not be imported and won't be used")
NO_MULTIPROCESSING = True
from itertools import imap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might break Python 3. imap is only available in Python 2.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tmylk how come the unit tests have been passing for a year then?

else:
NO_MULTIPROCESSING = False

S3_MIN_PART_SIZE = 50 * 1024**2 # minimum part size for S3 multipart uploads
WEBHDFS_MIN_PART_SIZE = 50 * 1024**2 # minimum part size for HDFS multipart uploads

Expand Down Expand Up @@ -578,7 +589,8 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
If `key_limit` is given, stop after yielding out that many results.

The keys are processed in parallel, using `workers` processes (default: 16),
to speed up downloads greatly.
to speed up downloads greatly. If multiprocessing is not available, thus
NO_MULTIPROCESSING is True, this parameter will be ignored.

Example::

Expand All @@ -593,13 +605,18 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
... print key, len(content)

"""
logger.info("iterating over keys from %s with %i workers" % (bucket, workers))

total_size, key_no = 0, -1
keys = (key for key in bucket.list(prefix=prefix) if accept_key(key.name))

pool = multiprocessing.pool.Pool(processes=workers)
for key_no, (key, content) in enumerate(pool.imap_unordered(s3_iter_bucket_process_key, keys)):
if NO_MULTIPROCESSING:
logger.info("iterating over keys from %s without multiprocessing" % bucket)
iterator = imap(s3_iter_bucket_process_key, keys)
else:
logger.info("iterating over keys from %s with %i workers" % (bucket, workers))
pool = multiprocessing.pool.Pool(processes=workers)
iterator = pool.imap_unordered(s3_iter_bucket_process_key, keys)

for key_no, (key, content) in enumerate(iterator):
if key_no % 1000 == 0:
logger.info("yielding key #%i: %s, size %i (total %.1fMB)" %
(key_no, key, len(content), total_size / 1024.0 ** 2))
Expand All @@ -611,7 +628,9 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
if key_limit is not None and key_no + 1 >= key_limit:
# we were asked to output only a limited number of keys => we're done
break
pool.terminate()

if not NO_MULTIPROCESSING:
pool.terminate()

logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))

Expand Down