diff --git a/gensim/models/ldamulticore.py b/gensim/models/ldamulticore.py index e3ed274128..f3341fee0b 100644 --- a/gensim/models/ldamulticore.py +++ b/gensim/models/ldamulticore.py @@ -225,13 +225,14 @@ def update(self, corpus, chunks_as_numpy=False): self.state.numdocs += lencorpus - if not self.batch: - updatetype = "online" - updateafter = self.chunksize * self.workers - else: + if self.batch: updatetype = "batch" updateafter = lencorpus - evalafter = min(lencorpus, (self.eval_every or 0) * updateafter) + else: + updatetype = "online" + updateafter = self.chunksize * self.workers + eval_every = self.eval_every or 0 + evalafter = min(lencorpus, eval_every * updateafter) updates_per_pass = max(1, lencorpus / updateafter) logger.info( @@ -257,47 +258,45 @@ def update(self, corpus, chunks_as_numpy=False): def rho(): return pow(self.offset + pass_ + (self.num_updates / self.chunksize), -self.decay) + def process_result_queue(force=False): + """ + Clear the result queue, merging all intermediate results, and update the + LDA model if necessary. + + """ + merged_new = False + while not result_queue.empty(): + other.merge(result_queue.get()) + queue_size[0] -= 1 + merged_new = True + + if (force and merged_new and queue_size[0] == 0) or (other.numdocs >= updateafter): + self.do_mstep(rho(), other, pass_ > 0) + other.reset() + if eval_every > 0 and (force or (self.num_updates / updateafter) % eval_every == 0): + self.log_perplexity(chunk, total_docs=lencorpus) + logger.info("training LDA model using %i processes", self.workers) pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,)) for pass_ in range(self.passes): queue_size, reallen = [0], 0 other = LdaState(self.eta, self.state.sstats.shape) - def process_result_queue(force=False): - """ - Clear the result queue, merging all intermediate results, and update the - LDA model if necessary. - - """ - merged_new = False - while not result_queue.empty(): - other.merge(result_queue.get()) - queue_size[0] -= 1 - merged_new = True - if (force and merged_new and queue_size[0] == 0) or (not self.batch and (other.numdocs >= updateafter)): - self.do_mstep(rho(), other, pass_ > 0) - other.reset() - if self.eval_every is not None \ - and ((force and queue_size[0] == 0) - or (self.eval_every != 0 and (self.num_updates / updateafter) % self.eval_every == 0)): - self.log_perplexity(chunk, total_docs=lencorpus) - chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=chunks_as_numpy) for chunk_no, chunk in enumerate(chunk_stream): reallen += len(chunk) # keep track of how many documents we've processed so far # put the chunk into the workers' input job queue - chunk_put = False - while not chunk_put: + while True: try: - job_queue.put((chunk_no, chunk, self), block=False, timeout=0.1) - chunk_put = True + job_queue.put((chunk_no, chunk, self), block=False) queue_size[0] += 1 logger.info( "PROGRESS: pass %i, dispatched chunk #%i = documents up to #%i/%i, " "outstanding queue size %i", pass_, chunk_no, chunk_no * self.chunksize + len(chunk), lencorpus, queue_size[0] ) + break except queue.Full: # in case the input job queue is full, keep clearing the # result queue, to make sure we don't deadlock