Restructure TextCorpus code to share multiprocessing and preprocessing logic. #1478
Conversation
@piskvorky @menshikh-iv I believe the build failures on this only have to do with importing in the main guard.
I tested out the WikiCorpus before and after on the full Wikipedia corpus. I did this by building the Dictionary. My goal was to ensure the speed is comparable now to what was implemented before.
The speed is comparable. This implementation also performs deaccenting and stopword removal, which I suspect is why it takes a few minutes longer. The difference in the number of documents comes from the removal of stopwords, which results in many more empty documents being pruned. I think the difference in the number of terms is due to the Dictionary pruning encountering different documents.
A quick shallow scan of coding style; I didn't have time to verify the actual logic.
self.init_state(state_kwargs)

def init_state(self, state_kwargs):
    for name, value in state_kwargs.items():
Why not simply self.__dict__.update?
good suggestion; done
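For illustration, a minimal sketch of the suggested simplification; the class and attribute names here are hypothetical, not gensim's actual code:

```python
class Worker:
    """Hypothetical stand-in for a pool worker holding preprocessing state."""

    def __init__(self, **state_kwargs):
        self.init_state(state_kwargs)

    def init_state(self, state_kwargs):
        # One call replaces the per-item setattr loop.
        self.__dict__.update(state_kwargs)


w = Worker(tokenizer=str.split, lowercase=True)
print(w.lowercase)  # True
```

Since `setattr(self, name, value)` and a `__dict__.update` are equivalent for plain instance attributes, the update form is both shorter and slightly faster.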
@@ -0,0 +1,160 @@
import multiprocessing as mp
Missing file header.
added file header
gensim/corpora/textcorpus.py
Outdated
# So just split the token sequence arbitrarily into sentences of length
# `max_sentence_length`.
sentence, rest = [], b''
with utils.smart_open(self.source) as fin:
Best to open files in binary mode (rb), and convert to text explicitly where needed.
The default is 'rb', but I updated to set mode explicitly to future-proof.
gensim/corpora/textcorpus.py
Outdated
break

last_token = text.rfind(b' ')  # last token may have been split in two... keep for next iteration
words, rest = (text[:last_token].split(),
No vertical indent -- please use hanging indent.
done
gensim/corpora/wikicorpus.py
Outdated
# no need to lowercase and unicode, because the tokenizer already does that.
character_filters = [textcorpus.deaccent, textcorpus.strip_multiple_whitespaces]
super(WikiCorpus, self).__init__(source, dictionary, metadata, character_filters, tokenizer,
                                 token_filters, processes)
No vertical indent.
done
gensim/test/test_corpora.py
Outdated
@@ -20,7 +19,7 @@
import numpy as np

from gensim.corpora import (bleicorpus, mmcorpus, lowcorpus, svmlightcorpus,
-                            ucicorpus, malletcorpus, textcorpus, indexedcorpus)
+                            ucicorpus, malletcorpus, indexedcorpus)
No vertical indent please.
done
gensim/test/test_textcorpus.py
Outdated
def test_texts_file():
    fpath = os.path.join(tempfile.gettempdir(), 'gensim_corpus.tst')
    with open(fpath, 'w') as f:
smart_open + binary mode please.
this function was actually not being used, so I just removed it
gensim/test/test_textcorpus.py
Outdated
def corpus_from_lines(self, lines):
    fpath = tempfile.mktemp()
    with codecs.open(fpath, 'w', encoding='utf8') as f:
Ditto.
changed mode to 'wb'
gensim/utils.py
Outdated
@@ -1263,3 +1269,45 @@ def _iter_windows(document, window_size, copy=False, ignore_below_size=True):
    else:
        for doc_window in doc_windows:
            yield doc_window.copy() if copy else doc_window


def walk_with_depth(top, topdown=True, onerror=None, followlinks=False, depth=0):
Is this really needed? The depth can be deduced easily from a normal walk(), by comparing the root directories.
This is a very good point; I replaced this with a wrapper on os.walk that just deduces the depth in the manner you suggested.
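A sketch of what such a wrapper could look like; the exact signature in the PR may differ:

```python
import os


def walk_with_depth(top, **kwargs):
    """Yield (depth, dirpath, dirnames, filenames) for each directory under `top`,
    deducing the depth by comparing each dirpath against the absolute root."""
    top = os.path.abspath(top)
    for dirpath, dirnames, filenames in os.walk(top, **kwargs):
        rel = os.path.relpath(dirpath, top)
        # the root itself is depth 0; each path separator below it adds one level
        depth = 0 if rel == '.' else rel.count(os.sep) + 1
        yield depth, dirpath, dirnames, filenames
```

Using os.path.relpath instead of string replacement also sidesteps the unnormalized-path concerns raised later in this thread.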
This looks like a massive PR; are the changes 100% backward compatible? If not, what is the upgrade plan for users, i.e. how do they modify their existing code so it continues to work?
util.debug('worker exiting after %d tasks' % completed)


class _PatchedPool(mp.pool.Pool):
What is this for, what is being patched (and why)?
Needs a clear comment.
I've added documentation throughout this module to clarify. Some added context: when I initially implemented this refactor, I was serializing the token_filters, tokenizer, and character_filters used in TextCorpus for text preprocessing. This pickling overhead was causing a significant slowdown. So I wanted to include them in each worker process at startup to speed it up. Doing so ruled out the use of the builtin multiprocessing.Pool class.
Rather than write a complicated custom pool, I decided that reuse via patching of the existing pool would be more robust and probably useful elsewhere in the code (for instance, in the text_analysis module used by the probability_estimation module). That is why this module came about.
Aha, thanks, that's interesting. @gojomo @menshikh-iv can you please review this extended multiprocessing logic?
I'm curious whether others do it this way too, since this seems a very common use-case.
""" | ||
for i in range(self._processes - len(self._pool)): | ||
w = self.Process(args=(self._inqueue, self._outqueue, | ||
self._initializer, |
No vertical indent in gensim.
done
util.debug('added worker')


class TextProcessingPool(object):
What is this wrapper for? Why not use the default Pool?
I'd prefer to stick to built-ins, unless absolutely necessary. And if absolutely necessary, we'll need better documentation describing the rationale.
Added documentation with rationale. See the comment on _PatchedPool above for additional context.
@piskvorky I've addressed your review comments; thank you for the quick feedback! If I can add anything else to make your review of the logic easier or otherwise clarify things, I will gladly do so.
gensim/utils.py
Outdated
path = os.path.abspath(top)
for dirpath, dirnames, filenames in os.walk(path, topdown, onerror, followlinks):
    sub_path = dirpath.replace(path, '')
    depth = sub_path.count(os.sep)
Is this safe even with unnormalized paths (/tmp vs /tmp/ etc.)? How does walk handle symlinks?
os.path.relpath / commonprefix may be safer, I'm not sure.
docs say this on symlinks:
By default, os.walk does not follow symbolic links to subdirectories on
systems that support them. In order to get this functionality, set the
optional argument 'followlinks' to true.
I did look at both of the functions you referenced (which I was not familiar with), but I believe the current code handles unnormalized paths correctly. I've added tests to verify this.
…logic from `WikiCorpus`.
…ove tests for text corpora classes to `test_textcorpus` module.
…that serializes all preprocessing functions once on initialization and then only passes the documents to the workers and the tokens back to the master.
…`textcorpus.walk` to `walk_with_depth` and move to `utils` module. Update tests and other referencing modules to adjust to the moves, resolving some circular references that arose in the process.
…r to provide multiprocessing and additional preprocessing options.
…agical ways. Also, adjust `LineSentence` default kwargs to use single process and allow other preprocessing options.
…om `TextDirectoryCorpus`.
…ove tests for text corpora classes to `test_textcorpus` module.
…that serializes all preprocessing functions once on initialization and then only passes the documents to the workers and the tokens back to the master.
…r to provide multiprocessing and additional preprocessing options.
6c12b54 to 2db0aaa
@piskvorky I believe this is fully backwards-compatible in terms of interfaces. The only thing I expect will be different is the default preprocessing. Also, I have updated the PR to address your most recent comments; thank you for your review. I believe you'd asked for thoughts from @gojomo and @menshikh-iv regarding the modified multiprocessing pool; I'm also curious to know if the approach I took here has been used elsewhere and if any alternative approaches might be more suitable for this problem. Thanks!
path = os.path.abspath(top)
for dirpath, dirnames, filenames in os.walk(path, topdown, onerror, followlinks):
    sub_path = dirpath.replace(path, '')
    depth = sub_path.count(os.sep)
This construct still makes me a little uneasy. Can we at least os.path.normpath, to get rid of any double/trailing/leading slashes? Or does os.walk normalize the dirpath somehow? Although in that case, we'd have to normalize path and dirpath in exactly the same way, so that the .replace() above works.
Ah, I see your concern. I think os.path.abspath (called as the first line of that function) handles the situation you're worried about:
In [4]: os.path.abspath('/test/path/')
Out[4]: '/test/path'
In [5]: os.path.abspath('/test/path')
Out[5]: '/test/path'
In [6]: os.path.abspath('/test/path//')
Out[6]: '/test/path'
What if walk hits a symlinked dir -- does it return dirpath as a canonical path (de-sym-linked), or is path still its prefix?
tree tmp
tmp
├── subdir
│ └── test
└── symlink -> subdir
2 directories, 1 file
In [56]: list(os.walk('tmp'))
Out[56]: [('tmp', ['subdir', 'symlink'], []), ('tmp/subdir', [], ['test'])]
In [58]: list(os.walk('tmp', followlinks=True))
Out[58]:
[('tmp', ['subdir', 'symlink'], []),
('tmp/subdir', [], ['test']),
('tmp/symlink', [], ['test'])]
@macks22 thanks!
Does the new code support custom tokenization / text normalization? That sounds really useful. Same defaults (backward compatibility), but allow injecting your own function to normalize and tokenize a text. We had a recent ticket where a Thai user complained our wiki processing returns rubbish. Which is 100% true -- not only do/did we not support custom text processing, we didn't even notice where our hardwired processing didn't make sense, and happily produced garbage output without any error/warning.
…dd the `tokenizer` argument to allow users to override the default lemmatizer/tokenizer functions.
@piskvorky I had mainly made those changes so the preprocessing defaults would be as close to the default for the
Nice! Should
@macks22 is there a way to reach you privately (email)? Please ping me at radim@rare-technologies.com.
…g within `TextCorpus`. Update docstring for `TextCorpus` for new parameters. Convert `PathLineSentences` to a `TextDirectoryCorpus` subclass and adjust the tests to account for this.
@piskvorky Updated to improve the docstrings.
gensim/corpora/textcorpus.py
Outdated
logger.debug("sorting filepaths") | ||
paths = list(paths) | ||
paths.sort(key=lambda path: os.path.basename(path)) | ||
logger.debug("found {} files: {}".format(len(paths), paths)) |
The rest of the code uses C-style formatting -- best keep it consistent.
The intention here was to get the auto-formatting for the list of paths, as opposed to having to do my own '[' + ', '.join(paths) + ']', which seemed much messier. Should I still change it to do this instead?
All these formatting alternatives should work identically (call str/repr on their arguments), so I'm not sure what you mean. Are you seeing a difference?
One advantage of a C-style format is that the argument types will be immediately apparent to the reader (%d and %s or %r in this case).
Unrelated: the arguments should be passed to logger.debug as arguments, to avoid formatting the string in case the message is not emitted by logging (doesn't pass the log level threshold etc). We want to leave the string formatting (which can sometimes be expensive) for the last moment possible.
Ah, I see; I simply wasn't aware of '%r' as an option to get the repr. Updated to use the C-style formatting.
gensim/corpora/textcorpus.py
Outdated
logging.info('files read into PathLineSentences:' + '\n'.join(self.input_files))
logger.debug("finished reading %d files", num_files)
Why not info?
token_filters (iterable of callable): each will be applied to the iterable of tokens
    in order, and should return another iterable of tokens. These filters can add,
    remove, or replace tokens, or do nothing at all. The default token filters
    remove tokens less than 3 characters long and remove stopwords using the list
    in `gensim.parsing.preprocessing.STOPWORDS`.
processes (int): number of processes to use for text preprocessing. The default is
    -1, which will use (number of virtual CPUs - 1) worker processes, in addition
What happens when number of virtual CPUs == 1?
No worker pool is used; all preprocessing occurs in the master process. I've updated the docstring to inform on this.
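A hypothetical sketch of how that resolution of the `processes` argument might look; the function name is made up and this is not the PR's exact code:

```python
import multiprocessing as mp


def resolve_worker_count(processes=-1):
    """Resolve the `processes` argument into a worker count.

    -1 means "number of virtual CPUs - 1" worker processes. A result of 0
    (i.e. a single-CPU machine) means no pool is created and all
    preprocessing runs in the master process.
    """
    if processes == -1:
        processes = mp.cpu_count() - 1
    return processes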
gensim/corpora/textcorpus.py
Outdated
For Python 2, the original text will not be unicode, so it may be useful to
convert to unicode as the first character filter. The default character filters
lowercase, convert to unicode (strict utf8), perform ASCII-folding, then collapse
For Python 2, the original text will not be unicode (unless you modify your
I propose dropping the (unless ... bracket. This is already very complicated as it is.
Also, why single out Python 2? Is the behaviour different between Python 2 vs Python 3? If so, I'd consider that a bug.
Let's keep the API as simple as possible: getstream returns unicode (no matter the Python version); all filters expect unicode.
Done; I've put the unicode conversion in the master process. For the sake of speed, it may make sense to have getstream return bytes in all versions, move the encoding parameters to the workers, and have them do the unicode conversion. Based on the ongoing Phrases refactor, that seems to be more of a bottleneck than I would've expected. Despite these considerations, I think it is sensible to do it in the master for the sake of simplicity for now.
gensim/corpora/textcorpus.py
Outdated
For Python 2, the original text will not be unicode (unless you modify your
`getstream` method to convert it to unicode), so it may be useful to convert to
unicode as the first character filter. The default character filters lowercase,
convert to unicode (strict utf8), perform ASCII-folding, then collapse
Why lowercase before converting to unicode? Could lead to bugs for non-ASCII capitals.
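The concern can be demonstrated directly: bytes.lower() only maps ASCII A-Z, so lowercasing raw UTF-8 before decoding silently misses non-ASCII capitals.

```python
raw = 'É'.encode('utf8')  # b'\xc3\x89'

# Lowercasing the raw bytes is a no-op for non-ASCII characters...
assert raw.lower() == raw

# ...while decoding to unicode first lowercases correctly.
assert raw.decode('utf8').lower() == 'é'
```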
gensim/corpora/textcorpus.py
Outdated
processes (int): number of processes to use for text preprocessing. The default is
    -1, which will use (number of virtual CPUs - 1) worker processes, in addition
    to the master process. If set to a number greater than the number of virtual
    CPUs available, the value will be reduced to (number of virtual CPUs - 1).
-1 on this: why override a user's explicit request?
You're right; this mistrust of users is not suitable for a Python code base! Modified to remove the upper bounding.
…ry filtering arguments to discard no tokens by default.
gensim/corpora/textcorpus.py
Outdated
@@ -552,7 +554,7 @@ def getstream(self):
    """
    for path in self.iter_filepaths():
        logging.debug("reading file: %s", path)
-        with utils.smart_open(path) as f:
+        with utils.smart_open(path, 'rt') as f:
This looks fragile. Best to always open files in binary mode rb, and convert to text (unicode) explicitly, with an explicit encoding, where needed.
changed to 'rb' followed by explicit unicode conversion
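A minimal sketch of the agreed pattern; plain open() here stands in for gensim's utils.smart_open, and the helper name is invented:

```python
def read_unicode_lines(path, encoding='utf-8'):
    """Open in binary mode and decode explicitly, rather than relying on a
    text-mode default that varies across platforms and Python versions."""
    with open(path, 'rb') as fin:
        for line in fin:
            yield line.decode(encoding)
```

Keeping the encoding a parameter also leaves the door open for the non-UTF-8 corpora mentioned elsewhere in this thread.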
…nd add unicode decoding arguments to `TextCorpus`. Also open source files in 'rb' mode. Lowercase after deaccenting to prevent deaccent confusion. Do not upper bound the number of processes the user passes to `TextCorpus` constructor.
@piskvorky thank you for your many reviews! I believe I have addressed all your comments and requests for changes. From what I can tell, the build check failures are only due to the imports in the main guard.
Thanks for all the fixes and good work :) I'll defer to @menshikh-iv for a final thorough review and decision (and fixing the unrelated build errors).
Beautiful work!
Please resolve merge conflict & fix small issues, this code LGTM.
gensim/corpora/stateful_pool.py
Outdated
self._pool.terminate()


if __name__ == "__main__":
This code isn't needed here (remove it, or better: refactor it and add it as a test).
moved to test class
gensim/corpora/textcorpus.py
Outdated
else:
    yield f.read().strip()
    num_texts += 1
# endclass TextDirectoryCorpus
No need for # endclass ..., please remove it.
done
@macks22 please pay attention to the AppVeyor problems; a lot of tests break, but all of it looks like one problem.
Ping @macks22, what's the status here?
Ping @macks22
@menshikh-iv I'm hoping to update this in the coming weeks. Having trouble finding time to put towards it on the weekends. I'm thinking to refactor it according to some discussion I had with @michaelwsherman in regards to #1506. He had proposed a decomposition of responsibilities.
@macks22 thanks for the clarification, good luck :)
@menshikh-iv hope all is well; I'm still working to find time to update this to fix the tests on Windows in the manner I described above. Hopefully next weekend.
Ping @macks22, do you have time to finish this now?
Ping @macks22, we are waiting for you :)
@menshikh-iv sorry for the latency in reply. I haven't had sufficient time to finish this. It's still on my TODO list, but TBH I may not have time again until the end-of-December holidays.
Ping @macks22, December has come, I remind you of us :)
Ping @macks22, I remind you about the PR :)
I'm sorry, but I'm closing this PR.
Sorry for the delay in responding; I have been busier than expected. I will try to re-open and finish when I can.
Implements #1477.