[GSoC 2018] Multistream API for vocabulary building in *2vec #2078

Merged
43 commits
92e6e22  multistream scan vocab for doc2vec, word2vec & fastText (Jun 4, 2018)
2618a2e  fixes (Jun 4, 2018)
7960af8  fix tags for doc2vec (Jun 4, 2018)
b8da97a  fix tests (Jun 4, 2018)
16be716  removed benchmark vocab (Jun 4, 2018)
c2d674a  addressing comments (Jun 7, 2018)
85e689c  make interfaces and documentation more pretty (Jun 7, 2018)
0d5ae38  add word2vec multistream tests (Jun 7, 2018)
df3ae5f  fix pep8 (Jun 8, 2018)
49357cb  iteritems -> items (Jun 8, 2018)
0365eea  more precise test (Jun 8, 2018)
812ab8c  add doc2vec tests (Jun 8, 2018)
f11f44d  add fasttext tests (Jun 8, 2018)
941dfd8  remove prints (Jun 8, 2018)
36e7238  fix seed=42 (Jun 8, 2018)
fa57f7a  fixed tests (Jun 8, 2018)
9ea007d  add build_vocab test for fasttext (Jun 8, 2018)
aec68ea  fix (Jun 8, 2018)
07f3fd4  change size from 10 to 5 in fasttext test because of appveyor memory … (Jun 8, 2018)
8b49fb8  another test with memory error (Jun 8, 2018)
d0c11d9  fix py3 tests (Jun 8, 2018)
5974448  fix iteritems for py3 (Jun 8, 2018)
1419847  fix functools reduce (Jun 8, 2018)
280e826  addressing comments (Jun 12, 2018)
7d489f4  addressing @jayantj comments (Jun 13, 2018)
49a1ee6  fix language (Jun 13, 2018)
1cbad7f  add final vocab pruning in multistream modes (Jun 13, 2018)
d024625  keys -> iterkeys (Jun 14, 2018)
5e4de19  use heapq.nlargest (Jun 15, 2018)
74e7b02  fix (Jun 15, 2018)
0d12d8b  multistream flag to input_streams param (Jun 19, 2018)
25d00cd  fix tests (Jun 19, 2018)
2281265  fix flake 8 (Jun 19, 2018)
543a9e0  fix doc2vec docstrings (Jun 19, 2018)
d520d68  fix merging streams (Jun 19, 2018)
d11a0b8  fix doc2vec (Jun 19, 2018)
ecd8f39  max_vocab_size -> max_vocab_size / workers (Jun 19, 2018)
a96d5a4  fixed (Jun 19, 2018)
0a327b0  / -> // (py3 division) (Jun 19, 2018)
62873fb  fix (Jun 19, 2018)
5f61219  Merge branch 'develop' into feature/gsoc-multistream-vocab (Jun 20, 2018)
c67f964  fix docstring (Jun 20, 2018)
a16cec0  Merge branch 'develop' into feature/gsoc-multistream-vocab (Jun 24, 2018)
34 changes: 32 additions & 2 deletions gensim/models/base_any2vec.py
@@ -330,7 +330,37 @@ def _log_epoch_progress(self, progress_queue, job_queue, cur_epoch=0, total_exam

def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, total_examples=None,
total_words=None, queue_factor=2, report_delay=1.0):
"""Train one epoch."""
"""Train the model for a single epoch.
gojomo (Collaborator) commented:

Not a comment about the doc-improvements here, which all look good, but a side observation about this method, which I only noticed during related reviews last week: its strategy of re-launching a fresh 'producer' thread and fresh 'worker' threads for each epoch, which I believe was introduced in #1777, likely drives down overall throughput and CPU utilization compared to the prior strategy. The one potential advantage I'd see for adopting such a full-sync teardown-and-restart between epochs would be allowing the user to specify some callback for mid-training reporting at each epoch's end, but that hasn't yet been added.

piskvorky (Owner) replied:

@gojomo why would that drive down throughput & CPU utilization?

gojomo (Collaborator) replied:

When some threads have finished an epoch but others haven't, cores will be idle, not because of the GIL etc., but because there's no thread even trying to move forward onto the next epoch's data. Plus there's the overhead of re-launching threads (magnitude unclear). The old strategy launched exactly workers + 1 threads; this one launches epochs * (workers + 1) threads.

piskvorky (Owner) replied (Jun 21, 2018):

If I understand correctly, you're worried that at the end of each epoch, some threads may be idle (while other threads are finishing their last job) until the next epoch starts.

Isn't that idleness infinitesimal, since any single job takes almost no time at all? I may be missing something, but this type of delay shouldn't even be measurable.

gojomo (Collaborator) replied (Jun 21, 2018):

I'm not sure of the magnitude, only the direction: this means more idle cores every time an epoch rolls over. Of course, only measuring could tell for sure, and the proportionate impact becomes smaller with larger corpora.

As it's not yet clear to me how the existing GIL and queue-sync bottlenecks that have been preventing higher throughput interact (or else I would have tried harder to fix them), adding yet more thread launches, waits, and syncs against a not-yet-filled queue is something I'd have been wary of doing without measurement at the time. Even the coarse once-a-second progress logging tended to show slower throughput at the beginning of training; that slow start might now be repeated at each epoch, for example via GIL contention between the first few new worker threads getting a job and the new producer thread trying to get ahead of the workers again.

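To make the thread-count arithmetic in the comments above concrete, here is a minimal, hypothetical sketch of the two scheduling strategies being compared. The function names are stand-ins; this is not gensim's actual implementation.

# Hypothetical illustration of the two strategies discussed in the review thread.
import threading
import time

def _producer():
    time.sleep(0.01)   # stand-in for filling the job queue

def _worker():
    time.sleep(0.01)   # stand-in for consuming jobs and training

def per_epoch_strategy(epochs, workers):
    """Relaunch a fresh producer + workers every epoch: epochs * (workers + 1) thread launches."""
    for _ in range(epochs):
        threads = [threading.Thread(target=_producer)]
        threads += [threading.Thread(target=_worker) for _ in range(workers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()   # full-sync barrier: cores idle while the slowest worker finishes

def persistent_strategy(epochs, workers):
    """Keep one producer + the workers alive for the whole run: workers + 1 thread launches."""
    threads = [threading.Thread(target=_producer)]
    threads += [threading.Thread(target=_worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

per_epoch_strategy(epochs=5, workers=3)   # 5 * (3 + 1) = 20 thread launches
persistent_strategy(epochs=5, workers=3)  # 3 + 1 = 4 thread launches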

Parameters
----------
data_iterable : iterable of list of object
The input corpus. It will be split into chunks, and these chunks will be pushed onto the job queue.
data_iterables : iterable of iterables of list of object
The iterable of input streams like `data_iterable`. Use this parameter in multistream mode.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example, in many implementations the learning rate decreases as the number of epochs increases.
total_examples : int, optional
Count of objects in `data_iterable`. In the usual case this would correspond to the number of sentences
in a corpus, used to log progress.
total_words : int, optional
Total count of objects in `data_iterable`. In the usual case this would correspond to the number of raw
words in a corpus, used to log progress.
queue_factor : int, optional
Multiplier for size of queue -> size = number of workers * queue_factor.
report_delay : float, optional
Number of seconds between two consecutive progress report messages in the logger.

Returns
-------
(int, int, int)
The training report for this epoch consisting of three elements:
* Size of data chunk processed, for example number of sentences in the corpus chunk.
* Effective word count used in training (after ignoring unknown words and trimming the sentence length).
* Total word count used in training.

"""
self._check_input_data_sanity(data_iterable, data_iterables)
job_queue = Queue(maxsize=queue_factor * self.workers)
progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers)
@@ -762,7 +792,7 @@ def build_vocab(self, sentences=None, input_streams=None, workers=None, update=F
Can be simply a list of lists of tokens, but for larger corpora,
consider an iterable that streams the sentences directly from disk/network.
See :class:`~gensim.models.word2vec.BrownCorpus`, :class:`~gensim.models.word2vec.Text8Corpus`
or :class:`~gensim.models.word2vec.LineSentence` in :mod:`~gensim.models.word2vec` module for such examples.
or :class:`~gensim.models.word2vec.LineSentence` module for such examples.
input_streams : list or tuple of iterable of iterables
The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using `workers` parameter.
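For orientation, here is a hedged usage sketch of the `input_streams` parameter described by this docstring. The toy streams are made up; the constructor parameters (`size`, `min_count`, `workers`) follow the gensim 3.x API current at the time of this PR.

from gensim.models import Word2Vec

# Two toy in-memory "streams"; in practice each could be a LineSentence over a separate file.
stream_a = [["cat", "say", "meow"], ["dog", "say", "woof"]]
stream_b = [["bird", "say", "tweet"], ["cow", "say", "moo"]]

model = Word2Vec(size=10, min_count=1, workers=2)
# The streams are scanned in parallel by min(len(input_streams), workers) = 2 workers.
model.build_vocab(input_streams=[stream_a, stream_b])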
109 changes: 88 additions & 21 deletions gensim/models/doc2vec.py
@@ -402,26 +402,59 @@ class Doc2Vec(BaseWordEmbeddingsModel):
"""Class for training, using and evaluating neural networks described in
`Distributed Representations of Sentences and Documents <http://arxiv.org/abs/1405.4053v2>`_.

Some important internal attributes are the following:

Attributes
----------
wv : :class:`~gensim.models.keyedvectors.Word2VecKeyedVectors`
This object essentially contains the mapping between words and embeddings. After training, it can be used
directly to query those embeddings in various ways. See the module level docstring for examples.

docvecs : :class:`~gensim.models.keyedvectors.Doc2VecKeyedVectors`
This object contains the paragraph vectors. Remember that the only difference between this model and
:class:`~gensim.models.word2vec.Word2Vec` is that besides the word vectors we also include paragraph embeddings
to capture the paragraph.

In this way we can capture the difference between the same word used in different contexts.
For example, we now have a different representation of the word "leaves" in the following two sentences ::

1. Manos leaves the office every day at 18:00 to catch his train
2. This season is called Fall, because leaves fall from the trees.

In a plain :class:`~gensim.models.word2vec.Word2Vec` model the word would have exactly the same representation
in both sentences; in :class:`~gensim.models.doc2vec.Doc2Vec` it will not.

vocabulary : :class:`~gensim.models.doc2vec.Doc2VecVocab`
This object represents the vocabulary (sometimes called Dictionary in gensim) of the model.
Besides keeping track of all unique words, this object provides extra functionality, such as
sorting words by frequency, or discarding extremely rare words.

trainables : :class:`~gensim.models.doc2vec.Doc2VecTrainables`
This object represents the inner shallow neural network used to train the embeddings. The semantics of the
network differ slightly in the two available training modes (CBOW or SG) but you can think of it as a NN with
a single projection and hidden layer which we train on the corpus. The weights are then used as our embeddings.
The only addition to the underlying NN used in :class:`~gensim.models.word2vec.Word2Vec` is that the input
includes not only the word vectors of each word in the context, but also the paragraph vector.

"""
def __init__(self, documents=None, input_streams=None, dm_mean=None, dm=1, dbow_words=0, dm_concat=0,
dm_tag_count=1, docvecs=None, docvecs_mapfile=None, comment=None, trim_rule=None, callbacks=(),
**kwargs):
"""Initialize the model from an iterable of `documents`. Each document is a
TaggedDocument object that will be used for training.
"""

Parameters
----------
documents : {iterable of iterables, list or tuple of iterable of iterables}
The `documents` iterable can be simply a list of TaggedDocument elements, but for larger corpora,
consider an iterable that streams the documents directly from disk/network.
If you don't supply `documents`, the model is left uninitialized -- use if
you plan to initialize it in some other way.
documents : iterable of list of :class:`~gensim.models.doc2vec.TaggedDocument`, optional
Input corpus. Can be simply a list of elements, but for larger corpora, consider an iterable that streams
the documents directly from disk/network. If you don't supply `documents`, the model is
left uninitialized -- use if you plan to initialize it in some other way.
input_streams : list or tuple of iterable of iterables
The tuple or list of `documents`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using `workers` parameter.
dm : int {1,0}
dm : {1,0}, optional
Defines the training algorithm. If `dm=1`, 'distributed memory' (PV-DM) is used.
Otherwise, `distributed bag of words` (PV-DBOW) is employed.
size : int
size : int, optional
Dimensionality of the feature vectors.
window : int, optional
The maximum distance between the current and predicted word within a sentence.
@@ -656,15 +689,14 @@ def train(self, documents=None, input_streams=None, total_examples=None, total_w

Parameters
----------
documents : iterable of iterables
The `documents` iterable can be simply a list of TaggedDocument elements, but for larger corpora,
consider an iterable that streams the documents directly from disk/network.
See :class:`~gensim.models.doc2vec.TaggedBrownCorpus` or :class:`~gensim.models.doc2vec.TaggedLineDocument`
in :mod:`~gensim.models.doc2vec` module for such examples.
documents : iterable of list of :class:`~gensim.models.doc2vec.TaggedDocument`
Can be simply a list of elements, but for larger corpora, consider an iterable that streams
the documents directly from disk/network. If you don't supply `documents`, the model is
left uninitialized -- use if you plan to initialize it in some other way.
input_streams : list or tuple of iterable of iterables
The tuple or list of `documents`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using `workers` parameter.
total_examples : int
total_examples : int, optional
Count of sentences.
total_words : int, optional
Count of raw words in documents.
@@ -975,19 +1007,17 @@ def build_vocab(self, documents=None, input_streams=None, update=False, progress

Parameters
----------
documents : {iterable of iterables, list or tuple of iterable of iterables}
The `documents` iterable can be simply a list of TaggedDocument elements, but for larger corpora,
documents : iterable of list of :class:`~gensim.models.doc2vec.TaggedDocument`
Can be simply a list of :class:`~gensim.models.doc2vec.TaggedDocument` elements, but for larger corpora,
consider an iterable that streams the documents directly from disk/network.
See :class:`~gensim.models.doc2vec.TaggedBrownCorpus` or :class:`~gensim.models.doc2vec.TaggedLineDocument`
in :mod:`~gensim.models.doc2vec` module for such examples.
input_streams : list or tuple of iterable of iterables
The tuple or list of `documents`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using `workers` parameter.
progress_per : int
Indicates how many words to process before showing/updating the progress.
update : bool
If true, the new words in `sentences` will be added to model's vocab.
in :mod:`~gensim.models.doc2vec` module for such examples.
progress_per : int
Indicates how many words to process before showing/updating the progress.
keep_raw_vocab : bool
If not true, delete the raw vocabulary after the scaling is done and free up RAM.
trim_rule : function, optional
@@ -998,9 +1028,16 @@ def build_vocab(self, documents=None, input_streams=None, update=False, progress
:attr:`gensim.utils.RULE_DISCARD`, :attr:`gensim.utils.RULE_KEEP` or :attr:`gensim.utils.RULE_DEFAULT`.
The rule, if given, is only used to prune vocabulary during current method call and is not stored as part
of the model.
The input parameters are of the following types:
* `word` (str) - the word we are examining
* `count` (int) - the word's frequency count in the corpus
* `min_count` (int) - the minimum count threshold.

workers : int
Used if `input_streams` is passed. Determines how many processes to use for vocab building.
Actual number of workers is determined by `min(len(input_streams), workers)`.
**kwargs
Additional keyword arguments passed to the internal vocabulary construction.

"""
workers = workers or self.workers
@@ -1237,6 +1274,36 @@ def _scan_vocab_singlestream(self, documents, docvecs, progress_per, trim_rule):

def scan_vocab(self, documents=None, input_streams=None, docvecs=None, progress_per=10000, workers=None,
trim_rule=None):
"""Create the models Vocabulary: A mapping from unique words in the corpus to their frequency count.

Parameters
----------
documents : iterable of :class:`~gensim.models.doc2vec.TaggedDocument`
The tagged documents used to create the vocabulary. Their tags can be either str tokens or ints (faster).
docvecs : list of :class:`~gensim.models.keyedvectors.Doc2VecKeyedVectors`
The vector representations of the documents in our corpus. Each of them has a size == `vector_size`.
progress_per : int
Progress will be logged every `progress_per` documents.
trim_rule : function, optional
Vocabulary trimming rule, specifies whether certain words should remain in the vocabulary,
be trimmed away, or handled using the default (discard if word count < min_count).
Can be None (min_count will be used, look to :func:`~gensim.utils.keep_vocab_item`),
or a callable that accepts parameters (word, count, min_count) and returns either
:attr:`gensim.utils.RULE_DISCARD`, :attr:`gensim.utils.RULE_KEEP` or :attr:`gensim.utils.RULE_DEFAULT`.
The rule, if given, is only used to prune vocabulary during
:meth:`~gensim.models.doc2vec.Doc2Vec.build_vocab` and is not stored as part of the model.

The input parameters are of the following types:
* `word` (str) - the word we are examining
* `count` (int) - the word's frequency count in the corpus
* `min_count` (int) - the minimum count threshold.

Returns
-------
(int, int)
Tuple of (total words in the corpus, number of documents).

"""
logger.info("collecting all words and their counts")
if input_streams is None:
total_words, corpus_count = self._scan_vocab_singlestream(documents, docvecs, progress_per, trim_rule)
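A sketch tying together the `input_streams`, `workers`, and `trim_rule` parameters documented in this file. The corpus and the `my_trim_rule` helper are hypothetical; the call shapes follow the docstrings in this diff.

from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from gensim.utils import RULE_DISCARD, RULE_DEFAULT

# Two toy "streams"; in practice each would stream TaggedDocuments from disk/network.
stream_a = [TaggedDocument(words=["cats", "chase", "mice"], tags=[0])]
stream_b = [TaggedDocument(words=["dogs", "chase", "cats"], tags=[1])]

def my_trim_rule(word, count, min_count):
    # Drop one specific word regardless of frequency; defer to min_count for the rest.
    return RULE_DISCARD if word == "chase" else RULE_DEFAULT

model = Doc2Vec(vector_size=10, min_count=1)
# Vocabulary is scanned with min(len(input_streams), workers) = 2 workers.
model.build_vocab(input_streams=[stream_a, stream_b], workers=2, trim_rule=my_trim_rule)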
33 changes: 16 additions & 17 deletions gensim/models/fasttext.py
@@ -249,8 +249,8 @@ def __init__(self, sentences=None, input_streams=None, sg=0, hs=0, size=100, alp

Parameters
----------
sentences : {iterable of iterables, list or tuple of iterable of iterables}
The `sentences` iterable can be simply a list of lists of tokens, but for larger corpora,
sentences : iterable of list of str, optional
Can be simply a list of lists of tokens, but for larger corpora,
consider an iterable that streams the sentences directly from disk/network.
See :class:`~gensim.models.word2vec.BrownCorpus`, :class:`~gensim.models.word2vec.Text8Corpus`
or :class:`~gensim.models.word2vec.LineSentence` in :mod:`~gensim.models.word2vec` module for such examples.
@@ -259,11 +259,11 @@ def __init__(self, sentences=None, input_streams=None, sg=0, hs=0, size=100, alp
input_streams : list or tuple of iterable of iterables
The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using `workers` parameter.
sg : int {1, 0}
Defines the training algorithm. If 1, skip-gram is used, otherwise, CBOW is employed.
size : int
Dimensionality of the feature vectors.
window : int
min_count : int, optional
The model ignores all words with total frequency lower than this.
size : int, optional
Dimensionality of the word vectors.
window : int, optional
The maximum distance between the current and predicted word within a sentence.
workers : int, optional
Use these many worker threads to train the model (=faster training with multicore machines).
@@ -344,7 +344,6 @@ def __init__(self, sentences=None, input_streams=None, sg=0, hs=0, size=100, alp
>>> say_vector = model['say'] # get vector for word
>>> of_vector = model['of'] # get vector for out-of-vocab word


"""
self.load = call_on_class_only
self.load_fasttext_format = call_on_class_only
@@ -431,6 +430,10 @@ def build_vocab(self, sentences=None, input_streams=None, update=False, progress
input_streams : list or tuple of iterable of iterables
The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using `workers` parameter.
update : bool
If true, the new words in `sentences` will be added to model's vocab.
progress_per : int
Indicates how many words to process before showing/updating the progress.
keep_raw_vocab : bool
If not true, delete the raw vocabulary after the scaling is done and free up RAM.
trim_rule : function, optional
@@ -439,21 +442,17 @@ def build_vocab(self, sentences=None, input_streams=None, update=False, progress
Can be None (min_count will be used, look to :func:`~gensim.utils.keep_vocab_item`),
or a callable that accepts parameters (word, count, min_count) and returns either
:attr:`gensim.utils.RULE_DISCARD`, :attr:`gensim.utils.RULE_KEEP` or :attr:`gensim.utils.RULE_DEFAULT`.
Note: The rule, if given, is only used to prune vocabulary during build_vocab() and is not stored as part
of the model.
progress_per : int
Indicates how many words to process before showing/updating the progress.
update : bool
If true, the new words in `sentences` will be added to model's vocab.
workers : int
Used if `input_streams` is passed. Determines how many processes to use for vocab building.
Actual number of workers is determined by `min(len(input_streams), workers)`.
The rule, if given, is only used to prune vocabulary during
:meth:`~gensim.models.fasttext.FastText.build_vocab` and is not stored as part of the model.

The input parameters are of the following types:
* `word` (str) - the word we are examining
* `count` (int) - the word's frequency count in the corpus
* `min_count` (int) - the minimum count threshold.

workers : int
Used if `input_streams` is passed. Determines how many processes to use for vocab building.
Actual number of workers is determined by `min(len(input_streams), workers)`.
**kwargs
Additional keyword parameters passed to
:meth:`~gensim.models.base_any2vec.BaseWordEmbeddingsModel.build_vocab`.
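And an analogous hedged sketch for the FastText variant of the same API, assuming the PR-era gensim 3.x parameter names; the toy streams and the final `train` call are illustrative only.

from gensim.models.fasttext import FastText

stream_a = [["human", "interface", "computer"]]
stream_b = [["graph", "minors", "trees"], ["graph", "minors", "survey"]]

model = FastText(size=5, min_count=1)  # size=5 mirrors the memory-conscious choice in this PR's tests
model.build_vocab(input_streams=[stream_a, stream_b], workers=2)
model.train(sentences=stream_a + stream_b, total_examples=model.corpus_count, epochs=model.epochs)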