[GSoC 2018] Multistream API for vocabulary building in *2vec #2078
10 input streams
|
@gojomo Hi! I'd like to ask for your advice. There is one bug in the code in this PR related to |
Clearly incrementing-serial-numbers-per-line can't be used if you have multiple files that all start with line 0. You could require tags to be specified inside the files, as an extra field/fields on each line. If the files are considered in a stable order, and the count of texts (lines) in each can be pre-determined (perhaps with another extra pass over all data), each could be given a unique starting-id. Text tags could also be a deterministic function of the hash of the contained text, at the price of additional per-line overhead and the extra cost of keeping those (now much longer) tags as string keys.
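As a rough illustration of the two tagging options described above, here is a minimal sketch. The helper names `starting_offsets` and `hash_tag` are hypothetical and not part of gensim, and the line counts are assumed to come from a separate pre-pass over the files.

```python
import hashlib


def starting_offsets(line_counts):
    """Return the first document id for each file, given per-file line
    counts listed in a stable file order."""
    offsets, total = [], 0
    for count in line_counts:
        offsets.append(total)  # this file starts where the previous ones ended
        total += count
    return offsets


def hash_tag(text):
    """Derive a deterministic tag from the text itself.
    Identical lines get identical tags, and tags are long strings."""
    return hashlib.sha1(text.encode("utf-8")).hexdigest()


# Hypothetical counts for three files, obtained by an extra pass over the data.
offsets = starting_offsets([3, 2, 4])
```

With offsets in hand, line `i` of file `k` could be tagged `offsets[k] + i`, which stays unique across files as long as the file order and counts do not change between passes.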
gensim/models/base_any2vec.py
Outdated
-if isinstance(sentences, GeneratorType):
+if multistream and not isinstance(sentences, (tuple, list)):
+    raise TypeError("If multistream=True, you must pass tuple or list as the sentences argument.")
+if not multistream and isinstance(sentences, GeneratorType):
This check (about generators) should also be applied to each element of the multistream input, i.e. to every stream.
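One way the per-stream check requested above might look is sketched below. The function name `check_streams` and the wording of the generator error are assumptions for illustration. Only the tuple/list message is taken from the diff.

```python
from types import GeneratorType


def check_streams(sentences, multistream=False):
    """Reject one-shot generators, both for a single stream and for
    every element of a multistream input."""
    if multistream:
        if not isinstance(sentences, (tuple, list)):
            raise TypeError("If multistream=True, you must pass tuple or list as the sentences argument.")
        streams = sentences
    else:
        streams = [sentences]
    for stream in streams:
        # A generator can be consumed only once, so it cannot serve
        # repeated passes over the corpus.
        if isinstance(stream, GeneratorType):
            raise TypeError("A generator cannot be used as an input stream; pass a restartable iterable.")
```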
gensim/models/base_any2vec.py
Outdated
    If True, use `sentences` as list of input streams and speed up vocab building by parallelization
    with `min(len(sentences), self.workers)` processes. This option can lead up to 2.5x reduction
    in vocabulary building time.
workers : int
Maybe it's better to use a different name, like `multistream_workers` (to avoid a potential collision between parameter names)? Or does this make no sense, @persiyanov?
I think there is no point in this. In most cases, by setting the `workers` parameter the user means the same degree of parallelization for both scanning the vocab (multiprocessing) and training the model (multithreading), IMO.
gensim/models/base_any2vec.py
Outdated
@@ -213,6 +214,9 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
    for _ in xrange(self.workers)
]

# Chain all input streams into one, because multistream training is not supported yet.
if multistream:
Better to call `warnings.warn` about the lack of support (I mean telling the user explicitly, not only in a code comment).
Hm, I don't think so:
We are adding new functionality that lets users use `multistream` mode. We added this functionality to the gensim core, so we are sure that it works correctly. Why should we warn the user that something is wrong? Furthermore, the user can't do anything to get rid of this warning message while continuing to use multistream mode.
About "can't do anything": not true, see https://docs.python.org/2/library/warnings.html#temporarily-suppressing-warnings (that's FYI), but I agree with your point in this case.
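For reference, the suppression mechanism from the linked documentation can be sketched as follows. The helper `chain_streams_with_warning` and its message are hypothetical, standing in for whatever code would emit the warning.

```python
import warnings


def chain_streams_with_warning(streams):
    """Hypothetical helper: warn about the fallback, then flatten all
    streams into a single list of sentences."""
    warnings.warn("multistream training is not supported yet; chaining input streams", UserWarning)
    return [sentence for stream in streams for sentence in stream]


# A caller who does not want to see the warning can silence it locally.
# The previous filter state is restored when the `with` block exits.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    sentences = chain_streams_with_warning([[["a", "b"]], [["c"]]])
```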
gensim/models/base_any2vec.py
Outdated
update : bool
    If true, the new words in `sentences` will be added to model's vocab.
progress_per : int
    Indicates how many words to process before showing/updating the progress.

"""
if workers is None:
    workers = self.workers
`workers = workers or self.workers`, here and everywhere.
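The two spellings are close but not identical. The toy class below (hypothetical, not gensim code) shows the one case where they differ: an explicit falsy value such as `0`.

```python
class Model:
    """Toy stand-in used only to compare the two defaulting idioms."""

    def __init__(self, workers=3):
        self.workers = workers

    def resolve_explicit(self, workers=None):
        # Only None falls back to the instance default.
        if workers is None:
            workers = self.workers
        return workers

    def resolve_short(self, workers=None):
        # Any falsy value (None, 0) falls back to the instance default.
        return workers or self.workers


model = Model(workers=3)
```

For a worker count, where `0` is not a meaningful setting, the shorter form is arguably fine. The difference matters for parameters where `0` is a legitimate value.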
gensim/utils.py
Outdated
"""Merge `dict1` of (word, freq1) and `dict2` of (word, freq2) into `dict1` of (word, freq1+freq2).
Parameters
----------
dict1 : dict
`dict of (str, int)`, here and everywhere.
gensim/models/word2vec.py
Outdated
@@ -1203,7 +1280,7 @@ def sort_vocab(self, wv):
         wv.vocab[word].index = i

 def prepare_vocab(self, hs, negative, wv, update=False, keep_raw_vocab=False, trim_rule=None,
-                  min_count=None, sample=None, dry_run=False):
+                  min_count=None, sample=None, dry_run=False, **kwargs):
Why `**kwargs`?
Still a question.
gensim/models/word2vec.py
Outdated
pool = multiprocessing.Pool(processes=min(workers, len(input_streams)))

results = [
    pool.apply_async(_scan_vocab_worker,
Why `apply_async`?
What else?
`apply`/`map` are blocking.
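To make the non-blocking pattern concrete, here is a small sketch. It uses `multiprocessing.pool.ThreadPool` purely as a stand-in that shares the `Pool` API, so the example runs without pickling concerns. The PR itself uses `multiprocessing.Pool`. The names `scan_stream` and `scan_all` are hypothetical.

```python
from collections import defaultdict
from multiprocessing.pool import ThreadPool


def scan_stream(stream):
    """Count word frequencies in one input stream."""
    vocab = defaultdict(int)
    for sentence in stream:
        for word in sentence:
            vocab[word] += 1
    return dict(vocab)


def scan_all(input_streams, workers):
    """Submit one task per stream, then collect the per-stream vocabularies."""
    pool = ThreadPool(processes=min(workers, len(input_streams)))
    try:
        # apply_async returns an AsyncResult immediately, so every task is
        # queued before the first .get() call starts waiting.
        results = [pool.apply_async(scan_stream, (stream,)) for stream in input_streams]
        return [res.get() for res in results]
    finally:
        pool.close()
        pool.join()
```

When there are more streams than workers, the pool simply hands the remaining tasks to whichever worker frees up first, so no explicit matching of streams to workers is needed.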
gensim/models/doc2vec.py
Outdated

corpus_count = document_no + 1
results = [res.get() for res in results]  # pairs (vocab, doclen2tags)
self.raw_vocab = reduce(utils.merge_dicts, [r[0] for r in results])
`from functools import reduce`: Guido doesn't like the `reduce` function, which is why `reduce` was moved out of the builtins and into `functools` in Python 3.
done
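The merge step under review can be sketched as below. This `merge_counts` is a stand-in written for illustration, following the docstring quoted earlier (merge into the first dict), and is not claimed to match the gensim implementation.

```python
from functools import reduce  # needed on Python 3, where reduce is not a builtin


def merge_counts(dict1, dict2):
    """Add the counts of dict2 into dict1 in place and return dict1."""
    for word, freq in dict2.items():
        dict1[word] = dict1.get(word, 0) + freq
    return dict1


# Hypothetical per-stream vocabularies returned by the workers.
partial_vocabs = [{"a": 2, "b": 1}, {"a": 1, "c": 4}, {"b": 3}]
raw_vocab = reduce(merge_counts, partial_vocabs)
```

Note that the first partial dict is mutated and becomes the result, which avoids allocating one more large dictionary.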
@persiyanov great work! Please add tests & merge fresh @gojomo problem mostly defined by user-behavior, i.e. if I (as gensim-user) heard about new multistream feature and want to try it with d2v - I'll use something like |
Hooray! Tests are passing now. I will update the PR description properly (what the feature is, how it helps, how to use it, etc.).
If it works, great, but I'm a little uncomfortable with the manner-of-activation: the Ultimately, even if not right away, I'd prefer a mechanism where people opting-into an alternative-vocabulary-building-optimization use some other explicit code path, not complicating existing interfaces, then drop a completed vocabulary-object into the Word2Vec/etc model. Then the simple/legacy path stays simple, the new alternative is clearly opted-into, and the point-of-entanglement is limited to a single injection of what the main model needs to proceed – which also allows the main model to be indifferent as to the specifics of how its individual steps were completed. That could also allow further experimentation with other vocab-building approaches, without adding more switches-and-branches to a single entry point every time. (That there's only a 2.5X-3X speedup in moving from 1 to 10 threads makes me think further improvements may be possible.) |
@gojomo Could you give some examples on what you mean by
? Something like Why do you think the interface I proposed is complicated? It's just toggling one flag ( Is it okay to merge this PR in its current state (after addressing @piskvorky comments) and possibly change the interfaces and API in the future (according to new research & results in multistream training direction) if needed? |
@persiyanov Yes, your example utility function The switches-and-branches approach works, and if there's just one alt mode, not too complex. But, it's expanding some (already-gigantic) method-parameter lists, and some already-twisty methods - making them harder to read/maintain/expand in the future. If just a switch, the docs for this mode will be interleaved with long lists of other options – which may tend to tempt those who don't quite need this, but shortchange those who do want full descriptions. Having the docs on a specialized function or class may be more clear – and better able to discuss subtleties like the potentially different memory requirements of this approach, or its sensitivity to the various parallel streams being of roughly-equal size. Also, adding new switches for new approaches in the future, which might or might not be compatible with each other, will be harder than having a separate, swappable step that hides its own details and just leaves things in a proper state. For example, there's another effort in #1962 to offer the approximate-counting The preexisting and not-so-flexible tangle of the *2Vec initialization steps long predates your work, so it may be outside your project to fully fix it, but I'm wary of making the tangle any thicker. |
gensim/utils.py
Outdated
@@ -1712,7 +1712,7 @@ def prune_vocab(vocab, min_reduce, trim_rule=None):

 def trim_vocab_by_freq(vocab, topk, trim_rule=None):
     """Retain `topk` most frequent words in `vocab`.
-    If there are more words with the same frequency as `topk`-th one, they will be dropped.
+    If there are more words with the same frequency as `topk`-th one, they will be keeped.
`keeped` => `kept`
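The tie-keeping behaviour that the corrected docstring describes might be implemented along these lines. This is a simplified sketch with a hypothetical name (`trim_by_freq`), it ignores `trim_rule`, and it assumes `topk >= 1`.

```python
def trim_by_freq(vocab, topk):
    """Keep the `topk` most frequent words in `vocab`, in place.
    Words tied with the `topk`-th frequency are kept as well."""
    if topk >= len(vocab):
        return
    # Frequency of the topk-th most frequent word.
    min_count = sorted(vocab.values(), reverse=True)[topk - 1]
    for word in [w for w, count in vocab.items() if count < min_count]:
        del vocab[word]


vocab = {"a": 5, "b": 3, "c": 3, "d": 1}
trim_by_freq(vocab, 2)
```

Because ties are kept, the result can hold more than `topk` words: here `b` and `c` share the second-highest count, so both survive.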
gensim/models/base_any2vec.py
Outdated
 """Build vocabulary from a sequence of sentences (can be a once-only generator stream).
 Each sentence is a iterable of iterables (can simply be a list of unicode strings too).

 Parameters
 ----------
-sentences : iterable of iterables
+sentences : {iterable of iterables, list or tuple of iterable of iterables}
I'd prefer not to override the same parameter with a different type (with the type dependent on an external flag).
It's already complicated enough; `iterable of iterables, or a list or tuple of iterable of iterables` is no longer humanly parseable.
My original suggestion was to have the input always be a sequence of iterables. For a single stream, simply an iterable of length one. If backward compatibility is hard to achieve (is it?), keep the legacy parameter `sentences` (maybe deprecate it in time) and promote a new parameter `multistream`. If backward compatibility can be achieved transparently, then just keep using `sentences`.
Correct me if I got you wrong, but you propose something like this:
`Word2Vec(input_streams=None, sentences=None, ...)` (instead of the old `Word2Vec(sentences=None, ...)`)
with the following logic:
- (1) `sentences` and `input_streams` can't both be passed
- (2) if `sentences` is passed, then do `input_streams = [sentences]` and go to (3)
- (3) if `input_streams` is passed, then perform multistream mode
Am I correct about your logic?
Yes. I originally thought users would only pass `sentences`, and we would decide automatically during input validation whether it's a single stream (legacy, promoted to `sentences = [sentences]` transparently) or already multiple streams. And keep only that single parameter.
But introducing an extra `input_streams` parameter may be a cleaner option. More explicit.
Okay, I'll implement the logic in this way, sounds good
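The agreed logic could be captured in one small helper, sketched below. The name `resolve_input_streams` and the error message are hypothetical. Returning `None` when neither argument is given is an assumption meant to mirror a constructor where both are optional.

```python
def resolve_input_streams(sentences=None, input_streams=None):
    """Normalize the two mutually exclusive inputs to a list of streams."""
    if sentences is not None and input_streams is not None:
        raise ValueError("Pass either `sentences` or `input_streams`, not both.")
    if sentences is not None:
        # Legacy single-stream input becomes a one-element list of streams.
        return [sentences]
    if input_streams is not None:
        return list(input_streams)
    return None  # nothing to scan, e.g. a model constructed without data
```

Centralizing the check in one helper is one way to avoid repeating validation asserts in every method that accepts both parameters.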
@piskvorky Currently, in the majority of methods (almost all except the constructor) the `sentences` parameter is required, not optional. If we introduce the new `input_streams` param, both of them need to be optional.
This requires us to do some extra checks, e.g. `assert sentences is not None or input_streams is not None`, in all of the methods. It sounds a bit dirty.
What do you think about that?
Dictionary building is a part of the training process. The same N < M or N > M questions apply there.
I'm still unclear what happens when `N != M`, even during vocabulary building. Or do we force `N == M` always? If so and both N and M are specified by the user (number of streams and number of workers), which one takes precedence?
No, we don't force anything. I just use `multiprocessing.Pool`, put `len(input_streams)` tasks in it, and that's all.
https://github.com/RaRe-Technologies/gensim/pull/2078/files#diff-673fdc31e9aae23291039b143f451b9eR1236 here is the exact logic I use when creating the pool
Small clarification: this is only for dictionary building (see the PR title). The training process stays as is; the goal of this PR is parallelized vocab building only.
OK, so the distribution of N tasks among M workers is managed by `multiprocessing.Pool` (at least during this training stage), got it.
@piskvorky please take a look. I've addressed your comments.
Questions around handling memory.

progress_queue.put((total_words, sentence_no + 1))
progress_queue.put(None)
return vocab
This will try to pickle the entire result (huge!) and send it back to master, where it is queued internally in the results queue, right?
Can you think of a way to use shared memory instead?
That should improve both memory (in a big way) as well as maybe performance (some extra locking, but less data sending).
It's an interesting idea, but some tricky questions arise:
If we use a shared `manager.dict()`, how will we perform `utils.prune_vocab(vocab, max_vocab_size)`? Several threads may start shrinking the vocabulary in parallel at the same time, and that's no good as far as I can see.
One way to solve this is to perform pruning from the master process, but that adds synchronization barriers (all threads must wait until the master has pruned the vocab).
Another way I can propose is to disallow multistream mode for people with low RAM (those who set the `max_vocab_size` parameter). This saves us from inventing tricky hacks to prune the vocab properly. It helps in two places: in the current multistream implementation with `max_vocab_size / workers`, and with a shared `manager.dict()`.
What do you think?
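For context, the `max_vocab_size / workers` idea mentioned above could look roughly like the sketch below: each worker prunes its own private vocabulary against a share of the global budget, so no cross-process locking is needed. Both functions are hypothetical simplifications, and the pruning loop only imitates the general shape of count-threshold pruning, not gensim's `utils.prune_vocab`.

```python
def prune(vocab, max_size):
    """Drop the rarest words, raising the count threshold step by step,
    until at most `max_size` words remain."""
    min_reduce = 1
    while len(vocab) > max_size:
        for word in [w for w, count in vocab.items() if count <= min_reduce]:
            del vocab[word]
        min_reduce += 1


def scan_with_budget(stream, max_vocab_size, workers):
    """Count words in one stream, keeping the private vocab within this
    worker's share of the global size limit."""
    budget = max_vocab_size // workers
    vocab = {}
    for sentence in stream:
        for word in sentence:
            vocab[word] = vocab.get(word, 0) + 1
        if len(vocab) > budget:
            prune(vocab, budget)
    return vocab
```

The trade-off is accuracy: a word that is rare in every individual stream may be pruned by each worker even though its total count across streams would have survived a single global pruning.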
@piskvorky ping
Sorry, I missed this notification.
My thoughts: you mean processes, not threads, right? We're using multiprocessing here.
Shrinking could be trickier, I agree. Maybe have a world-freezing lock around that operation: all workers pause while shrinking is under way? Hopefully it's not that frequent?
Disabling pruning with multistream is also an elegant solution :-) Although people with big corpora (~huge dicts) are exactly the people who need fast training, so that solution kinda throws the baby out with the bath water.
Pausing all workers while shrinking sounds very slow. Why are we sure that shrinking is not performed that frequently?
Do you want me to implement this kind of logic and benchmark it? Then we can decide which logic to merge into the develop branch.
Hm, strange. Must be due to the locking, right?
@persiyanov Any way to turn locking off (since we don't care about accurate counts, especially in the high-frequency tokens where collisions are more likely)?
-1 on experiments with disk-spilling, at least inside this project. We don't have the capacity for that here, this is just an aside in the "faster dict building" sub-project, which is already looking good enough.
@piskvorky Yes, I bet this happens because of locks.
Any way to turn locking off?
I can only see dirty ways to do that, e.g. writing a nogil Cython analogue of `defaultdict(int)`.
I see. Have we exhausted ideas for memory sharing? Anything else we could try easily?
If not, are we finished with parallelized dict-building?
Please add concrete final timings and insights into the PR description (instead of "substantially reduce vocabulary building time").
@piskvorky done, see the updated description
I made some minor language edits, looks good :) Thanks!
total_words += num_words
total_sentences += num_sentences

self.raw_vocab = reduce(utils.merge_counts, [res.get() for res in results])
Does `.get()` create another copy of each (potentially large) dictionary, or only return a reference to an already existing object?
Only returns the reference, as far as I know
gensim/models/base_any2vec.py
Outdated
@@ -330,7 +330,37 @@ def _log_epoch_progress(self, progress_queue, job_queue, cur_epoch=0, total_exam

 def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, total_examples=None,
                  total_words=None, queue_factor=2, report_delay=1.0):
-    """Train one epoch."""
+    """Train the model for a single epoch.
Not a comment about the doc improvements here, which all look good, but a side observation about this method, which I only noticed during related reviews last week: its strategy of re-launching a fresh 'producer' thread and fresh 'worker' threads for each epoch, as I believe was introduced in #1777, likely drives down overall throughput and CPU utilization compared to the prior strategy. The one potential advantage I'd see for adopting such a full-sync teardown & restart between epochs would be allowing the user to specify some callback for mid-training reporting at each epoch's end, but that hasn't yet been added.
@gojomo why would that drive down throughput & CPU utilization?
When some threads have finished an epoch but others haven't, cores will be idle not because of GIL/etc but because there's no thread even trying to move forward onto the next epoch's data. Plus any overhead of re-launching threads (magnitude unclear). The old strategy launched exactly `workers + 1` threads. This one launches `epochs * (workers + 1)` threads.
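The thread counts quoted above amount to the following arithmetic. The function name and the example numbers are illustrative only.

```python
def threads_launched(workers, epochs, relaunch_each_epoch):
    """Total threads started over a whole training run."""
    per_run = workers + 1  # the worker threads plus one producer thread
    return epochs * per_run if relaunch_each_epoch else per_run


# Example with 4 workers and 5 epochs (illustrative numbers).
old_strategy = threads_launched(workers=4, epochs=5, relaunch_each_epoch=False)
new_strategy = threads_launched(workers=4, epochs=5, relaunch_each_epoch=True)
```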
If I understand correctly, you're worried that at the end of each epoch, some threads may be idle (while other threads are finishing their last job) until the next epoch starts.
Isn't that idleness infinitesimal, since any single job takes almost no time at all? I may be missing something but this type of delay shouldn't be even measurable.
I'm not sure of the magnitude, only the direction: this means more idle cores, every time an epoch rolls over. Of course only measuring could tell for sure, and the proportionate impact becomes smaller with larger corpuses.
As it's not yet clear to me the relative interplay of existing GIL/queue sync bottlenecks that have been preventing higher throughput (or else I would have tried more to fix them), adding yet more thread launches/waits/syncs-against-a-not-yet-filled-queue is something I'd have been wary of doing without measurement at the time. Even the coarse once-a-second progress logging tended to show slower throughput at the beginning of training; that slow-start might now be repeated at each epoch - for example, via GIL-contention between the 1st few new worker threads getting a job, and the new producer thread, trying to get ahead of the workers again.
@piskvorky Are you OK with this PR? If so, let's merge it. I need these changes for my next pull request, related to multistream training.
Yeah, I think so. I don't see any outstanding unresolved comments/issues. @menshikh-iv ? |
This PR introduces a multi-stream API for building vocabularies in *2Vec models.
Main idea
Training *2vec models involves two main steps: building the dictionary (vocabulary), then training the model itself.
This pull request optimizes the first step (dictionary building).
The main idea is to parallelize the `scan_vocab` method by allowing users to pass several streams of data (e.g. several `LineSentence` iterators for different files). Under the hood, the implementation uses the `multiprocessing` Python module, creating `multiprocessing.Pool(processes=min(workers, len(input_streams)))` to read and process all input streams in parallel.
Why use it?
This new feature substantially reduces the vocabulary building time, especially if you have complex data iterators (more CPU-bound, e.g. reading a line from a file, then doing complex preprocessing, removing stop words, lemmatizing, stemming etc).
In our benchmarks below, multi-stream mode produces up to 3x boost in the vocabulary building step. This is significant because this step can take hours for larger corpora (e.g. on https://dumps.wikimedia.org/enwiki/20180320/).
The more CPU-intensive your data iterators are, the more speedup you will get. For example, I've experimented with building the vocab for a `Word2Vec` model in two setups, without preprocessing and with preprocessing:
- Without preprocessing: `gensim.models.word2vec.LineSentence`, which just reads a line from disk and splits it on whitespace. I ran this benchmark on an already preprocessed dataset. Using multi-stream resulted in a 2.7x speedup.
- With preprocessing: `gensim.parsing.preprocessing.preprocess_string`. In this case, multistream mode provided a 6.9x speedup.
How to use it?
It's really simple to start using the multi-stream mode. All you need is to have multiple document streams (training corpora, e.g. files on disk) you want to train on, and pass them as a list into the new `input_streams` constructor parameter. `input_streams` supersedes the old `sentences` parameter, which is now equivalent to `input_streams = [sentences]`:
Old single stream approach
New multi-stream approach
The examples above used `Word2Vec`, but `Doc2Vec` and `FastText` work analogously.