diff --git a/gensim/corpora/stateful_pool.py b/gensim/corpora/stateful_pool.py new file mode 100644 index 0000000000..912997bd48 --- /dev/null +++ b/gensim/corpora/stateful_pool.py @@ -0,0 +1,178 @@ +""" +Python's builtin `multiprocessing.Pool` is designed to use stateless processes +for applying a function defined at the module scope. This means that `Process` +subclasses with run methods that differ from the default cannot be used with +the `Pool`. So when such a `Processes` subclass would be useful (often the +case for complicated run logic), a custom Pool implementation must be used. + +This module provides a `StatefulProcessingPool` with an accompanying +`StatefulProcessor` class that can be overridden to provide stateful `run` +implementations. +""" +import multiprocessing as mp +from multiprocessing.pool import MaybeEncodingError, mapstar + + +class StatefulProcessor(mp.Process): + """Process designed for use in `StatefulProcessingPool`. + + The `target` function is ignored in favor of a `run` method that reads + from an input queue and writes to an output queue. The actual processing + of the input is delegated to the `process` method, which should be overridden + by subclasses. + """ + + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, + state_kwargs=None): + super(StatefulProcessor, self).__init__(group, target, name, args, kwargs) + state_kwargs = {} if state_kwargs is None else dict(state_kwargs) + self.__dict__.update(state_kwargs) + + def process(self, text): + """Process a document text; subclasses should override.""" + raise NotImplementedError + + def run(self): + inqueue, outqueue, initializer, initargs, maxtasks = self._args + + assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) + put = outqueue.put + get = inqueue.get + if hasattr(inqueue, '_writer'): + inqueue._writer.close() + outqueue._reader.close() + + if initializer is not None: + initializer(*initargs) + + completed = 0 + while maxtasks is None or (maxtasks and completed < maxtasks): + try: + task = get() + except (EOFError, OSError): + mp.util.debug('worker got EOFError or OSError -- exiting') + break + + if task is None: + mp.util.debug('worker got sentinel -- exiting') + break + + job, i, func, args, kwds = task + try: + if func is mapstar: + args = [(self.process, arg_list) for _, arg_list in args] + else: + func = self.process + result = (True, func(*args, **kwds)) + except Exception as e: + result = (False, e) + + try: + put((job, i, result)) + except Exception as e: + wrapped = MaybeEncodingError(e, result[1]) + mp.util.debug("Possible encoding error while sending result: %s" % ( + wrapped)) + put((job, i, (False, wrapped))) + + task = job = result = func = args = kwds = None + completed += 1 + mp.util.debug('worker exiting after %d tasks' % completed) + + +class _PatchedPool(mp.pool.Pool): + """Patch the builtin `Pool` to take a `processor_class` and `state_kwargs`; + + This allows users to use custom subclasses of `StatefulProcessor` to implement + custom stateful `run` logic. Instances of `processor_class` are used to populate + the worker pool; each is initialized using the standard `Process` init procedure, + followed by setting custom attributes from `state_kwargs`. 
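For readers skimming the patch, here is a minimal usage sketch of the new pool, mirroring the pattern exercised in the accompanying gensim/test/test_stateful_pool.py; the `stopwords` attribute and the sample texts are hypothetical illustrations of `state_kwargs`, not part of this change:

    from gensim.corpora.stateful_pool import StatefulProcessingPool, StatefulProcessor

    class _StopwordDroppingTokenizer(StatefulProcessor):
        """Worker whose `stopwords` attribute is injected via `state_kwargs` before it starts."""

        def process(self, text):
            return [token for token in text.split() if token not in self.stopwords]

    if __name__ == '__main__':
        texts = ['the cat sat on the mat'] * 4
        with StatefulProcessingPool(2, processor_class=_StopwordDroppingTokenizer,
                                    state_kwargs={'stopwords': {'the', 'on'}}) as pool:
            print(list(pool.imap(texts)))  # -> [['cat', 'sat', 'mat'], ...]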
+ """ + + def Process(self, *args, **kwds): + kwds['state_kwargs'] = self._state_kwargs + return self._processor_class(*args, **kwds) + + def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, + state_kwargs=None, processor_class=StatefulProcessor): + self._state_kwargs = state_kwargs + self._processor_class = processor_class + super(_PatchedPool, self).__init__(processes, initializer, initargs, maxtasksperchild) + + def _repopulate_pool(self): + """Bring the number of pool processes up to the specified number, + for use after reaping workers which have exited. This is also used + to initially populate the pool on init. + """ + for i in range(self._processes - len(self._pool)): + w = self.Process(args=( + self._inqueue, self._outqueue, + self._initializer, self._initargs, self._maxtasksperchild)) + + self._pool.append(w) + w.name = w.name.replace('Process', 'PoolWorker') + w.daemon = True + w.start() + mp.util.debug('added worker') + + +class StatefulProcessingPool(object): + """Pool that uses stateful worker processes; designed to worked with subclasses of + `StatefulProcessor`. Implements the same interface as `multiprocessing.Pool`. + """ + + def __init__(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None, state_kwargs=None, processor_class=StatefulProcessor): + """ + See `multiprocessing.Pool` for arguments not discussed here. + + Args: + state_kwargs (dict): dictionary of attributes to initialize worker processes + with. These will be set as attributes of the process object. + processor_class (class): subclass of `StatefulProcessor` to use for worker + processes. + """ + self._pool = _PatchedPool( + processes, initializer, initargs, maxtasksperchild, state_kwargs, + processor_class) + + def apply(self, args=(), kwds={}): + """Apply TextProcessor.process(*args, **kwds).""" + return self._pool.apply_async(None, args, kwds).get() + + def map(self, iterable, chunksize=None): + """Apply `TextProcessor.process` to each element in `iterable`, collecting the results + in a list that is returned. + """ + return self._pool.map(None, iterable, chunksize) + + def imap(self, iterable, chunksize=1): + """Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.""" + return self._pool.imap(None, iterable, chunksize) + + def imap_unordered(self, iterable, chunksize=1): + """Like `imap()` method but ordering of results is arbitrary.""" + return self._pool.imap_unordered(None, iterable, chunksize) + + def apply_async(self, args=(), kwds={}, callback=None): + """Asynchronous version of `apply()` method.""" + return self._pool.apply_async(None, args, kwds, callback) + + def map_async(self, iterable, chunksize=None, callback=None): + """Asynchronous version of `map()` method.""" + return self._pool.map_async(None, iterable, chunksize, callback) + + def close(self): + self._pool.close() + + def terminate(self): + self._pool.terminate() + + def join(self): + self._pool.join() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._pool.terminate() diff --git a/gensim/corpora/textcorpus.py b/gensim/corpora/textcorpus.py index 7265d20d0c..1584f51fe8 100644 --- a/gensim/corpora/textcorpus.py +++ b/gensim/corpora/textcorpus.py @@ -29,19 +29,31 @@ from __future__ import with_statement +import functools +import itertools import logging +import multiprocessing as mp import os import random import re +import signal import sys +try: + from itertools import imap +except ImportError: # Python 3... 
+ imap = map + from gensim import interfaces, utils from gensim.corpora.dictionary import Dictionary +from gensim.corpora.stateful_pool import StatefulProcessingPool, StatefulProcessor from gensim.parsing.preprocessing import STOPWORDS, RE_WHITESPACE -from gensim.utils import deaccent, simple_tokenize +from gensim.utils import deaccent, simple_tokenize, walk_with_depth logger = logging.getLogger(__name__) +MAX_WORDS_IN_BATCH = 10000 + def remove_stopwords(tokens, stopwords=STOPWORDS): """Remove stopwords using list from `gensim.parsing.preprocessing.STOPWORDS.""" @@ -53,9 +65,8 @@ def remove_short(tokens, minsize=3): return [token for token in tokens if len(token) >= minsize] -def lower_to_unicode(text, encoding='utf8', errors='strict'): - """Lowercase `text` and convert to unicode.""" - return utils.to_unicode(text.lower(), encoding, errors) +def lowercase(text): + return text.lower() def strip_multiple_whitespaces(s): @@ -63,7 +74,76 @@ def strip_multiple_whitespaces(s): return RE_WHITESPACE.sub(" ", s) -class TextCorpus(interfaces.CorpusABC): +def init_to_ignore_interrupt(): + """Should only be used when master is prepared to handle termination of child processes.""" + signal.signal(signal.SIGINT, signal.SIG_IGN) + + +class TextPreprocessor(object): + """Mixin for classes that perform text preprocessing.""" + + def preprocess_text(self, text): + """Apply preprocessing to a single text document. This should perform tokenization + in addition to any other desired preprocessing steps. + + Note: The `TextCorpus` class transplants its own version of this method onto a + dynamically created subclass that is used to spawn a multiprocessing worker. + So if you want to subclass it in a `TextCorpus` subclass, and you want to call + the super method using the `super` keyword, do it like this: + + # do some preprocessing of text + tokens = super(self.__class__, self).preprocess_text(text) + # do some post-processing of tokens + + Args: + text (str): document text read from plain-text file. + + Returns: + iterable of str: tokens produced from `text` as a result of preprocessing. + """ + for character_filter in self.character_filters: + text = character_filter(text) + + tokens = self.tokenizer(text) + for token_filter in self.token_filters: + tokens = token_filter(tokens) + + return tokens + + def step_through_preprocess(self, text): + """Yield tuples of functions and their output for each stage of preprocessing. + This is useful for debugging issues with the corpus preprocessing pipeline. + """ + for character_filter in self.character_filters: + text = character_filter(text) + yield (character_filter, text) + + tokens = self.tokenizer(text) + yield (self.tokenizer, tokens) + + for token_filter in self.token_filters: + tokens = token_filter(tokens) + yield (token_filter, tokens) + + +class _TextPreprocessorMP(StatefulProcessor, TextPreprocessor): + """TextPreprocessor that can be used for multiprocessing.""" + pass + + +def without_metadata(method): + """Avoid yielding metadata for some particular corpus method.""" + def wrapper(corpus, *args): + metadata_setting = corpus.metadata + corpus.metadata = False + try: + return method(corpus, *args) + finally: + corpus.metadata = metadata_setting + return wrapper + + +class TextCorpus(interfaces.CorpusABC, TextPreprocessor): """Helper class to simplify the pipeline of getting bag-of-words vectors (= a gensim corpus) from plain text. @@ -112,35 +192,57 @@ class TextCorpus(interfaces.CorpusABC): 6. 
remove stopwords; see `gensim.parsing.preprocessing` for the list of stopwords """ - def __init__(self, input=None, dictionary=None, metadata=False, character_filters=None, tokenizer=None, token_filters=None): + def __init__(self, source=None, dictionary=None, metadata=False, character_filters=None, + tokenizer=None, token_filters=None, processes=-1, no_below=1, no_above=1.0, + encoding='utf8', decoding_error_handling='strict'): """ Args: - input (str): path to top-level directory to traverse for corpus documents. + source (str): path to corpus source file to read documents from. dictionary (Dictionary): if a dictionary is provided, it will not be updated with the given corpus on initialization. If none is provided, a new dictionary will be built for the given corpus. If no corpus is given, the dictionary will remain uninitialized. metadata (bool): True to yield metadata with each document, else False (default). + By default, metadata consists only of the line number for each document. This + is yielded from `get_texts` as a tuple of (tokens, (line_num,)) and from __iter__ + as a tuple of (bag_of_words, (line_num,)). character_filters (iterable of callable): each will be applied to the text of each document in order, and should return a single string with the modified text. - For Python 2, the original text will not be unicode, so it may be useful to - convert to unicode as the first character filter. The default character filters - lowercase, convert to unicode (strict utf8), perform ASCII-folding, then collapse - multiple whitespaces. + The first filter will receive the text directly from the `getstream` method, + which reads documents from the `source` file and yields them as unicode. + The default character filters perform ASCII-folding, lowercase, then collapse + all whitespace character sequences into single spaces. tokenizer (callable): takes as input the document text, preprocessed by all filters - in `character_filters`; should return an iterable of tokens (strings). + in `character_filters`; should return an iterable of tokens. The tokens should + be in a format that can be parsed by the first callable in `token_filters`. token_filters (iterable of callable): each will be applied to the iterable of tokens in order, and should return another iterable of tokens. These filters can add, remove, or replace tokens, or do nothing at all. The default token filters remove tokens less than 3 characters long and remove stopwords using the list in `gensim.parsing.preprocessing.STOPWORDS`. + processes (int): number of processes to use for text preprocessing. The default is + -1, which will use (number of virtual CPUs - 1) worker processes, in addition + to the master process. If set to 1, no worker pool will be used; instead, all + preprocessing will occur in the main process. + no_below (int): minimum number of documents a term needs to appear in, in order + to keep it in the dictionary. This applies when building a new dictionary, + and does nothing when passing in your own pre-initialized dictionary. Set to + 1 by default (discard no tokens). + no_above (float): if a term occurs in greater than this proportion of documents + from the source corpus, it will be discarded. This applies when building a new + dictionary, and does nothing when passing in your own pre-initialized dictionary. + Set to 1.0 by default (discard no tokens). + encoding (str): encoding of `source` file; used to decode bytes to unicode + (default is 'utf8'). 
+ decoding_error_handling (str): error-handling strategy for conversion from file bytes to unicode. + See `gensim.utils.unicode` for more info. """ - self.input = input + self.source = source self.metadata = metadata self.character_filters = character_filters if self.character_filters is None: - self.character_filters = [lower_to_unicode, deaccent, strip_multiple_whitespaces] + self.character_filters = [deaccent, lowercase, strip_multiple_whitespaces] self.tokenizer = tokenizer if self.tokenizer is None: @@ -150,6 +252,15 @@ def __init__(self, input=None, dictionary=None, metadata=False, character_filter if self.token_filters is None: self.token_filters = [remove_short, remove_stopwords] + self.processes = processes + if processes <= 0: + self.processes = max(1, mp.cpu_count() - 1) + + self.encoding = encoding + self.decoding_error_handling = decoding_error_handling + + self.no_below = no_below + self.no_above = no_above self.length = None self.dictionary = None self.init_dictionary(dictionary) @@ -159,18 +270,25 @@ def init_dictionary(self, dictionary): is an `input` for the corpus, add all documents from that `input`. If the `dictionary` is already initialized, simply set it as the corpus's `dictionary`. """ - self.dictionary = dictionary if dictionary is not None else Dictionary() - if self.input is not None: + if self.source is not None: if dictionary is None: - logger.info("Initializing dictionary") - metadata_setting = self.metadata - self.metadata = False - self.dictionary.add_documents(self.get_texts()) - self.metadata = metadata_setting + self.dictionary = self._build_dictionary() else: + self.dictionary = dictionary logger.info("Input stream provided but dictionary already initialized") else: - logger.warning("No input document stream provided; assuming dictionary will be initialized some other way.") + self.dictionary = Dictionary() + logger.warning( + "No input document stream provided; assuming " + "dictionary will be initialized some other way.") + + @without_metadata + def _build_dictionary(self): + logger.info("Initializing dictionary") + dictionary = Dictionary() + dictionary.add_documents(self.get_texts()) + dictionary.filter_extremes(no_below=self.no_below, no_above=self.no_above, keep_n=None) + return dictionary def __iter__(self): """The function that defines a corpus. @@ -187,68 +305,112 @@ def __iter__(self): def getstream(self): """Yield documents from the underlying plain text collection (of one or more files). Each item yielded from this method will be considered a document by subsequent - preprocessing methods. + preprocessing methods. Documents will be read as bytes and converted to unicode using + the instance attributes `encoding` and `decoding_error_handling`. + + Subclasses should override `_getstream` instead of this method. If you really want + to override this method, be sure to call `_decode_bytes` to convert the bytes read + from the source file to unicode. """ + return (self._decode_bytes(line) for line in self._getstream()) + + def _getstream(self): + """Yield documents as bytes read from `source` file.""" num_texts = 0 - with utils.file_or_filename(self.input) as f: + with utils.smart_open(self.source, 'rb') as f: for line in f: yield line num_texts += 1 self.length = num_texts - def preprocess_text(self, text): - """Apply preprocessing to a single text document. This should perform tokenization - in addition to any other desired preprocessing steps. 
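To make the `_getstream`/`getstream` split concrete, a subclass that keeps only one column of a tab-separated source might look roughly like the sketch below; the `TsvCorpus` name and column layout are invented for illustration, and only `_getstream` is overridden so that decoding still happens in the base class:

    from gensim import utils
    from gensim.corpora.textcorpus import TextCorpus

    class TsvCorpus(TextCorpus):
        """Hypothetical corpus whose `source` is a TSV file with the document text in column two."""

        def _getstream(self):
            # Yield raw bytes only; the base `getstream` decodes them using
            # `self.encoding` and `self.decoding_error_handling`.
            with utils.smart_open(self.source, 'rb') as f:
                for line in f:
                    fields = line.rstrip(b'\n').split(b'\t')
                    if len(fields) > 1:
                        yield fields[1]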
+ def _decode_bytes(self, text_bytes): + return utils.to_unicode( + text_bytes, encoding=self.encoding, errors=self.decoding_error_handling) + + def _create_preprocessor_pool(self): + state_kwargs = dict( + character_filters=self.character_filters, + tokenizer=self.tokenizer, + token_filters=self.token_filters + ) + _TextPreprocessor = type('_TextPreprocessor', (_TextPreprocessorMP,), {}) + func = getattr(self.__class__, 'preprocess_text') + if hasattr(func, '__func__'): # get unbound method in Python 2 + func = func.__func__ + _TextPreprocessor.process = func + + return StatefulProcessingPool( + self.processes, init_to_ignore_interrupt, + processor_class=_TextPreprocessor, state_kwargs=state_kwargs) + + def _get_mapper(self): + if self.processes > 1: + pool = self._create_preprocessor_pool() + map_preprocess = pool.imap + else: + pool = None + map_preprocess = functools.partial(imap, self.preprocess_text) - Args: - text (str): document text read from plain-text file. + return pool, map_preprocess - Returns: - iterable of str: tokens produced from `text` as a result of preprocessing. - """ - for character_filter in self.character_filters: - text = character_filter(text) + def yield_tokens(self): + texts = self.getstream() + pool, map_preprocess = self._get_mapper() - tokens = self.tokenizer(text) - for token_filter in self.token_filters: - tokens = token_filter(tokens) + num_texts_total, num_texts = 0, 0 + num_positions_total, num_positions = 0, 0 - return tokens + try: + # process the corpus in smaller chunks of docs, because multiprocessing.Pool + # is dumb and would load the entire input into RAM at once... + for group in utils.chunkize(texts, chunksize=10 * self.processes, maxsize=1): + for output in map_preprocess(group): + tokens = output[0] if isinstance(output, tuple) else output + num_texts_total += 1 + num_positions_total += len(tokens) - def step_through_preprocess(self, text): - """Yield tuples of functions and their output for each stage of preprocessing. - This is useful for debugging issues with the corpus preprocessing pipeline. + if self.should_keep_tokens(output): + num_texts += 1 + num_positions += len(tokens) + yield tokens + except KeyboardInterrupt: + logger.warning( + "user terminated iteration over %s after %i docs with %i positions" + " (total %i docs, %i positions before pruning)", self.__class__.__name__, + num_texts, num_positions, num_texts_total, num_positions_total) + else: + logger.info( + "finished iterating over %s of %i docs with %i positions" + " (total %i docs, %i positions before pruning)", self.__class__.__name__, + num_texts, num_positions, num_texts_total, num_positions_total) + self.length = num_texts # cache corpus length + finally: + if pool is not None: + pool.terminate() + + def should_keep_tokens(self, output): + """Output is either the list of tokens, or a tuple containing that list and some other + elements from the preprocessing. The default implementation assumes it is the former + and returns False if the list is empty. """ - for character_filter in self.character_filters: - text = character_filter(text) - yield (character_filter, text) - - tokens = self.tokenizer(text) - yield (self.tokenizer, tokens) - - for token_filter in self.token_filters: - yield (token_filter, token_filter(tokens)) + return len(output) > 0 def get_texts(self): """Iterate over the collection, yielding one document at a time. A document is a sequence of words (strings) that can be fed into `Dictionary.doc2bow`. - Each document will be fed through `preprocess_text`. 
That method should be - overridden to provide different preprocessing steps. This method will need - to be overridden if the metadata you'd like to yield differs from the line - number. - Returns: - generator of lists of tokens (strings); each list corresponds to a preprocessed - document from the corpus `input`. + Override this function to match your input (parse input files, do any + text preprocessing, lowercasing, tokenizing etc.). There will be no further + preprocessing of the words coming out of this function. """ - lines = self.getstream() + doc_token_stream = self.yield_tokens() if self.metadata: - for lineno, line in enumerate(lines): - yield self.preprocess_text(line), (lineno,) + for lineno, tokens in enumerate(doc_token_stream): + yield tokens, (lineno,) else: - for line in lines: - yield self.preprocess_text(line) + for tokens in doc_token_stream: + yield tokens def sample_texts(self, n, seed=None, length=None): """Yield n random documents from the corpus without replacement. @@ -280,8 +442,8 @@ def sample_texts(self, n, seed=None, length=None): if not 0 <= n: raise ValueError("Negative sample size n {0:d}.".format(n)) - i = 0 - for i, sample in enumerate(self.getstream()): + # Use get_texts because some docs from getstream may be removed in preprocessing. + for i, sample in enumerate(self.get_texts()): if i == length: break @@ -289,10 +451,7 @@ def sample_texts(self, n, seed=None, length=None): chance = random_generator.randint(1, remaining_in_corpus) if chance <= n: n -= 1 - if self.metadata: - yield self.preprocess_text(sample[0]), sample[1] - else: - yield self.preprocess_text(sample) + yield sample if n != 0: # This means that length was set to be greater than number of items in corpus @@ -301,17 +460,21 @@ def sample_texts(self, n, seed=None, length=None): def __len__(self): if self.length is None: - # cache the corpus length - self.length = sum(1 for _ in self.getstream()) + self._cache_corpus_length() return self.length + def _cache_corpus_length(self): + # cache the corpus length + # Use get_texts because some docs from getstream may be removed in preprocessing. + self.length = sum(1 for _ in self.get_texts()) + class TextDirectoryCorpus(TextCorpus): """Read documents recursively from a directory, where each file (or line of each file) is interpreted as a plain text document. """ - def __init__(self, input, dictionary=None, metadata=False, min_depth=0, max_depth=None, + def __init__(self, source, dictionary=None, metadata=False, min_depth=0, max_depth=None, pattern=None, exclude_pattern=None, lines_are_documents=False, **kwargs): """ Args: @@ -335,7 +498,7 @@ def __init__(self, input, dictionary=None, metadata=False, min_depth=0, max_dept self.pattern = pattern self.exclude_pattern = exclude_pattern self.lines_are_documents = lines_are_documents - super(TextDirectoryCorpus, self).__init__(input, dictionary, metadata, **kwargs) + super(TextDirectoryCorpus, self).__init__(source, dictionary, metadata, **kwargs) @property def lines_are_documents(self): @@ -387,7 +550,7 @@ def iter_filepaths(self): range of depths. If a filename pattern to match was given, further filter to only those filenames that match. 
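To make the depth and pattern filtering concrete, a rough usage sketch; the directory path and patterns are placeholders, and passing `processes=1` keeps all preprocessing in the main process:

    from gensim.corpora.textcorpus import TextDirectoryCorpus

    # Hypothetical layout: /data/corpus/**/*.txt with one document per line.
    corpus = TextDirectoryCorpus(
        '/data/corpus',
        lines_are_documents=True,   # each line of each matched file is a document
        max_depth=2,                # recurse at most two directory levels down
        pattern=r'.*\.txt$',        # keep only .txt files
        exclude_pattern=r'README',  # skip anything whose name starts with README
        processes=1)                # no worker pool

    for tokens in corpus.get_texts():
        print(tokens)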
""" - for depth, dirpath, dirnames, filenames in walk(self.input): + for depth, dirpath, dirnames, filenames in walk_with_depth(self.source): if self.min_depth <= depth <= self.max_depth: if self.pattern is not None: filenames = (n for n in filenames if self.pattern.match(n) is not None) @@ -397,7 +560,7 @@ def iter_filepaths(self): for name in filenames: yield os.path.join(dirpath, name) - def getstream(self): + def _getstream(self): """Yield documents from the underlying plain text collection (of one or more files). Each item yielded from this method will be considered a document by subsequent preprocessing methods. @@ -405,65 +568,214 @@ def getstream(self): If `lines_are_documents` was set to True, items will be lines from files. Otherwise there will be one item per file, containing the entire contents of the file. """ - num_texts = 0 for path in self.iter_filepaths(): - with open(path, 'rt') as f: + logging.debug("reading file: %s", path) + with utils.smart_open(path, 'rb') as f: if self.lines_are_documents: for line in f: yield line.strip() - num_texts += 1 else: yield f.read().strip() - num_texts += 1 - self.length = num_texts - def __len__(self): - if self.length is None: - self._cache_corpus_length() - return self.length +def unicode_and_tokenize(text): + return utils.to_unicode(text).split() - def _cache_corpus_length(self): - if not self.lines_are_documents: - self.length = sum(1 for _ in self.iter_filepaths()) - else: - self.length = sum(1 for _ in self.getstream()) + +class TextTokensIterator(object): + """Mixin for TextCorpus that changes its __iter__ to yield results of get_texts.""" + + def __iter__(self): + return self.get_texts() -def walk(top, topdown=True, onerror=None, followlinks=False, depth=0): - """This is a mostly copied version of `os.walk` from the Python 2 source code. - The only difference is that it returns the depth in the directory tree structure - at which each yield is taking place. +class LineSentence(TextTokensIterator, TextCorpus): + """Simple format: one sentence = one line. + + In general, words should already be preprocessed and separated by whitespace. + If a line exceeds the `max_sentence_length`, it will be split into multiple + sentences not exceeding this amount. Additional preprocessing can be applied + using the `TextCorpus` preprocessing keyword arguments if needed. """ - islink, join, isdir = os.path.islink, os.path.join, os.path.isdir - - try: - # Should be O(1) since it's probably just reading your filesystem journal - names = os.listdir(top) - except OSError as err: - if onerror is not None: - onerror(err) - return - - dirs, nondirs = [], [] - - # O(n) where n = number of files in the directory - for name in names: - if isdir(join(top, name)): - dirs.append(name) - else: - nondirs.append(name) - if topdown: - yield depth, top, dirs, nondirs + def __init__(self, source, max_sentence_length=MAX_WORDS_IN_BATCH, limit=None, **kwargs): + """ + `source` can be either a string or a file object. Clip the file to the first + `limit` lines (or no clipped if limit is None, the default). 
+ + Example:: + + sentences = LineSentence('myfile.txt') + + Or for compressed files:: + + sentences = LineSentence('compressed_text.txt.bz2') + sentences = LineSentence('compressed_text.txt.gz') + + """ + self.max_sentence_length = max_sentence_length + self.limit = limit + kwargs['tokenizer'] = kwargs.get('tokenizer', unicode_and_tokenize) + kwargs['character_filters'] = kwargs.get('character_filters', []) + kwargs['token_filters'] = kwargs.get('token_filters', []) + kwargs['processes'] = kwargs.get('processes', 1) + TextCorpus.__init__(self, source, **kwargs) + + def _getstream(self): + with utils.smart_open(self.source, 'rb') as fin: + for line in itertools.islice(fin, self.limit): + yield line + + def yield_tokens(self): + doc_token_stream = super(LineSentence, self).yield_tokens() + for tokens in doc_token_stream: + i = 0 + while i < len(tokens): + yield tokens[i: i + self.max_sentence_length] + i += self.max_sentence_length + + +class PathLineSentences(TextTokensIterator, TextDirectoryCorpus): + """Simple format: one sentence = one line. + + Like LineSentence, but will process all files in a directory, + optionally in alphabetical order by filename. - # Again O(n), where n = number of directories in the directory - for name in dirs: - new_path = join(top, name) - if followlinks or not islink(new_path): + In general, words should already be preprocessed and separated by whitespace. + If a line exceeds the `max_sentence_length`, it will be split into multiple + sentences not exceeding this amount. Additional preprocessing can be applied + using the `TextCorpus` preprocessing keyword arguments if needed. - # Generator so besides the recursive `walk()` call, no additional cost here. - for x in walk(new_path, topdown, onerror, followlinks, depth + 1): - yield x - if not topdown: - yield depth, top, dirs, nondirs + """ + + def __init__(self, source, max_sentence_length=MAX_WORDS_IN_BATCH, limit=None, + sort_filenames=True, **kwargs): + """ + Args: + source (str): path to a directory where all files can be opened by the + `LineSentence` class. Each file will be read up to `limit` lines + (or all lines if `limit` is None, the default). The files in the + directory should be either text files, .bz2 files, or .gz files. + sort_filenames (bool): whether or not to sort the filenames read from + the directory. If True (default), all filenames will be read into + memory on each call to `iter_filepaths` (called by `getstream`). + If False, the files are read in the order they are encountered during + the top-down directory walk. + + Example:: + + sentences = LineSentencePath(os.getcwd() + '\\corpus\\') + + """ + if os.path.isfile(source): + raise ValueError( + "source '%s' is file; use `corpora.textcorpus.LineSentence` instead" % source) + + self.max_sentence_length = max_sentence_length + self.limit = limit + self.sort_filenames = sort_filenames + kwargs['lines_are_documents'] = True + kwargs['tokenizer'] = kwargs.get('tokenizer', unicode_and_tokenize) + kwargs['character_filters'] = kwargs.get('character_filters', []) + kwargs['token_filters'] = kwargs.get('token_filters', []) + kwargs['processes'] = kwargs.get('processes', 1) + super(PathLineSentences, self).__init__(source, **kwargs) + + def _getstream(self): + """Yield documents from the underlying plain text collection (of one or more files). + Each item yielded from this method will be considered a document by subsequent + preprocessing methods. 
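Since `LineSentence` and `PathLineSentences` now live in `gensim.corpora.textcorpus` rather than `gensim.models.word2vec`, downstream training code would look roughly like this sketch; the file name, directory, and hyperparameter values are placeholders:

    from gensim.corpora.textcorpus import LineSentence, PathLineSentences
    from gensim.models.word2vec import Word2Vec

    sentences = LineSentence('sentences.txt')           # one whitespace-separated sentence per line
    # sentences = PathLineSentences('/data/sentences')  # or: every file under a directory

    model = Word2Vec(sentences, size=100, window=5, min_count=5, workers=2)
    model.save('word2vec.model')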
+ """ + paths = self.iter_filepaths() + if self.sort_filenames: + logger.debug("sorting filepaths") + paths = list(paths) + paths.sort(key=lambda path: os.path.basename(path)) + logger.debug("found %d files: %r", len(paths), paths) + + num_files = 0 + for path in paths: + logger.debug("reading file: %s", path) + num_files += 1 + with utils.smart_open(path) as fin: + for line in itertools.islice(fin, self.limit): + yield line.strip() + + logger.debug("finished reading %d files", num_files) + + def yield_tokens(self): + doc_token_stream = super(PathLineSentences, self).yield_tokens() + for tokens in doc_token_stream: + i = 0 + while i < len(tokens): + yield tokens[i: i + self.max_sentence_length] + i += self.max_sentence_length + + +class Text8Corpus(TextTokensIterator, TextCorpus): + """Iterate over sentences from the "text8" corpus, + unzipped from http://mattmahoney.net/dc/text8.zip. + """ + + def __init__(self, source, max_sentence_length=MAX_WORDS_IN_BATCH, chunksize=65536, **kwargs): + self.max_sentence_length = max_sentence_length + self.chunksize = chunksize + kwargs['tokenizer'] = kwargs.get('tokenizer', unicode_and_tokenize) + kwargs['character_filters'] = kwargs.get('character_filters', []) + TextCorpus.__init__(self, source, **kwargs) + + def _sentence_token_stream(self): + # Entire corpus is one gigantic line -- there are no sentence marks at all. + # So just split the token sequence arbitrarily into sentences of length + # `max_sentence_length`. + sentence, rest = [], b'' + with utils.smart_open(self.source, 'rb') as fin: + while True: + text = rest + fin.read(self.chunksize) # avoid loading the entire file (=1 line) into RAM + if text == rest: # EOF + words = text.split() + sentence.extend(words) # return the last chunk of words, too (may be shorter/longer) + if sentence: + yield sentence + break + + # last token may have been split in two... keep for next iteration + last_token = text.rfind(b' ') + if last_token >= 0: + words = text[:last_token].split() + rest = text[last_token:].strip() + else: + words = [] + rest = text + + sentence.extend(words) + while len(sentence) >= self.max_sentence_length: + yield sentence[:self.max_sentence_length] + sentence = sentence[self.max_sentence_length:] + + def _getstream(self): + for sentence_tokens in self._sentence_token_stream(): + sentence = ' '.join(sentence_tokens) + yield sentence + + +def brown_corpus_tokenizer(text): + text = utils.to_unicode(text) + # each file line is a single sentence in the Brown corpus + # each token is WORD/POS_TAG + token_tags = [t.split('/') for t in text.split() if len(t.split('/')) == 2] + # ignore words with non-alphabetic tags like ",", "!" 
etc (punctuation, weird stuff) + return [ + "%s/%s" % (token.lower(), tag[:2]) + for token, tag in token_tags + if tag[:2].isalpha() + ] + + +class BrownCorpus(TextTokensIterator, TextDirectoryCorpus): + """Iterate over sentences from the Brown corpus (part of NLTK data).""" + + def __init__(self, source, dictionary=None, metadata=False, processes=-1): + super(self.__class__, self).__init__( + source, dictionary, metadata, max_depth=0, tokenizer=brown_corpus_tokenizer, + character_filters=[], token_filters=[], processes=processes) diff --git a/gensim/corpora/wikicorpus.py b/gensim/corpora/wikicorpus.py index 4a70f106f3..4162c7986b 100755 --- a/gensim/corpora/wikicorpus.py +++ b/gensim/corpora/wikicorpus.py @@ -20,16 +20,13 @@ import bz2 import logging -import multiprocessing import re -import signal from xml.etree.cElementTree import \ iterparse # LXML isn't faster, so let's go with the built-in solution from gensim import utils # cannot import whole gensim.corpora, because that imports wikicorpus... -from gensim.corpora.dictionary import Dictionary -from gensim.corpora.textcorpus import TextCorpus +from gensim.corpora import textcorpus logger = logging.getLogger(__name__) @@ -145,7 +142,10 @@ def remove_template(s): prev_c = c # Remove all the templates - return ''.join([s[end + 1:start] for start, end in zip(starts + [None], [-1] + ends)]) + return ''.join([ + s[end + 1:start] + for start, end in zip(starts + [None], [-1] + ends) + ]) def remove_file(s): @@ -173,7 +173,7 @@ def tokenize(content): """ # TODO maybe ignore tokens with non-latin characters? (no chinese, arabic, russian etc.) return [ - utils.to_unicode(token) for token in utils.tokenize(content, lower=True, errors='ignore') + token for token in utils.tokenize(content, lower=True, errors='ignore') if 2 <= len(token) <= 15 and not token.startswith('_') ] @@ -238,26 +238,7 @@ def extract_pages(f, filter_namespaces=False): _extract_pages = extract_pages # for backward compatibility -def process_article(args): - """ - Parse a wikipedia article, returning its content as a list of tokens - (utf8-encoded strings). - """ - text, lemmatize, title, pageid = args - text = filter_wiki(text) - if lemmatize: - result = utils.lemmatize(text) - else: - result = tokenize(text) - return result, title, pageid - - -def init_to_ignore_interrupt(): - """Should only be used when master is prepared to handle termination of child processes.""" - signal.signal(signal.SIGINT, signal.SIG_IGN) - - -class WikiCorpus(TextCorpus): +class WikiCorpus(textcorpus.TextCorpus): """ Treat a wikipedia articles dump (\*articles.xml.bz2) as a (read-only) corpus. @@ -269,7 +250,9 @@ class WikiCorpus(TextCorpus): """ - def __init__(self, fname, processes=None, lemmatize=utils.has_pattern(), dictionary=None, filter_namespaces=('0',)): + def __init__(self, source, processes=None, lemmatize=utils.has_pattern(), dictionary=None, + filter_namespaces=('0',), metadata=False, token_filters=None, tokenizer=None, + character_filters=None): """ Initialize the corpus. Unless a dictionary is provided, this scans the corpus once, to determine its vocabulary. @@ -280,21 +263,62 @@ def __init__(self, fname, processes=None, lemmatize=utils.has_pattern(), diction self.metadata if set to true will ensure that serialize will write out article titles to a pickle file. 
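For context, a rough sketch of how the refactored class fits the usual serialization workflow; the dump filename and output path are placeholders, and with `metadata=True` the `serialize` call would additionally write page ids and titles, as noted above:

    from gensim.corpora.mmcorpus import MmCorpus
    from gensim.corpora.wikicorpus import WikiCorpus

    # Streams over the dump once at construction time to build the dictionary.
    wiki = WikiCorpus('enwiki-latest-pages-articles.xml.bz2', processes=2)

    # Serialize bag-of-words vectors; iterating `wiki` streams over the dump again.
    MmCorpus.serialize('wiki_bow.mm', wiki)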
""" - self.fname = fname self.filter_namespaces = filter_namespaces - self.metadata = False - if processes is None: - processes = max(1, multiprocessing.cpu_count() - 1) - self.processes = processes - self.lemmatize = lemmatize - if dictionary is None: - self.dictionary = Dictionary(self.get_texts()) + tokenizer = self._choose_tokenizer(lemmatize, tokenizer) + + # The original `WikiCorpus` did not use deaccenting, stopword removal, etc. + # Passing None to the `TextCorpus` constructor would default to using these preprocessing + # steps, so we pass empty lists to maintain the same default historical behavior. + if character_filters is None: + character_filters = [] + if token_filters is None: + token_filters = [] + + super(WikiCorpus, self).__init__( + source, dictionary, metadata, character_filters, tokenizer, + token_filters, processes) + + @staticmethod + def _choose_tokenizer(lemmatize, tokenizer): + if tokenizer is not None: + if lemmatize: + logger.warning( + "`lemmatize` set to true but custom tokenizer also passed;" + " will use custom tokenizer instead of default lemmatizing tokenizer") else: - self.dictionary = dictionary + if lemmatize: + logger.info("using lemmatizing tokenizer") + tokenizer = utils.lemmatize + else: + logger.info("using standard tokenizer (no lemmatization)") + tokenizer = tokenize + + return tokenizer + + def getstream(self): + """Yield documents from the underlying plain text collection (of one or more files). + Each item yielded from this method will be considered a document by subsequent + preprocessing methods. + """ + return extract_pages(bz2.BZ2File(self.source), self.filter_namespaces) - def get_texts(self): + def preprocess_text(self, args): + """Parse a wikipedia article, returning its content as a list of tokens + (utf8-encoded strings). """ - Iterate over the dump, returning text version of each article as a list + title, text, pageid = args + text = filter_wiki(text) + result = super(self.__class__, self).preprocess_text(text) + return result, title, pageid + + def should_keep_tokens(self, output): + tokens, title, pageid = output + return not ( + len(tokens) < ARTICLE_MIN_WORDS or + any(title.startswith(ignore + ':') for ignore in IGNORED_NAMESPACES)) + + def get_texts(self): + """Iterate over the dump, returning text version of each article as a list of tokens. Only articles of sufficient length are returned (short articles & redirects @@ -306,42 +330,8 @@ def get_texts(self): >>> for vec in wiki_corpus: >>> print(vec) """ - articles, articles_all = 0, 0 - positions, positions_all = 0, 0 - texts = ( - (text, self.lemmatize, title, pageid) for title, text, pageid - in extract_pages(bz2.BZ2File(self.fname), self.filter_namespaces) - ) - pool = multiprocessing.Pool(self.processes, init_to_ignore_interrupt) - - try: - # process the corpus in smaller chunks of docs, because multiprocessing.Pool - # is dumb and would load the entire input into RAM at once... 
- for group in utils.chunkize(texts, chunksize=10 * self.processes, maxsize=1): - for tokens, title, pageid in pool.imap(process_article, group): - articles_all += 1 - positions_all += len(tokens) - # article redirects and short stubs are pruned here - if len(tokens) < ARTICLE_MIN_WORDS or any(title.startswith(ignore + ':') for ignore in IGNORED_NAMESPACES): - continue - articles += 1 - positions += len(tokens) - if self.metadata: - yield (tokens, (pageid, title)) - else: - yield tokens - except KeyboardInterrupt: - logger.warn( - "user terminated iteration over Wikipedia corpus after %i documents with %i positions " - "(total %i articles, %i positions before pruning articles shorter than %i words)", - articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS - ) + doc_token_stream = self.yield_tokens() + if self.metadata: + return ((tokens, (pageid, title)) for tokens, title, pageid in doc_token_stream) else: - logger.info( - "finished iterating over Wikipedia corpus of %i documents with %i positions " - "(total %i articles, %i positions before pruning articles shorter than %i words)", - articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS - ) - self.length = articles # cache corpus length - finally: - pool.terminate() + return doc_token_stream diff --git a/gensim/models/phrases.py b/gensim/models/phrases.py index 263968526f..470d5544d3 100644 --- a/gensim/models/phrases.py +++ b/gensim/models/phrases.py @@ -458,8 +458,9 @@ def __getitem__(self, sentence): sys.exit(1) infile = sys.argv[1] - from gensim.models import Phrases # noqa:F811 for pickle - from gensim.models.word2vec import Text8Corpus + from gensim.models import Phrases # for pickle + from gensim.corpora.textcorpus import Text8Corpus + sentences = Text8Corpus(infile) # test_doc = LineSentence('test/test_data/testcorpus.txt') diff --git a/gensim/models/word2vec.py b/gensim/models/word2vec.py index 2111478c00..8f669eafc5 100644 --- a/gensim/models/word2vec.py +++ b/gensim/models/word2vec.py @@ -103,7 +103,6 @@ from copy import deepcopy from collections import defaultdict import threading -import itertools import warnings from gensim.utils import keep_vocab_item, call_on_class_only @@ -1487,155 +1486,6 @@ def get_latest_training_loss(self): return self.running_training_loss -class BrownCorpus(object): - """Iterate over sentences from the Brown corpus (part of NLTK data).""" - - def __init__(self, dirname): - self.dirname = dirname - - def __iter__(self): - for fname in os.listdir(self.dirname): - fname = os.path.join(self.dirname, fname) - if not os.path.isfile(fname): - continue - for line in utils.smart_open(fname): - line = utils.to_unicode(line) - # each file line is a single sentence in the Brown corpus - # each token is WORD/POS_TAG - token_tags = [t.split('/') for t in line.split() if len(t.split('/')) == 2] - # ignore words with non-alphabetic tags like ",", "!" 
etc (punctuation, weird stuff) - words = ["%s/%s" % (token.lower(), tag[:2]) for token, tag in token_tags if tag[:2].isalpha()] - if not words: # don't bother sending out empty sentences - continue - yield words - - -class Text8Corpus(object): - """Iterate over sentences from the "text8" corpus, unzipped from http://mattmahoney.net/dc/text8.zip .""" - - def __init__(self, fname, max_sentence_length=MAX_WORDS_IN_BATCH): - self.fname = fname - self.max_sentence_length = max_sentence_length - - def __iter__(self): - # the entire corpus is one gigantic line -- there are no sentence marks at all - # so just split the sequence of tokens arbitrarily: 1 sentence = 1000 tokens - sentence, rest = [], b'' - with utils.smart_open(self.fname) as fin: - while True: - text = rest + fin.read(8192) # avoid loading the entire file (=1 line) into RAM - if text == rest: # EOF - words = utils.to_unicode(text).split() - sentence.extend(words) # return the last chunk of words, too (may be shorter/longer) - if sentence: - yield sentence - break - last_token = text.rfind(b' ') # last token may have been split in two... keep for next iteration - words, rest = (utils.to_unicode(text[:last_token]).split(), - text[last_token:].strip()) if last_token >= 0 else ([], text) - sentence.extend(words) - while len(sentence) >= self.max_sentence_length: - yield sentence[:self.max_sentence_length] - sentence = sentence[self.max_sentence_length:] - - -class LineSentence(object): - """ - Simple format: one sentence = one line; words already preprocessed and separated by whitespace. - """ - - def __init__(self, source, max_sentence_length=MAX_WORDS_IN_BATCH, limit=None): - """ - `source` can be either a string or a file object. Clip the file to the first - `limit` lines (or no clipped if limit is None, the default). - - Example:: - - sentences = LineSentence('myfile.txt') - - Or for compressed files:: - - sentences = LineSentence('compressed_text.txt.bz2') - sentences = LineSentence('compressed_text.txt.gz') - - """ - self.source = source - self.max_sentence_length = max_sentence_length - self.limit = limit - - def __iter__(self): - """Iterate through the lines in the source.""" - try: - # Assume it is a file-like object and try treating it as such - # Things that don't have seek will trigger an exception - self.source.seek(0) - for line in itertools.islice(self.source, self.limit): - line = utils.to_unicode(line).split() - i = 0 - while i < len(line): - yield line[i: i + self.max_sentence_length] - i += self.max_sentence_length - except AttributeError: - # If it didn't work like a file, use it as a string filename - with utils.smart_open(self.source) as fin: - for line in itertools.islice(fin, self.limit): - line = utils.to_unicode(line).split() - i = 0 - while i < len(line): - yield line[i: i + self.max_sentence_length] - i += self.max_sentence_length - - -class PathLineSentences(object): - """ - Simple format: one sentence = one line; words already preprocessed and separated by whitespace. - Like LineSentence, but will process all files in a directory in alphabetical order by filename - """ - - def __init__(self, source, max_sentence_length=MAX_WORDS_IN_BATCH, limit=None): - """ - `source` should be a path to a directory (as a string) where all files can be opened by the - LineSentence class. Each file will be read up to - `limit` lines (or no clipped if limit is None, the default). 
- - Example:: - - sentences = LineSentencePath(os.getcwd() + '\\corpus\\') - - The files in the directory should be either text files, .bz2 files, or .gz files. - - """ - self.source = source - self.max_sentence_length = max_sentence_length - self.limit = limit - - if os.path.isfile(self.source): - logging.warning('single file read, better to use models.word2vec.LineSentence') - self.input_files = [self.source] # force code compatibility with list of files - elif os.path.isdir(self.source): - self.source = os.path.join(self.source, '') # ensures os-specific slash at end of path - logging.debug('reading directory %s', self.source) - self.input_files = os.listdir(self.source) - self.input_files = [self.source + file for file in self.input_files] # make full paths - self.input_files.sort() # makes sure it happens in filename order - else: # not a file or a directory, then we can't do anything with it - raise ValueError('input is neither a file nor a path') - - logging.info('files read into PathLineSentences:%s', '\n'.join(self.input_files)) - - def __iter__(self): - """iterate through the files""" - for file_name in self.input_files: - logging.info('reading file %s', file_name) - with utils.smart_open(file_name) as fin: - for line in itertools.islice(fin, self.limit): - line = utils.to_unicode(line).split() - i = 0 - while i < len(line): - yield line[i:i + self.max_sentence_length] - i += self.max_sentence_length - - # Example: ./word2vec.py -train data.txt -output vec.txt -size 200 -window 5 -sample 1e-4 -negative 5 -hs 0 -binary 0 -cbow 1 -iter 3 if __name__ == "__main__": import argparse @@ -1652,6 +1502,7 @@ def __iter__(self): sys.exit(1) from gensim.models.word2vec import Word2Vec # noqa:F811 avoid referencing __main__ in pickle + from gensim.corpora.textcorpus import LineSentence seterr(all='raise') # don't ignore numpy errors diff --git a/gensim/scripts/word2vec_standalone.py b/gensim/scripts/word2vec_standalone.py index 878e588613..b905b399e8 100644 --- a/gensim/scripts/word2vec_standalone.py +++ b/gensim/scripts/word2vec_standalone.py @@ -55,10 +55,10 @@ import argparse from numpy import seterr -from gensim.models.word2vec import Word2Vec, LineSentence # avoid referencing __main__ in pickle - logger = logging.getLogger(__name__) +from gensim.models.word2vec import Word2Vec # avoid referencing __main__ in pickle +from gensim.corpora.textcorpus import LineSentence if __name__ == "__main__": logging.basicConfig(format='%(asctime)s : %(threadName)s : %(levelname)s : %(message)s', level=logging.INFO) diff --git a/gensim/test/test_corpora.py b/gensim/test/test_corpora.py index 3f7f4e8149..b4c32fbbab 100644 --- a/gensim/test/test_corpora.py +++ b/gensim/test/test_corpora.py @@ -10,7 +10,6 @@ from __future__ import unicode_literals -import codecs import itertools import logging import os.path @@ -19,17 +18,18 @@ import numpy as np -from gensim.corpora import (bleicorpus, mmcorpus, lowcorpus, svmlightcorpus, - ucicorpus, malletcorpus, textcorpus, indexedcorpus) +from gensim.corpora import ( + bleicorpus, mmcorpus, lowcorpus, svmlightcorpus, + ucicorpus, malletcorpus, indexedcorpus) from gensim.interfaces import TransformedCorpus from gensim.utils import to_unicode -# needed because sample data files are located in the same folder -module_path = os.path.dirname(__file__) +MODULE_PATH = os.path.dirname(__file__) +"""Needed because sample data files are located in the same folder.""" def datapath(fname): - return os.path.join(module_path, 'test_data', fname) + return os.path.join(MODULE_PATH, 
'test_data', fname) def testfile(): @@ -311,236 +311,6 @@ def test_load_with_metadata(self): self.assertEqual(metadata[1], 'en') -class TestTextCorpus(CorpusTestCase): - - def setUp(self): - self.corpus_class = textcorpus.TextCorpus - self.file_extension = '.txt' - - def test_load_with_metadata(self): - fname = datapath('testcorpus.' + self.file_extension.lstrip('.')) - corpus = self.corpus_class(fname) - corpus.metadata = True - self.assertEqual(len(corpus), 9) - - docs = list(corpus) - self.assertEqual(len(docs), 9) - - for i, docmeta in enumerate(docs): - doc, metadata = docmeta - self.assertEqual(metadata[0], i) - - def test_default_preprocessing(self): - lines = [ - "Šéf chomutovských komunistů dostal poštou bílý prášek", - "this is a test for stopwords", - "zf tooth spaces " - ] - expected = [ - ['Sef', 'chomutovskych', 'komunistu', 'dostal', 'postou', 'bily', 'prasek'], - ['test', 'stopwords'], - ['tooth', 'spaces'] - ] - - corpus = self.corpus_from_lines(lines) - texts = list(corpus.get_texts()) - self.assertEqual(expected, texts) - - def corpus_from_lines(self, lines): - fpath = tempfile.mktemp() - with codecs.open(fpath, 'w', encoding='utf8') as f: - f.write('\n'.join(lines)) - - return self.corpus_class(fpath) - - def test_sample_text(self): - lines = ["document%d" % i for i in range(10)] - corpus = self.corpus_from_lines(lines) - corpus.tokenizer = lambda text: text.split() - docs = [doc for doc in corpus.get_texts()] - - sample1 = list(corpus.sample_texts(1)) - self.assertEqual(len(sample1), 1) - self.assertIn(sample1[0], docs) - - sample2 = list(corpus.sample_texts(len(lines))) - self.assertEqual(len(sample2), len(corpus)) - for i in range(len(corpus)): - self.assertEqual(sample2[i], ["document%s" % i]) - - with self.assertRaises(ValueError): - list(corpus.sample_texts(len(corpus) + 1)) - - with self.assertRaises(ValueError): - list(corpus.sample_texts(-1)) - - def test_sample_text_length(self): - lines = ["document%d" % i for i in range(10)] - corpus = self.corpus_from_lines(lines) - corpus.tokenizer = lambda text: text.split() - - sample1 = list(corpus.sample_texts(1, length=1)) - self.assertEqual(sample1[0], ["document0"]) - - sample2 = list(corpus.sample_texts(2, length=2)) - self.assertEqual(sample2[0], ["document0"]) - self.assertEqual(sample2[1], ["document1"]) - - def test_sample_text_seed(self): - lines = ["document%d" % i for i in range(10)] - corpus = self.corpus_from_lines(lines) - - sample1 = list(corpus.sample_texts(5, seed=42)) - sample2 = list(corpus.sample_texts(5, seed=42)) - self.assertEqual(sample1, sample2) - - def test_save(self): - pass - - def test_serialize(self): - pass - - def test_serialize_compressed(self): - pass - - def test_indexing(self): - pass - - -class TestTextDirectoryCorpus(unittest.TestCase): - - def write_one_level(self, *args): - if not args: - args = ('doc1', 'doc2') - dirpath = tempfile.mkdtemp() - self.write_docs_to_directory(dirpath, *args) - return dirpath - - def write_docs_to_directory(self, dirpath, *args): - for doc_num, name in enumerate(args): - with open(os.path.join(dirpath, name), 'w') as f: - f.write('document %d content' % doc_num) - - def test_one_level_directory(self): - dirpath = self.write_one_level() - - corpus = textcorpus.TextDirectoryCorpus(dirpath) - self.assertEqual(len(corpus), 2) - docs = list(corpus) - self.assertEqual(len(docs), 2) - - def write_two_levels(self): - dirpath = self.write_one_level() - next_level = os.path.join(dirpath, 'level_two') - os.mkdir(next_level) - 
self.write_docs_to_directory(next_level, 'doc1', 'doc2') - return dirpath, next_level - - def test_two_level_directory(self): - dirpath, next_level = self.write_two_levels() - - corpus = textcorpus.TextDirectoryCorpus(dirpath) - self.assertEqual(len(corpus), 4) - docs = list(corpus) - self.assertEqual(len(docs), 4) - - corpus = textcorpus.TextDirectoryCorpus(dirpath, min_depth=1) - self.assertEqual(len(corpus), 2) - docs = list(corpus) - self.assertEqual(len(docs), 2) - - corpus = textcorpus.TextDirectoryCorpus(dirpath, max_depth=0) - self.assertEqual(len(corpus), 2) - docs = list(corpus) - self.assertEqual(len(docs), 2) - - def test_filename_filtering(self): - dirpath = self.write_one_level('test1.log', 'test1.txt', 'test2.log', 'other1.log') - corpus = textcorpus.TextDirectoryCorpus(dirpath, pattern="test.*\.log") - filenames = list(corpus.iter_filepaths()) - expected = [os.path.join(dirpath, name) for name in ('test1.log', 'test2.log')] - self.assertEqual(expected, filenames) - - corpus.pattern = ".*.txt" - filenames = list(corpus.iter_filepaths()) - expected = [os.path.join(dirpath, 'test1.txt')] - self.assertEqual(expected, filenames) - - corpus.pattern = None - corpus.exclude_pattern = ".*.log" - filenames = list(corpus.iter_filepaths()) - self.assertEqual(expected, filenames) - - def test_lines_are_documents(self): - dirpath = tempfile.mkdtemp() - lines = ['doc%d text' % i for i in range(5)] - fpath = os.path.join(dirpath, 'test_file.txt') - with open(fpath, 'w') as f: - f.write('\n'.join(lines)) - - corpus = textcorpus.TextDirectoryCorpus(dirpath, lines_are_documents=True) - docs = [doc for doc in corpus.getstream()] - self.assertEqual(len(lines), corpus.length) # should have cached - self.assertEqual(lines, docs) - - corpus.lines_are_documents = False - docs = [doc for doc in corpus.getstream()] - self.assertEqual(1, corpus.length) - self.assertEqual('\n'.join(lines), docs[0]) - - def test_non_trivial_structure(self): - """Test with non-trivial directory structure, shown below: - . 
- ├── 0.txt - ├── a_folder - │   └── 1.txt - └── b_folder - ├── 2.txt - ├── 3.txt - └── c_folder - └── 4.txt - """ - dirpath = tempfile.mkdtemp() - self.write_docs_to_directory(dirpath, '0.txt') - - a_folder = os.path.join(dirpath, 'a_folder') - os.mkdir(a_folder) - self.write_docs_to_directory(a_folder, '1.txt') - - b_folder = os.path.join(dirpath, 'b_folder') - os.mkdir(b_folder) - self.write_docs_to_directory(b_folder, '2.txt', '3.txt') - - c_folder = os.path.join(b_folder, 'c_folder') - os.mkdir(c_folder) - self.write_docs_to_directory(c_folder, '4.txt') - - corpus = textcorpus.TextDirectoryCorpus(dirpath) - filenames = list(corpus.iter_filepaths()) - base_names = sorted([name[len(dirpath) + 1:] for name in filenames]) - expected = sorted([ - '0.txt', - 'a_folder/1.txt', - 'b_folder/2.txt', - 'b_folder/3.txt', - 'b_folder/c_folder/4.txt' - ]) - expected = [os.path.normpath(path) for path in expected] - self.assertEqual(expected, base_names) - - corpus.max_depth = 1 - self.assertEqual(expected[:-1], base_names[:-1]) - - corpus.min_depth = 1 - self.assertEqual(expected[2:-1], base_names[2:-1]) - - corpus.max_depth = 0 - self.assertEqual(expected[2:], base_names[2:]) - - corpus.pattern = "4.*" - self.assertEqual(expected[-1], base_names[-1]) - - if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) unittest.main() diff --git a/gensim/test/test_data/PathLineSentences/1.txt b/gensim/test/test_data/PathLineSentences/subdir/1.txt similarity index 100% rename from gensim/test/test_data/PathLineSentences/1.txt rename to gensim/test/test_data/PathLineSentences/subdir/1.txt diff --git a/gensim/test/test_miislita.py b/gensim/test/test_miislita.py index dd660f629f..2219afac79 100644 --- a/gensim/test/test_miislita.py +++ b/gensim/test/test_miislita.py @@ -45,18 +45,22 @@ def get_texts(self): """ for doc in self.getstream(): - yield [word for word in utils.to_unicode(doc).lower().split() - if word not in CorpusMiislita.stoplist] + tokens = [ + word for word in utils.to_unicode(doc).lower().split() + if word not in CorpusMiislita.stoplist + ] + yield tokens def __len__(self): """Define this so we can use `len(corpus)`""" - if 'length' not in self.__dict__: + if self.length is None: logger.info("caching corpus size (calculating number of documents)") self.length = sum(1 for _ in self.get_texts()) return self.length class TestMiislita(unittest.TestCase): + def test_textcorpus(self): """Make sure TextCorpus can be serialized to disk. 
""" # construct corpus from file diff --git a/gensim/test/test_stateful_pool.py b/gensim/test/test_stateful_pool.py new file mode 100644 index 0000000000..a4e3ecd522 --- /dev/null +++ b/gensim/test/test_stateful_pool.py @@ -0,0 +1,25 @@ +import logging +import unittest + +from gensim.corpora.stateful_pool import StatefulProcessor, StatefulProcessingPool + + +class _TextTokenizer(StatefulProcessor): + def process(self, text): + return text.split() + + +class TestStatefulPool(unittest.TestCase): + + def test_simple_tokenize_example(self): + pool = StatefulProcessingPool(4, processor_class=_TextTokenizer) + texts = ['this is some test text for multiprocessing'] * 10 + expected = [text.split() for text in texts] + + results = pool.imap(texts) + self.assertEquals(list(results), expected) + + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG) + unittest.main() diff --git a/gensim/test/test_textcorpus.py b/gensim/test/test_textcorpus.py new file mode 100644 index 0000000000..17ccee64ff --- /dev/null +++ b/gensim/test/test_textcorpus.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2010 Radim Rehurek +# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html + +""" +Automated tests for checking textcorpus I/O formats. +""" + +from __future__ import unicode_literals + +import codecs +import logging +import os +import tempfile +import unittest + +from gensim import utils +from gensim.corpora import textcorpus +from gensim.test.test_corpora import CorpusTestCase, datapath + + +def split_tokenize(text): + return text.split() + + +TEXTS = [ + ['human', 'interface', 'computer'], + ['survey', 'user', 'computer', 'system', 'response', 'time'], + ['eps', 'user', 'interface', 'system'], + ['system', 'human', 'system', 'eps'], + ['user', 'response', 'time'], + ['trees'], + ['graph', 'trees'], + ['graph', 'minors', 'trees'], + ['graph', 'minors', 'survey'] +] + + +class TestTextCorpus(CorpusTestCase): + + def setUp(self): + self.corpus_class = textcorpus.TextCorpus + self.file_extension = '.txt' + + def test_load_with_metadata(self): + fname = datapath('testcorpus.' 
+ self.file_extension.lstrip('.')) + corpus = self.corpus_class(fname) + corpus.metadata = True + self.assertEqual(len(corpus), 9) + + docs = list(corpus) + self.assertEqual(len(docs), 9) + + for i, docmeta in enumerate(docs): + doc, metadata = docmeta + self.assertEqual(metadata[0], i) + + def test_default_preprocessing(self): + lines = [ + "Šéf chomutovských komunistů dostal poštou bílý prášek", + "this is a test for stopwords", + "zf tooth spaces " + ] + expected = [ + ['sef', 'chomutovskych', 'komunistu', 'dostal', 'postou', 'bily', 'prasek'], + ['test', 'stopwords'], + ['tooth', 'spaces'] + ] + + corpus = self.corpus_from_lines(lines) + texts = list(corpus.get_texts()) + self.assertEqual(expected, texts) + + def corpus_from_lines(self, lines): + fpath = tempfile.mktemp() + # Use codecs for non-ascii character encodings across Py2 and Py3 + with codecs.open(fpath, 'wb', encoding='utf8') as f: + f.write('\n'.join(lines)) + + return self.corpus_class(fpath) + + def test_sample_text(self): + lines = ["document%d" % i for i in range(10)] + corpus = self.corpus_from_lines(lines) + corpus.tokenizer = split_tokenize + docs = [doc for doc in corpus.get_texts()] + + sample1 = list(corpus.sample_texts(1)) + self.assertEqual(len(sample1), 1) + self.assertIn(sample1[0], docs) + + sample2 = list(corpus.sample_texts(len(lines))) + self.assertEqual(len(sample2), len(corpus)) + for i in range(len(corpus)): + self.assertEqual(sample2[i], ["document%s" % i]) + + with self.assertRaises(ValueError): + list(corpus.sample_texts(len(corpus) + 1)) + + with self.assertRaises(ValueError): + list(corpus.sample_texts(-1)) + + def test_sample_text_length(self): + lines = ["document%d" % i for i in range(10)] + corpus = self.corpus_from_lines(lines) + corpus.tokenizer = split_tokenize + + sample1 = list(corpus.sample_texts(1, length=1)) + self.assertEqual(sample1[0], ["document0"]) + + sample2 = list(corpus.sample_texts(2, length=2)) + self.assertEqual(sample2[0], ["document0"]) + self.assertEqual(sample2[1], ["document1"]) + + def test_sample_text_seed(self): + lines = ["document%d" % i for i in range(10)] + corpus = self.corpus_from_lines(lines) + + sample1 = list(corpus.sample_texts(5, seed=42)) + sample2 = list(corpus.sample_texts(5, seed=42)) + self.assertEqual(sample1, sample2) + + def test_save(self): + pass + + def test_serialize(self): + pass + + def test_serialize_compressed(self): + pass + + def test_indexing(self): + pass + + +class TestTextDirectoryCorpus(unittest.TestCase): + + def write_one_level(self, *args): + if not args: + args = ('doc1', 'doc2') + dirpath = tempfile.mkdtemp() + self.write_docs_to_directory(dirpath, *args) + return dirpath + + def write_docs_to_directory(self, dirpath, *args): + for doc_num, name in enumerate(args): + with open(os.path.join(dirpath, name), 'w') as f: + f.write('document %d content' % doc_num) + + def test_one_level_directory(self): + dirpath = self.write_one_level() + + corpus = textcorpus.TextDirectoryCorpus(dirpath) + self.assertEqual(len(corpus), 2) + docs = list(corpus) + self.assertEqual(len(docs), 2) + + def write_two_levels(self): + dirpath = self.write_one_level() + next_level = os.path.join(dirpath, 'level_two') + os.mkdir(next_level) + self.write_docs_to_directory(next_level, 'doc1', 'doc2') + return dirpath, next_level + + def test_two_level_directory(self): + dirpath, next_level = self.write_two_levels() + + corpus = textcorpus.TextDirectoryCorpus(dirpath) + self.assertEqual(len(corpus), 4) + docs = list(corpus) + self.assertEqual(len(docs), 4) + + 
corpus = textcorpus.TextDirectoryCorpus(dirpath, min_depth=1) + self.assertEqual(len(corpus), 2) + docs = list(corpus) + self.assertEqual(len(docs), 2) + + corpus = textcorpus.TextDirectoryCorpus(dirpath, max_depth=0) + self.assertEqual(len(corpus), 2) + docs = list(corpus) + self.assertEqual(len(docs), 2) + + def test_filename_filtering(self): + dirpath = self.write_one_level('test1.log', 'test1.txt', 'test2.log', 'other1.log') + corpus = textcorpus.TextDirectoryCorpus(dirpath, pattern="test.*\.log") + filenames = list(corpus.iter_filepaths()) + expected = [os.path.join(dirpath, name) for name in ('test1.log', 'test2.log')] + self.assertEqual(expected, filenames) + + corpus.pattern = ".*.txt" + filenames = list(corpus.iter_filepaths()) + expected = [os.path.join(dirpath, 'test1.txt')] + self.assertEqual(expected, filenames) + + corpus.pattern = None + corpus.exclude_pattern = ".*.log" + filenames = list(corpus.iter_filepaths()) + self.assertEqual(expected, filenames) + + def test_lines_are_documents(self): + dirpath = tempfile.mkdtemp() + lines = ['doc%d text' % i for i in range(5)] + fpath = os.path.join(dirpath, 'test_file.txt') + with open(fpath, 'w') as f: + f.write('\n'.join(lines)) + + corpus = textcorpus.TextDirectoryCorpus(dirpath, lines_are_documents=True) + docs = [doc for doc in corpus.getstream()] + self.assertEqual(len(lines), corpus.length) # should have cached + self.assertEqual(lines, docs) + + corpus.lines_are_documents = False + docs = [doc for doc in corpus.getstream()] + self.assertEqual('\n'.join(lines), docs[0]) + + def test_non_trivial_structure(self): + """Test with non-trivial directory structure, shown below: + . + ├── 0.txt + ├── a_folder + │   └── 1.txt + └── b_folder + ├── 2.txt + ├── 3.txt + └── c_folder + └── 4.txt + """ + dirpath = tempfile.mkdtemp() + self.write_docs_to_directory(dirpath, '0.txt') + + a_folder = os.path.join(dirpath, 'a_folder') + os.mkdir(a_folder) + self.write_docs_to_directory(a_folder, '1.txt') + + b_folder = os.path.join(dirpath, 'b_folder') + os.mkdir(b_folder) + self.write_docs_to_directory(b_folder, '2.txt', '3.txt') + + c_folder = os.path.join(b_folder, 'c_folder') + os.mkdir(c_folder) + self.write_docs_to_directory(c_folder, '4.txt') + + corpus = textcorpus.TextDirectoryCorpus(dirpath) + filenames = list(corpus.iter_filepaths()) + base_names = [name[len(dirpath) + 1:] for name in filenames] + expected = [ + '0.txt', + 'a_folder/1.txt', + 'b_folder/2.txt', + 'b_folder/3.txt', + 'b_folder/c_folder/4.txt' + ] + expected = [os.path.normpath(path) for path in expected] # Windows compatibility + self.assertEqual(expected, base_names) + + corpus.max_depth = 1 + self.assertEqual(expected[:-1], base_names[:-1]) + + corpus.min_depth = 1 + self.assertEqual(expected[2:-1], base_names[2:-1]) + + corpus.max_depth = 0 + self.assertEqual(expected[2:], base_names[2:]) + + corpus.pattern = "4.*" + self.assertEqual(expected[-1], base_names[-1]) + + +class TestLineSentence(unittest.TestCase): + def testLineSentenceWorksWithFilename(self): + """Does LineSentence work with a filename argument?""" + with utils.smart_open(datapath('lee_background.cor')) as orig: + sentences = textcorpus.LineSentence(datapath('lee_background.cor')) + for words in sentences: + self.assertEqual(words, utils.to_unicode(orig.readline()).split()) + + def testLineSentenceWorksWithCompressedFile(self): + """Does LineSentence work with a compressed file object argument?""" + with utils.smart_open(datapath('head500.noblanks.cor')) as orig: + sentences = 
textcorpus.LineSentence(datapath('head500.noblanks.cor.bz2')) + for words in sentences: + self.assertEqual(words, utils.to_unicode(orig.readline()).split()) + + def testLineSentenceWorksWithNormalFile(self): + """Does LineSentence work with a file object argument, rather than filename?""" + with utils.smart_open(datapath('head500.noblanks.cor')) as orig: + with utils.smart_open(datapath('head500.noblanks.cor')) as fin: + sentences = textcorpus.LineSentence(fin) + for words in sentences: + self.assertEqual(words, utils.to_unicode(orig.readline()).split()) + + +class TestPathLineSentences(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.data_path = datapath('PathLineSentences') + + def get_test_file_path(self, *names): + return os.path.join(self.data_path, *names) + + def open_test_file(self, *names): + return utils.smart_open(self.get_test_file_path(*names)) + + def testPathLineSentences(self): + """Does PathLineSentences work with a path argument?""" + with self.open_test_file('subdir', '1.txt') as orig1, \ + self.open_test_file('2.txt.bz2') as orig2: + sentences = textcorpus.PathLineSentences(self.data_path) + orig = orig1.readlines() + orig2.readlines() + + corpus_sentences = iter(sentences) + for orig_text in orig: + expected = textcorpus.unicode_and_tokenize(orig_text) + if expected: + self.assertEqual(next(corpus_sentences), expected) + + def testPathLineSentencesOneFile(self): + """Does PathLineSentences work with a single file argument?""" + test_file = self.get_test_file_path('subdir', '1.txt') + with self.assertRaises(ValueError): + textcorpus.PathLineSentences(test_file) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + unittest.main() diff --git a/gensim/test/test_utils.py b/gensim/test/test_utils.py index ae94b5c2ad..9b342c94b5 100644 --- a/gensim/test/test_utils.py +++ b/gensim/test/test_utils.py @@ -7,13 +7,20 @@ Automated tests for checking various utils functions. """ - import logging +import os +import tempfile import unittest -from gensim import utils -from six import iteritems import numpy as np +from six import iteritems + +from gensim import utils + + +def touch(fname): + with open(fname, 'a'): + os.utime(fname, None) class TestIsCorpus(unittest.TestCase): @@ -83,6 +90,29 @@ def test_decode_entities(self): expected = u'It\x92s the Year of the Horse. 
YES VIN DIESEL \U0001f64c \U0001f4af' self.assertEquals(utils.decode_htmlentities(body), expected) + def test_walk_with_path_no_trailing_separator(self): + dirpath = tempfile.mkdtemp() + self._test_walk_two_levels_from_dirpath(dirpath) + + def test_walk_with_path_with_trailing_separator(self): + dirpath = tempfile.mkdtemp() + os.sep + self._test_walk_two_levels_from_dirpath(dirpath) + + def _test_walk_two_levels_from_dirpath(self, dirpath): + filename = os.path.join(dirpath, 'depth0.txt') + touch(filename) + subdirpath = os.path.join(dirpath, 'subdir') + os.mkdir(subdirpath) + sub_filename = os.path.join(subdirpath, 'depth1.txt') + touch(sub_filename) + walker = utils.walk_with_depth(dirpath) + first_yield = next(walker) + self.assertEqual((0, os.path.abspath(dirpath), ['subdir'], ['depth0.txt']), first_yield) + second_yield = next(walker) + self.assertEqual((1, subdirpath, [], ['depth1.txt']), second_yield) + with self.assertRaises(StopIteration): + next(walker) + class TestSampleDict(unittest.TestCase): def test_sample_dict(self): diff --git a/gensim/test/test_wikicorpus.py b/gensim/test/test_wikicorpus.py index ca81d6e51a..dc44e66d7e 100644 --- a/gensim/test/test_wikicorpus.py +++ b/gensim/test/test_wikicorpus.py @@ -15,13 +15,16 @@ from gensim.corpora.wikicorpus import WikiCorpus +logger = logging.getLogger(__name__) -module_path = os.path.dirname(__file__) # needed because sample data files are located in the same folder -datapath = lambda fname: os.path.join(module_path, 'test_data', fname) FILENAME = 'enwiki-latest-pages-articles1.xml-p000000010p000030302-shortened.bz2' FILENAME_U = 'bgwiki-latest-pages-articles-shortened.xml.bz2' +MODULE_PATH = os.path.dirname(__file__) +"""Needed because sample data files are located in the same folder.""" -logger = logging.getLogger(__name__) + + +def datapath(fname): + return os.path.join(MODULE_PATH, 'test_data', fname) class TestWikiCorpus(unittest.TestCase): @@ -45,7 +48,7 @@ def test_first_element(self): 1) anarchism 2) autism """ - wc = WikiCorpus(datapath(FILENAME), processes=1) + wc = WikiCorpus(datapath(FILENAME), lemmatize=False, processes=1) texts = wc.get_texts() self.assertTrue(u'anarchism' in next(texts)) @@ -56,7 +59,7 @@ def test_unicode_element(self): First unicode article in this sample is 1) папа """ - wc = WikiCorpus(datapath(FILENAME_U), processes=1) + wc = WikiCorpus(datapath(FILENAME_U), lemmatize=False, processes=1) texts = wc.get_texts() self.assertTrue(u'папа' in next(texts)) diff --git a/gensim/test/test_word2vec.py b/gensim/test/test_word2vec.py index 81123ccd7a..4f4aabe42e 100644 --- a/gensim/test/test_word2vec.py +++ b/gensim/test/test_word2vec.py @@ -9,16 +9,17 @@ """ -import logging -import unittest import os -import tempfile import bz2 import sys +import logging +import tempfile +import unittest import numpy as np from gensim import utils +import gensim.corpora.textcorpus from gensim.models import word2vec, keyedvectors from testfixtures import log_capture @@ -28,8 +29,12 @@ except ImportError: PYEMD_EXT = False -module_path = os.path.dirname(__file__) # needed because sample data files are located in the same folder -datapath = lambda fname: os.path.join(module_path, 'test_data', fname) +MODULE_PATH = os.path.dirname(__file__) +"""Needed because sample data files are located in the same folder.""" + + +def datapath(fname): + return os.path.join(MODULE_PATH, 'test_data', fname) class LeeCorpus(object): @@ -39,9 +44,9 @@ def __iter__(self): yield utils.simple_preprocess(line) -list_corpus = list(LeeCorpus())
+LIST_CORPUS = list(LeeCorpus()) -sentences = [ +SENTENCES = [ ['human', 'interface', 'computer'], ['survey', 'user', 'computer', 'system', 'response', 'time'], ['eps', 'user', 'interface', 'system'], @@ -53,7 +58,7 @@ def __iter__(self): ['graph', 'minors', 'survey'] ] -new_sentences = [ +NEW_SENTENCES = [ ['computer', 'artificial', 'intelligence'], ['artificial', 'trees'], ['human', 'intelligence'], @@ -77,7 +82,7 @@ def _rule(word, count, min_count): def load_on_instance(): # Save and load a Word2Vec Model on instance for test - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.save(testfile()) model = word2vec.Word2Vec() # should fail at this point return model.load(testfile()) @@ -87,12 +92,12 @@ class TestWord2VecModel(unittest.TestCase): def testOnlineLearning(self): """Test that the algorithm is able to add new words to the vocabulary and to a trained model when using a sorted vocabulary""" - model_hs = word2vec.Word2Vec(sentences, size=10, min_count=0, seed=42, hs=1, negative=0) - model_neg = word2vec.Word2Vec(sentences, size=10, min_count=0, seed=42, hs=0, negative=5) + model_hs = word2vec.Word2Vec(SENTENCES, size=10, min_count=0, seed=42, hs=1, negative=0) + model_neg = word2vec.Word2Vec(SENTENCES, size=10, min_count=0, seed=42, hs=0, negative=5) self.assertTrue(len(model_hs.wv.vocab), 12) self.assertTrue(model_hs.wv.vocab['graph'].count, 3) - model_hs.build_vocab(new_sentences, update=True) - model_neg.build_vocab(new_sentences, update=True) + model_hs.build_vocab(NEW_SENTENCES, update=True) + model_neg.build_vocab(NEW_SENTENCES, update=True) self.assertTrue(model_hs.wv.vocab['graph'].count, 4) self.assertTrue(model_hs.wv.vocab['artificial'].count, 4) self.assertEqual(len(model_hs.wv.vocab), 14) @@ -101,17 +106,17 @@ def testOnlineLearning(self): def testOnlineLearningAfterSave(self): """Test that the algorithm is able to add new words to the vocabulary and to a trained model when using a sorted vocabulary""" - model_neg = word2vec.Word2Vec(sentences, size=10, min_count=0, seed=42, hs=0, negative=5) + model_neg = word2vec.Word2Vec(SENTENCES, size=10, min_count=0, seed=42, hs=0, negative=5) model_neg.save(testfile()) model_neg = word2vec.Word2Vec.load(testfile()) self.assertTrue(len(model_neg.wv.vocab), 12) - model_neg.build_vocab(new_sentences, update=True) - model_neg.train(new_sentences, total_examples=model_neg.corpus_count, epochs=model_neg.iter) + model_neg.build_vocab(NEW_SENTENCES, update=True) + model_neg.train(NEW_SENTENCES, total_examples=model_neg.corpus_count, epochs=model_neg.iter) self.assertEqual(len(model_neg.wv.vocab), 14) def onlineSanity(self, model): terro, others = [], [] - for l in list_corpus: + for l in LIST_CORPUS: if 'terrorism' in l: terro.append(l) else: @@ -156,7 +161,7 @@ def test_cbow_neg_online(self): def testPersistence(self): """Test storing/loading the entire model.""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.save(testfile()) self.models_equal(model, word2vec.Word2Vec.load(testfile())) # test persistence of the KeyedVectors of a model @@ -168,13 +173,13 @@ def testPersistence(self): def testPersistenceWithConstructorRule(self): """Test storing/loading the entire model with a vocab trimming rule passed in the constructor.""" - model = word2vec.Word2Vec(sentences, min_count=1, trim_rule=_rule) + model = word2vec.Word2Vec(SENTENCES, min_count=1, trim_rule=_rule) model.save(testfile()) self.models_equal(model, 
word2vec.Word2Vec.load(testfile())) def testRuleWithMinCount(self): """Test that returning RULE_DEFAULT from trim_rule triggers min_count.""" - model = word2vec.Word2Vec(sentences + [["occurs_only_once"]], min_count=2, trim_rule=_rule) + model = word2vec.Word2Vec(SENTENCES + [["occurs_only_once"]], min_count=2, trim_rule=_rule) self.assertTrue("human" not in model.wv.vocab) self.assertTrue("occurs_only_once" not in model.wv.vocab) self.assertTrue("interface" in model.wv.vocab) @@ -182,18 +187,18 @@ def testRuleWithMinCount(self): def testRule(self): """Test applying vocab trim_rule to build_vocab instead of constructor.""" model = word2vec.Word2Vec(min_count=1) - model.build_vocab(sentences, trim_rule=_rule) + model.build_vocab(SENTENCES, trim_rule=_rule) self.assertTrue("human" not in model.wv.vocab) def testLambdaRule(self): """Test that lambda trim_rule works.""" rule = lambda word, count, min_count: utils.RULE_DISCARD if word == "human" else utils.RULE_DEFAULT - model = word2vec.Word2Vec(sentences, min_count=1, trim_rule=rule) + model = word2vec.Word2Vec(SENTENCES, min_count=1, trim_rule=rule) self.assertTrue("human" not in model.wv.vocab) def testSyn0NormNotSaved(self): """Test syn0norm isn't saved in model file""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() model.save(testfile()) loaded_model = word2vec.Word2Vec.load(testfile()) @@ -233,7 +238,7 @@ def testLoadPreKeyedVectorModelCFormat(self): def testPersistenceWord2VecFormat(self): """Test storing/loading the entire model in word2vec format.""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() model.wv.save_word2vec_format(testfile(), binary=True) binary_model_kv = keyedvectors.KeyedVectors.load_word2vec_format(testfile(), binary=True) @@ -251,17 +256,17 @@ def testPersistenceWord2VecFormat(self): self.assertEquals(binary_model_kv.syn0.nbytes, half_precision_model_kv.syn0.nbytes * 2) def testNoTrainingCFormat(self): - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() model.wv.save_word2vec_format(testfile(), binary=True) kv = keyedvectors.KeyedVectors.load_word2vec_format(testfile(), binary=True) binary_model = word2vec.Word2Vec() binary_model.wv = kv - self.assertRaises(ValueError, binary_model.train, sentences) + self.assertRaises(ValueError, binary_model.train, SENTENCES) def testTooShortBinaryWord2VecFormat(self): tfile = testfile() - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() model.wv.save_word2vec_format(tfile, binary=True) f = open(tfile, 'r+b') @@ -271,7 +276,7 @@ def testTooShortBinaryWord2VecFormat(self): def testTooShortTextWord2VecFormat(self): tfile = testfile() - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() model.wv.save_word2vec_format(tfile, binary=False) f = open(tfile, 'r+b') @@ -281,7 +286,7 @@ def testTooShortTextWord2VecFormat(self): def testPersistenceWord2VecFormatNonBinary(self): """Test storing/loading the entire model in word2vec non-binary format.""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() model.wv.save_word2vec_format(testfile(), binary=False) text_model = keyedvectors.KeyedVectors.load_word2vec_format(testfile(), binary=False) @@ -296,7 +301,7 @@ def 
testPersistenceWord2VecFormatNonBinary(self): def testPersistenceWord2VecFormatWithVocab(self): """Test storing/loading the entire model and vocabulary in word2vec format.""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() testvocab = os.path.join(tempfile.gettempdir(), 'gensim_word2vec.vocab') model.wv.save_word2vec_format(testfile(), testvocab, binary=True) @@ -305,7 +310,7 @@ def testPersistenceWord2VecFormatWithVocab(self): def testPersistenceKeyedVectorsFormatWithVocab(self): """Test storing/loading the entire model and vocabulary in word2vec format.""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() testvocab = os.path.join(tempfile.gettempdir(), 'gensim_word2vec.vocab') model.wv.save_word2vec_format(testfile(), testvocab, binary=True) @@ -316,7 +321,7 @@ def testPersistenceWord2VecFormatCombinationWithStandardPersistence(self): """Test storing/loading the entire model and vocabulary in word2vec format chained with saving and loading via `save` and `load` methods`. It was possible prior to 1.0.0 release, now raises Exception""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.init_sims() testvocab = os.path.join(tempfile.gettempdir(), 'gensim_word2vec.vocab') model.wv.save_word2vec_format(testfile(), testvocab, binary=True) @@ -326,7 +331,7 @@ def testPersistenceWord2VecFormatCombinationWithStandardPersistence(self): def testLargeMmap(self): """Test storing/loading the entire model.""" - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) # test storing the internal arrays into separate files model.save(testfile(), sep_limit=0) @@ -365,12 +370,12 @@ def testTraining(self): """Test word2vec training.""" # build vocabulary, don't train yet model = word2vec.Word2Vec(size=2, min_count=1, hs=1, negative=0) - model.build_vocab(sentences) + model.build_vocab(SENTENCES) self.assertTrue(model.wv.syn0.shape == (len(model.wv.vocab), 2)) self.assertTrue(model.syn1.shape == (len(model.wv.vocab), 2)) - model.train(sentences, total_examples=model.corpus_count, epochs=model.iter) + model.train(SENTENCES, total_examples=model.corpus_count, epochs=model.iter) sims = model.most_similar('graph', topn=10) # self.assertTrue(sims[0][0] == 'trees', sims) # most similar @@ -381,16 +386,16 @@ def testTraining(self): self.assertEqual(sims, sims2) # build vocab and train in one step; must be the same as above - model2 = word2vec.Word2Vec(sentences, size=2, min_count=1, hs=1, negative=0) + model2 = word2vec.Word2Vec(SENTENCES, size=2, min_count=1, hs=1, negative=0) self.models_equal(model, model2) def testScoring(self): """Test word2vec scoring.""" - model = word2vec.Word2Vec(sentences, size=2, min_count=1, hs=1, negative=0) + model = word2vec.Word2Vec(SENTENCES, size=2, min_count=1, hs=1, negative=0) # just score and make sure they exist - scores = model.score(sentences, len(sentences)) - self.assertEqual(len(scores), len(sentences)) + scores = model.score(SENTENCES, len(SENTENCES)) + self.assertEqual(len(scores), len(SENTENCES)) def testLocking(self): """Test word2vec training doesn't change locked vectors.""" @@ -419,7 +424,7 @@ def testAccuracy(self): def testEvaluateWordPairs(self): """Test Spearman and Pearson correlation coefficients give sane results on similarity datasets""" - corpus = word2vec.LineSentence(datapath('head500.noblanks.cor.bz2')) + 
corpus = gensim.corpora.textcorpus.LineSentence(datapath('head500.noblanks.cor.bz2')) model = word2vec.Word2Vec(corpus, min_count=3, iter=10) correlation = model.evaluate_word_pairs(datapath('wordsim353.tsv')) pearson = correlation[0][0] @@ -433,9 +438,9 @@ def model_sanity(self, model, train=True): """Even tiny models trained on LeeCorpus should pass these sanity checks""" # run extra before/after training tests if train=True if train: - model.build_vocab(list_corpus) + model.build_vocab(LIST_CORPUS) orig0 = np.copy(model.wv.syn0[0]) - model.train(list_corpus, total_examples=model.corpus_count, epochs=model.iter) + model.train(LIST_CORPUS, total_examples=model.corpus_count, epochs=model.iter) self.assertFalse((orig0 == model.wv.syn0[1]).all()) # vector should vary after training sims = model.most_similar('war', topn=len(model.wv.index2word)) t_rank = [word for word, score in sims].index('terrorism') @@ -473,7 +478,7 @@ def test_cbow_neg(self): self.model_sanity(model) def test_cosmul(self): - model = word2vec.Word2Vec(sentences, size=2, min_count=1, hs=1, negative=0) + model = word2vec.Word2Vec(SENTENCES, size=2, min_count=1, hs=1, negative=0) sims = model.most_similar_cosmul('graph', topn=10) # self.assertTrue(sims[0][0] == 'trees', sims) # most similar @@ -488,11 +493,11 @@ def testTrainingCbow(self): # to test training, make the corpus larger by repeating its sentences over and over # build vocabulary, don't train yet model = word2vec.Word2Vec(size=2, min_count=1, sg=0, hs=1, negative=0) - model.build_vocab(sentences) + model.build_vocab(SENTENCES) self.assertTrue(model.wv.syn0.shape == (len(model.wv.vocab), 2)) self.assertTrue(model.syn1.shape == (len(model.wv.vocab), 2)) - model.train(sentences, total_examples=model.corpus_count, epochs=model.iter) + model.train(SENTENCES, total_examples=model.corpus_count, epochs=model.iter) sims = model.most_similar('graph', topn=10) # self.assertTrue(sims[0][0] == 'trees', sims) # most similar @@ -503,7 +508,7 @@ def testTrainingCbow(self): self.assertEqual(sims, sims2) # build vocab and train in one step; must be the same as above - model2 = word2vec.Word2Vec(sentences, size=2, min_count=1, sg=0, hs=1, negative=0) + model2 = word2vec.Word2Vec(SENTENCES, size=2, min_count=1, sg=0, hs=1, negative=0) self.models_equal(model, model2) def testTrainingSgNegative(self): @@ -511,11 +516,11 @@ def testTrainingSgNegative(self): # to test training, make the corpus larger by repeating its sentences over and over # build vocabulary, don't train yet model = word2vec.Word2Vec(size=2, min_count=1, sg=1, hs=0, negative=2) - model.build_vocab(sentences) + model.build_vocab(SENTENCES) self.assertTrue(model.wv.syn0.shape == (len(model.wv.vocab), 2)) self.assertTrue(model.syn1neg.shape == (len(model.wv.vocab), 2)) - model.train(sentences, total_examples=model.corpus_count, epochs=model.iter) + model.train(SENTENCES, total_examples=model.corpus_count, epochs=model.iter) sims = model.most_similar('graph', topn=10) # self.assertTrue(sims[0][0] == 'trees', sims) # most similar @@ -526,7 +531,7 @@ def testTrainingSgNegative(self): self.assertEqual(sims, sims2) # build vocab and train in one step; must be the same as above - model2 = word2vec.Word2Vec(sentences, size=2, min_count=1, sg=1, hs=0, negative=2) + model2 = word2vec.Word2Vec(SENTENCES, size=2, min_count=1, sg=1, hs=0, negative=2) self.models_equal(model, model2) def testTrainingCbowNegative(self): @@ -534,11 +539,11 @@ def testTrainingCbowNegative(self): # to test training, make the corpus larger by repeating its 
sentences over and over # build vocabulary, don't train yet model = word2vec.Word2Vec(size=2, min_count=1, sg=0, hs=0, negative=2) - model.build_vocab(sentences) + model.build_vocab(SENTENCES) self.assertTrue(model.wv.syn0.shape == (len(model.wv.vocab), 2)) self.assertTrue(model.syn1neg.shape == (len(model.wv.vocab), 2)) - model.train(sentences, total_examples=model.corpus_count, epochs=model.iter) + model.train(SENTENCES, total_examples=model.corpus_count, epochs=model.iter) sims = model.most_similar('graph', topn=10) # self.assertTrue(sims[0][0] == 'trees', sims) # most similar @@ -549,15 +554,15 @@ def testTrainingCbowNegative(self): self.assertEqual(sims, sims2) # build vocab and train in one step; must be the same as above - model2 = word2vec.Word2Vec(sentences, size=2, min_count=1, sg=0, hs=0, negative=2) + model2 = word2vec.Word2Vec(SENTENCES, size=2, min_count=1, sg=0, hs=0, negative=2) self.models_equal(model, model2) def testSimilarities(self): """Test similarity and n_similarity methods.""" # The model is trained using CBOW model = word2vec.Word2Vec(size=2, min_count=1, sg=0, hs=0, negative=2) - model.build_vocab(sentences) - model.train(sentences, total_examples=model.corpus_count, epochs=model.iter) + model.build_vocab(SENTENCES) + model.train(SENTENCES, total_examples=model.corpus_count, epochs=model.iter) self.assertTrue(model.n_similarity(['graph', 'trees'], ['trees', 'graph'])) self.assertTrue(model.n_similarity(['graph'], ['trees']) == model.similarity('graph', 'trees')) @@ -567,7 +572,7 @@ def testSimilarities(self): def testSimilarBy(self): """Test word2vec similar_by_word and similar_by_vector.""" - model = word2vec.Word2Vec(sentences, size=2, min_count=1, hs=1, negative=0) + model = word2vec.Word2Vec(SENTENCES, size=2, min_count=1, hs=1, negative=0) wordsims = model.similar_by_word('graph', topn=10) wordsims2 = model.most_similar(positive='graph', topn=10) vectorsims = model.similar_by_vector(model['graph'], topn=10) @@ -592,8 +597,8 @@ def testParallel(self): def testRNG(self): """Test word2vec results identical with identical RNG seed.""" - model = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1) - model2 = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1) + model = word2vec.Word2Vec(SENTENCES, min_count=2, seed=42, workers=1) + model2 = word2vec.Word2Vec(SENTENCES, min_count=2, seed=42, workers=1) self.models_equal(model, model2) def models_equal(self, model, model2): @@ -610,7 +615,7 @@ def testDeleteTemporaryTrainingData(self): """Test word2vec model after delete_temporary_training_data""" for i in [0, 1]: for j in [0, 1]: - model = word2vec.Word2Vec(sentences, size=10, min_count=0, seed=42, hs=i, negative=j) + model = word2vec.Word2Vec(SENTENCES, size=10, min_count=0, seed=42, hs=i, negative=j) if i: self.assertTrue(hasattr(model, 'syn1')) if j: @@ -625,16 +630,16 @@ def testDeleteTemporaryTrainingData(self): self.assertTrue(not hasattr(model, 'syn0_lockf')) def testNormalizeAfterTrainingData(self): - model = word2vec.Word2Vec(sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) model.save(testfile()) norm_only_model = word2vec.Word2Vec.load(testfile()) norm_only_model.delete_temporary_training_data(replace_word_vectors_with_normalized=True) self.assertFalse(np.allclose(model['human'], norm_only_model['human'])) def testPredictOutputWord(self): - '''Test word2vec predict_output_word method handling for negative sampling scheme''' + """Test word2vec predict_output_word method handling for negative sampling scheme""" 
# under normal circumstances - model_with_neg = word2vec.Word2Vec(sentences, min_count=1) + model_with_neg = word2vec.Word2Vec(SENTENCES, min_count=1) predictions_with_neg = model_with_neg.predict_output_word(['system', 'human'], topn=5) self.assertTrue(len(predictions_with_neg) == 5) @@ -651,7 +656,7 @@ def testPredictOutputWord(self): self.assertRaises(RuntimeError, binary_model_with_neg.predict_output_word, ['system', 'human']) # negative sampling scheme not used - model_without_neg = word2vec.Word2Vec(sentences, min_count=1, negative=0) + model_without_neg = word2vec.Word2Vec(SENTENCES, min_count=1, negative=0) self.assertRaises(RuntimeError, model_without_neg.predict_output_word, ['system', 'human']) @log_capture() @@ -683,21 +688,21 @@ def testTrainWarning(self, l): def test_train_with_explicit_param(self): model = word2vec.Word2Vec(size=2, min_count=1, hs=1, negative=0) - model.build_vocab(sentences) + model.build_vocab(SENTENCES) with self.assertRaises(ValueError): - model.train(sentences, total_examples=model.corpus_count) + model.train(SENTENCES, total_examples=model.corpus_count) with self.assertRaises(ValueError): - model.train(sentences, epochs=model.iter) + model.train(SENTENCES, epochs=model.iter) with self.assertRaises(ValueError): - model.train(sentences) + model.train(SENTENCES) def test_sentences_should_not_be_a_generator(self): """ Is sentences a generator object? """ - gen = (s for s in sentences) + gen = (s for s in SENTENCES) self.assertRaises(TypeError, word2vec.Word2Vec, (gen,)) def testLoadOnClassError(self): @@ -706,22 +711,20 @@ def testLoadOnClassError(self): def test_reset_from(self): """Test if reset_from() uses pre-built structures from other model""" - model = word2vec.Word2Vec(sentences, min_count=1) - other_model = word2vec.Word2Vec(new_sentences, min_count=1) + model = word2vec.Word2Vec(SENTENCES, min_count=1) + other_model = word2vec.Word2Vec(NEW_SENTENCES, min_count=1) other_vocab = other_model.wv.vocab model.reset_from(other_model) self.assertEqual(model.wv.vocab, other_vocab) def test_compute_training_loss(self): model = word2vec.Word2Vec(min_count=1, sg=1, negative=5, hs=1) - model.build_vocab(sentences) - model.train(sentences, compute_loss=True, total_examples=model.corpus_count, epochs=model.iter) + model.build_vocab(SENTENCES) + model.train(SENTENCES, compute_loss=True, total_examples=model.corpus_count, epochs=model.iter) training_loss_val = model.get_latest_training_loss() self.assertTrue(training_loss_val > 0.0) -# endclass TestWord2VecModel - class TestWMD(unittest.TestCase): def testNonzero(self): '''Test basic functionality with a test sentence.''' @@ -729,7 +732,7 @@ def testNonzero(self): if not PYEMD_EXT: return - model = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1) + model = word2vec.Word2Vec(SENTENCES, min_count=2, seed=42, workers=1) sentence1 = ['human', 'interface', 'computer'] sentence2 = ['survey', 'user', 'computer', 'system', 'response', 'time'] distance = model.wmdistance(sentence1, sentence2) @@ -743,7 +746,7 @@ def testSymmetry(self): if not PYEMD_EXT: return - model = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1) + model = word2vec.Word2Vec(SENTENCES, min_count=2, seed=42, workers=1) sentence1 = ['human', 'interface', 'computer'] sentence2 = ['survey', 'user', 'computer', 'system', 'response', 'time'] distance1 = model.wmdistance(sentence1, sentence2) @@ -756,57 +759,12 @@ def testIdenticalSentences(self): if not PYEMD_EXT: return - model = word2vec.Word2Vec(sentences, min_count=1) + model = 
word2vec.Word2Vec(SENTENCES, min_count=1) sentence = ['survey', 'user', 'computer', 'system', 'response', 'time'] distance = model.wmdistance(sentence, sentence) self.assertEqual(0.0, distance) -class TestWord2VecSentenceIterators(unittest.TestCase): - def testLineSentenceWorksWithFilename(self): - """Does LineSentence work with a filename argument?""" - with utils.smart_open(datapath('lee_background.cor')) as orig: - sentences = word2vec.LineSentence(datapath('lee_background.cor')) - for words in sentences: - self.assertEqual(words, utils.to_unicode(orig.readline()).split()) - - def testLineSentenceWorksWithCompressedFile(self): - """Does LineSentence work with a compressed file object argument?""" - with utils.smart_open(datapath('head500.noblanks.cor')) as orig: - sentences = word2vec.LineSentence(bz2.BZ2File(datapath('head500.noblanks.cor.bz2'))) - for words in sentences: - self.assertEqual(words, utils.to_unicode(orig.readline()).split()) - - def testLineSentenceWorksWithNormalFile(self): - """Does LineSentence work with a file object argument, rather than filename?""" - with utils.smart_open(datapath('head500.noblanks.cor')) as orig: - with utils.smart_open(datapath('head500.noblanks.cor')) as fin: - sentences = word2vec.LineSentence(fin) - for words in sentences: - self.assertEqual(words, utils.to_unicode(orig.readline()).split()) - - def testPathLineSentences(self): - """Does PathLineSentences work with a path argument?""" - with utils.smart_open(os.path.join(datapath('PathLineSentences'), '1.txt')) as orig1,\ - utils.smart_open(os.path.join(datapath('PathLineSentences'), '2.txt.bz2')) as orig2: - sentences = word2vec.PathLineSentences(datapath('PathLineSentences')) - orig = orig1.readlines() + orig2.readlines() - orig_counter = 0 # to go through orig while matching PathLineSentences - for words in sentences: - self.assertEqual(words, utils.to_unicode(orig[orig_counter]).split()) - orig_counter += 1 - - def testPathLineSentencesOneFile(self): - """Does PathLineSentences work with a single file argument?""" - test_file = os.path.join(datapath('PathLineSentences'), '1.txt') - with utils.smart_open(test_file) as orig: - sentences = word2vec.PathLineSentences(test_file) - for words in sentences: - self.assertEqual(words, utils.to_unicode(orig.readline()).split()) - - -# endclass TestWord2VecSentenceIterators - # TODO: get correct path to Python binary # class TestWord2VecScripts(unittest.TestCase): # def testWord2VecStandAloneScript(self): diff --git a/gensim/utils.py b/gensim/utils.py index 10555d2b51..137a8b609f 100644 --- a/gensim/utils.py +++ b/gensim/utils.py @@ -889,18 +889,24 @@ def chunkize(corpus, chunksize, maxsize=0, as_numpy=False): assert chunksize > 0 if maxsize > 0: - q = multiprocessing.Queue(maxsize=maxsize) - worker = InputQueue(q, corpus, chunksize, maxsize=maxsize, as_numpy=as_numpy) - worker.daemon = True - worker.start() + return _buffer_and_yield(corpus, chunksize, maxsize, as_numpy) + else: + return chunkize_serial(corpus, chunksize, as_numpy=as_numpy) + + def _buffer_and_yield(corpus, chunksize, maxsize, as_numpy): + q = multiprocessing.Queue(maxsize=maxsize) + worker = InputQueue(q, corpus, chunksize, maxsize=maxsize, as_numpy=as_numpy) + worker.daemon = True + worker.start() + try: while True: chunk = [q.get(block=True)] if chunk[0] is None: break yield chunk.pop() - else: - for chunk in chunkize_serial(corpus, chunksize, as_numpy=as_numpy): - yield chunk + finally: + worker.terminate() + q.close() def smart_extension(fname, ext): @@ -1254,3 +1260,18 @@ def 
_iter_windows(document, window_size, copy=False, ignore_below_size=True): else: for doc_window in doc_windows: yield doc_window.copy() if copy else doc_window + + +def walk_with_depth(top, topdown=True, onerror=None, followlinks=False): + """Wrap `os.walk` in code that analyzes the directory path of each yield to + determine the depth relative to `top`. Yields in the `top` directory are at + depth 0. + + Returns: + generator of tuples of (depth, dirpath, dirnames, filenames). + """ + path = os.path.abspath(top) + for dirpath, dirnames, filenames in os.walk(path, topdown, onerror, followlinks): + sub_path = dirpath.replace(path, '') + depth = sub_path.count(os.sep) + yield depth, dirpath, dirnames, filenames
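The new `walk_with_depth` helper is what enables the depth-bounded directory walks exercised by the `min_depth`/`max_depth` tests above. Below is a minimal usage sketch, assuming only the `gensim.utils.walk_with_depth` function added in this patch; the helper name `iter_files_between_depths` and the example directory are hypothetical, not part of the change, and the actual `TextDirectoryCorpus` logic may differ.

    import os
    from gensim import utils

    def iter_files_between_depths(top, min_depth=0, max_depth=None):
        """Yield paths of files under `top` whose directory depth lies in [min_depth, max_depth]."""
        for depth, dirpath, dirnames, filenames in utils.walk_with_depth(top):
            if max_depth is not None and depth > max_depth:
                # walk_with_depth wraps a top-down os.walk and yields the same
                # `dirnames` list object, so clearing it in place prunes descent.
                dirnames[:] = []
                continue
            if depth < min_depth:
                continue
            for name in filenames:
                yield os.path.join(dirpath, name)

    # e.g. only files at depth 0 or 1:
    # list(iter_files_between_depths('/path/to/corpus_dir', max_depth=1))

Pruning via `dirnames[:] = []` works because the generator yields before `os.walk` recurses into the listed subdirectories, so subtrees beyond `max_depth` are never visited rather than merely filtered out afterwards.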