Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure TextCorpus code to share multiprocessing and preprocessing logic. #1478

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e6069d3
Make `TextCorpus.get_texts` use multiprocessing -- copied underlying …
Jul 3, 2017
3de59c2
Refactor `WikiCorpus` to use distribution logic in `TextCorpus` and m…
Jul 3, 2017
c15b42b
Speed up text corpus preprocessing using custom multiprocessing pool …
Jul 4, 2017
977098d
Python 3 compatibility for imap and use mixin for shared text preproc…
Jul 4, 2017
5b7d016
Move `LineSentence` and `Text8Corpus` to `textcorpus` module. Rename …
Jul 9, 2017
8f4794b
Refactor `LineSentence` and `Text8Corpus` to use `TextCorpus` in orde…
Jul 9, 2017
c3a6a25
Beef up docstrings for `preprocess_text`, which is used in slightly m…
Jul 9, 2017
dd871c3
Move `BrownCorpus` to `textcorpus` module and refactor to subclass fr…
Jul 9, 2017
ab27719
Refactor `WikiCorpus` to use distribution logic in `TextCorpus` and m…
Jul 3, 2017
a88f4d5
Speed up text corpus preprocessing using custom multiprocessing pool …
Jul 4, 2017
b75486e
Refactor `LineSentence` and `Text8Corpus` to use `TextCorpus` in orde…
Jul 9, 2017
143faa6
Fix flake8 issues and ensure `utils.chunkize` terminates worker and c…
Jul 15, 2017
9e711eb
Revert wikicorpus tests to avoid odd issues when running with `setup.…
Jul 16, 2017
271d6dd
Address PR review from @piskvorky regarding formatting, documentation…
Jul 17, 2017
2db0aaa
Add tests for `utils.walk_with_depth`.
Jul 26, 2017
76c2e74
Change `WikiCorpus` default preprocessing to be the same as before. A…
Jul 27, 2017
4b08e6e
Add arguments to control filtering of extremes for dictionary buildin…
Jul 28, 2017
169f502
Remove trailing whitespace in `textcorpus` module.
Jul 28, 2017
93f19c5
Remove trailing whitespace in `textcorpus` module and change dictiona…
Jul 28, 2017
c467f71
Respond to PR review; change `getstream` contract to return unicode a…
Jul 30, 2017
fa6d940
Merge branch 'develop' into text_corpus_restructure
Sep 16, 2017
2cdb20c
Remove dependence of `textcorpus` on `word2vec` module to avoid circu…
Sep 16, 2017
4d01d01
Move test case from end of `stateful_pool` module to new `test_statef…
Sep 16, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions gensim/corpora/stateful_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""
Python's builtin `multiprocessing.Pool` is designed to use stateless processes
for applying a function defined at the module scope. This means that `Process`
subclasses with run methods that differ from the default cannot be used with
the `Pool`. So when such a `Process` subclass would be useful (often the
case for complicated run logic), a custom Pool implementation must be used.

This module provides a `StatefulProcessingPool` with an accompanying
`StatefulProcessor` class that can be overridden to provide stateful `run`
implementations.
"""
import multiprocessing as mp
from multiprocessing.pool import MaybeEncodingError, mapstar


class StatefulProcessor(mp.Process):
    """Process designed for use in `StatefulProcessingPool`.

    The `target` function is ignored in favor of a `run` method that reads
    tasks from an input queue and writes results to an output queue. The actual
    processing of each input is delegated to the `process` method, which should
    be overridden by subclasses.
    """

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None,
                 state_kwargs=None):
        """Initialize the process and attach custom state to it.

        Args:
            group, target, name, args, kwargs: forwarded unchanged to
                `multiprocessing.Process.__init__`. `kwargs=None` is treated
                as an empty dict.
            state_kwargs (dict): mapping of attribute name -> value; each entry
                is set as an attribute on this process object. The mapping is
                copied, so the caller's dict is never modified.
        """
        # Avoid a shared mutable default; `Process.__init__` needs a real mapping.
        kwargs = {} if kwargs is None else kwargs
        super(StatefulProcessor, self).__init__(group, target, name, args, kwargs)
        state_kwargs = {} if state_kwargs is None else dict(state_kwargs)
        self.__dict__.update(state_kwargs)

    def process(self, text):
        """Process a document text; subclasses should override."""
        raise NotImplementedError

    def run(self):
        """Consume tasks from the input queue until told to stop.

        Expects `self._args` to be the 5-tuple
        `(inqueue, outqueue, initializer, initargs, maxtasks)`. Each task is a
        tuple `(job, i, func, args, kwds)`; the `func` in the task is replaced
        by `self.process` (for `mapstar` tasks, the function inside each chunk
        is replaced instead). For every task, `(job, i, (success, value))` is
        put on the output queue, where `value` is the result or the exception
        that was raised.

        The loop ends when a `None` sentinel is received, when reading from the
        input queue raises `EOFError`/`OSError`, or after `maxtasks` tasks.

        Raises:
            AssertionError: if `maxtasks` is neither `None` nor a positive int.
        """
        inqueue, outqueue, initializer, initargs, maxtasks = self._args

        # Explicit raise instead of `assert` so the check survives `python -O`.
        if maxtasks is not None and not (isinstance(maxtasks, int) and maxtasks > 0):
            raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))

        put = outqueue.put
        get = inqueue.get
        if hasattr(inqueue, '_writer'):
            # Pipe-backed queues: this worker only reads tasks and writes
            # results, so close the ends it does not use.
            inqueue._writer.close()
            outqueue._reader.close()

        if initializer is not None:
            initializer(*initargs)

        completed = 0
        while maxtasks is None or (maxtasks and completed < maxtasks):
            try:
                task = get()
            except (EOFError, OSError):
                mp.util.debug('worker got EOFError or OSError -- exiting')
                break

            if task is None:
                mp.util.debug('worker got sentinel -- exiting')
                break

            job, i, func, args, kwds = task
            try:
                if func is mapstar:
                    # Map tasks carry (function, chunk) pairs; swap in our
                    # stateful `process` for whatever function was submitted.
                    args = [(self.process, arg_list) for _, arg_list in args]
                else:
                    func = self.process
                result = (True, func(*args, **kwds))
            except Exception as e:
                result = (False, e)

            try:
                put((job, i, result))
            except Exception as e:
                # The result itself could not be sent; report that instead.
                wrapped = MaybeEncodingError(e, result[1])
                mp.util.debug("Possible encoding error while sending result: %s" % (
                    wrapped))
                put((job, i, (False, wrapped)))

            # Drop references so large task/result objects can be freed while
            # the worker blocks waiting for the next task.
            task = job = result = func = args = kwds = None
            completed += 1
        mp.util.debug('worker exiting after %d tasks' % completed)


class _PatchedPool(mp.pool.Pool):
    """Builtin `Pool` patched to accept a `processor_class` and `state_kwargs`.

    Workers are instances of `processor_class` rather than plain processes, so
    custom stateful `run` logic can be used. Each worker is constructed with
    the arguments the pool supplies plus a `state_kwargs` keyword argument.
    """

    def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None,
                 state_kwargs=None, processor_class=StatefulProcessor):
        # Both attributes must exist before the base initializer runs, because
        # worker creation reads them through `Process`.
        self._processor_class = processor_class
        self._state_kwargs = state_kwargs
        super(_PatchedPool, self).__init__(processes, initializer, initargs, maxtasksperchild)

    def Process(self, *args, **kwds):
        """Build one worker of `processor_class`, injecting `state_kwargs`."""
        kwds['state_kwargs'] = self._state_kwargs
        worker_class = self._processor_class
        return worker_class(*args, **kwds)

    def _repopulate_pool(self):
        """Start workers until the pool holds `self._processes` of them.

        Each new worker receives the input/output queues, the initializer with
        its arguments, and the per-child task limit as its `args`.
        """
        missing = self._processes - len(self._pool)
        for _ in range(missing):
            worker_args = (
                self._inqueue, self._outqueue,
                self._initializer, self._initargs, self._maxtasksperchild)
            worker = self.Process(args=worker_args)

            self._pool.append(worker)
            worker.name = worker.name.replace('Process', 'PoolWorker')
            worker.daemon = True
            worker.start()
            mp.util.debug('added worker')


class StatefulProcessingPool(object):
    """Pool that uses stateful worker processes; designed to work with subclasses of
    `StatefulProcessor`.

    Mirrors the method names of `multiprocessing.Pool`, except that no function
    argument is taken: the workers always apply their own `process` method.
    """

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, state_kwargs=None, processor_class=StatefulProcessor):
        """
        See `multiprocessing.Pool` for arguments not discussed here.

        Args:
            state_kwargs (dict): dictionary of attributes to initialize worker processes
                with. These will be set as attributes of the process object.
            processor_class (class): subclass of `StatefulProcessor` to use for worker
                processes.
        """
        self._pool = _PatchedPool(
            processes, initializer, initargs, maxtasksperchild, state_kwargs,
            processor_class)

    def apply(self, args=(), kwds=None):
        """Apply `StatefulProcessor.process(*args, **kwds)` and return the result.

        `kwds=None` is treated as no keyword arguments.
        """
        kwds = {} if kwds is None else kwds
        return self._pool.apply_async(None, args, kwds).get()

    def map(self, iterable, chunksize=None):
        """Apply `StatefulProcessor.process` to each element in `iterable`, collecting
        the results in a list that is returned.
        """
        return self._pool.map(None, iterable, chunksize)

    def imap(self, iterable, chunksize=1):
        """Lazy version of `map()` returning an iterator -- can be MUCH slower than `map()`."""
        return self._pool.imap(None, iterable, chunksize)

    def imap_unordered(self, iterable, chunksize=1):
        """Like `imap()` method but ordering of results is arbitrary."""
        return self._pool.imap_unordered(None, iterable, chunksize)

    def apply_async(self, args=(), kwds=None, callback=None):
        """Asynchronous version of `apply()` method.

        `kwds=None` is treated as no keyword arguments.
        """
        kwds = {} if kwds is None else kwds
        return self._pool.apply_async(None, args, kwds, callback)

    def map_async(self, iterable, chunksize=None, callback=None):
        """Asynchronous version of `map()` method."""
        return self._pool.map_async(None, iterable, chunksize, callback)

    def close(self):
        """Close the underlying pool (delegates to its `close()`)."""
        self._pool.close()

    def terminate(self):
        """Terminate the underlying pool (delegates to its `terminate()`)."""
        self._pool.terminate()

    def join(self):
        """Join the underlying pool (delegates to its `join()`)."""
        self._pool.join()

    def __enter__(self):
        """Return this pool for use in a `with` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Terminate the underlying pool when the `with` block exits."""
        self._pool.terminate()
Loading