From f61219e90ee6cbc4a76c53b6afd1c792bc401aa7 Mon Sep 17 00:00:00 2001 From: horpto <__singleton__@hackerdom.ru> Date: Mon, 28 Jan 2019 07:46:51 +0500 Subject: [PATCH 1/3] Move out `process_result_queue` + simplify inner check + add heuristic for case with large number of workers --- gensim/models/ldamulticore.py | 52 +++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/gensim/models/ldamulticore.py b/gensim/models/ldamulticore.py index e3ed274128..12879a8a2a 100644 --- a/gensim/models/ldamulticore.py +++ b/gensim/models/ldamulticore.py @@ -231,7 +231,8 @@ def update(self, corpus, chunks_as_numpy=False): else: updatetype = "batch" updateafter = lencorpus - evalafter = min(lencorpus, (self.eval_every or 0) * updateafter) + eval_every = self.eval_every or 0 + evalafter = min(lencorpus, eval_every * updateafter) updates_per_pass = max(1, lencorpus / updateafter) logger.info( @@ -257,47 +258,50 @@ def update(self, corpus, chunks_as_numpy=False): def rho(): return pow(self.offset + pass_ + (self.num_updates / self.chunksize), -self.decay) + def process_result_queue(force=False): + """ + Clear the result queue, merging all intermediate results, and update the + LDA model if necessary. + + """ + merged_new = False + while not result_queue.empty(): + other.merge(result_queue.get()) + queue_size[0] -= 1 + merged_new = True + + if not (force or job_queue.full()): + # better to put one more chunk than process all completed + # as workers should be busy constantly + break + + if (force and merged_new and queue_size[0] == 0) or (other.numdocs >= updateafter): + self.do_mstep(rho(), other, pass_ > 0) + other.reset() + if eval_every > 0 and (force or (self.num_updates / updateafter) % eval_every == 0): + self.log_perplexity(chunk, total_docs=lencorpus) + logger.info("training LDA model using %i processes", self.workers) pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,)) for pass_ in range(self.passes): queue_size, reallen = [0], 0 other = LdaState(self.eta, self.state.sstats.shape) - def process_result_queue(force=False): - """ - Clear the result queue, merging all intermediate results, and update the - LDA model if necessary. - - """ - merged_new = False - while not result_queue.empty(): - other.merge(result_queue.get()) - queue_size[0] -= 1 - merged_new = True - if (force and merged_new and queue_size[0] == 0) or (not self.batch and (other.numdocs >= updateafter)): - self.do_mstep(rho(), other, pass_ > 0) - other.reset() - if self.eval_every is not None \ - and ((force and queue_size[0] == 0) - or (self.eval_every != 0 and (self.num_updates / updateafter) % self.eval_every == 0)): - self.log_perplexity(chunk, total_docs=lencorpus) - chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=chunks_as_numpy) for chunk_no, chunk in enumerate(chunk_stream): reallen += len(chunk) # keep track of how many documents we've processed so far # put the chunk into the workers' input job queue - chunk_put = False - while not chunk_put: + while True: try: - job_queue.put((chunk_no, chunk, self), block=False, timeout=0.1) - chunk_put = True + job_queue.put((chunk_no, chunk, self), block=False) queue_size[0] += 1 logger.info( "PROGRESS: pass %i, dispatched chunk #%i = documents up to #%i/%i, " "outstanding queue size %i", pass_, chunk_no, chunk_no * self.chunksize + len(chunk), lencorpus, queue_size[0] ) + break except queue.Full: # in case the input job queue is full, keep clearing the # result queue, to make sure we don't deadlock From ac8ad102adf2db6e4660909949f0674d96117493 Mon Sep 17 00:00:00 2001 From: horpto <__singleton__@hackerdom.ru> Date: Tue, 29 Jan 2019 13:55:56 +0500 Subject: [PATCH 2/3] delete heuristic --- gensim/models/ldamulticore.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/gensim/models/ldamulticore.py b/gensim/models/ldamulticore.py index 12879a8a2a..107fead2ee 100644 --- a/gensim/models/ldamulticore.py +++ b/gensim/models/ldamulticore.py @@ -270,11 +270,6 @@ def process_result_queue(force=False): queue_size[0] -= 1 merged_new = True - if not (force or job_queue.full()): - # better to put one more chunk than process all completed - # as workers should be busy constantly - break - if (force and merged_new and queue_size[0] == 0) or (other.numdocs >= updateafter): self.do_mstep(rho(), other, pass_ > 0) other.reset() From afae888ffd7d5a7136c61833ba1a98aebd13c5da Mon Sep 17 00:00:00 2001 From: horpto <__singleton__@hackerdom.ru> Date: Tue, 29 Jan 2019 14:09:24 +0500 Subject: [PATCH 3/3] swap branches in if self.batch check --- gensim/models/ldamulticore.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gensim/models/ldamulticore.py b/gensim/models/ldamulticore.py index 107fead2ee..f3341fee0b 100644 --- a/gensim/models/ldamulticore.py +++ b/gensim/models/ldamulticore.py @@ -225,12 +225,12 @@ def update(self, corpus, chunks_as_numpy=False): self.state.numdocs += lencorpus - if not self.batch: - updatetype = "online" - updateafter = self.chunksize * self.workers - else: + if self.batch: updatetype = "batch" updateafter = lencorpus + else: + updatetype = "online" + updateafter = self.chunksize * self.workers eval_every = self.eval_every or 0 evalafter = min(lencorpus, eval_every * updateafter)