Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move out process_result_queue from cycle in LdaMulticore #2358

Merged
merged 4 commits
Jan 29, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 27 additions & 28 deletions gensim/models/ldamulticore.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,14 @@ def update(self, corpus, chunks_as_numpy=False):

self.state.numdocs += lencorpus

if not self.batch:
updatetype = "online"
updateafter = self.chunksize * self.workers
else:
if self.batch:
updatetype = "batch"
updateafter = lencorpus
evalafter = min(lencorpus, (self.eval_every or 0) * updateafter)
else:
updatetype = "online"
updateafter = self.chunksize * self.workers
eval_every = self.eval_every or 0
evalafter = min(lencorpus, eval_every * updateafter)

updates_per_pass = max(1, lencorpus / updateafter)
logger.info(
Expand All @@ -257,47 +258,45 @@ def update(self, corpus, chunks_as_numpy=False):
def rho():
    """Return the current update weight for the M-step (fed to ``do_mstep``)."""
    # Step size decays with the pass number and the number of updates so far;
    # larger `self.decay` makes later updates count for less.
    base = self.offset + pass_ + (self.num_updates / self.chunksize)
    return base ** (-self.decay)

def process_result_queue(force=False):
"""
Clear the result queue, merging all intermediate results, and update the
LDA model if necessary.

"""
merged_new = False
while not result_queue.empty():
other.merge(result_queue.get())
queue_size[0] -= 1
merged_new = True

if (force and merged_new and queue_size[0] == 0) or (other.numdocs >= updateafter):
menshikh-iv marked this conversation as resolved.
Show resolved Hide resolved
self.do_mstep(rho(), other, pass_ > 0)
other.reset()
if eval_every > 0 and (force or (self.num_updates / updateafter) % eval_every == 0):
menshikh-iv marked this conversation as resolved.
Show resolved Hide resolved
self.log_perplexity(chunk, total_docs=lencorpus)

logger.info("training LDA model using %i processes", self.workers)
pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))
for pass_ in range(self.passes):
queue_size, reallen = [0], 0
other = LdaState(self.eta, self.state.sstats.shape)

def process_result_queue(force=False):
    """
    Drain `result_queue`, merging every pending E-step worker result into
    `other`, and run an M-step / perplexity evaluation when due.

    An M-step (`self.do_mstep`) runs either when `force` is set and every
    dispatched chunk has been merged back (outstanding queue empty), or —
    in online (non-batch) mode — when at least `updateafter` documents
    have accumulated in `other`.

    """
    merged_new = False
    # Merge every result currently available from the workers.
    # `queue_size` is a one-element list so this closure can mutate the
    # outstanding-chunk counter owned by the enclosing loop (incremented
    # on each successful job_queue.put).
    while not result_queue.empty():
        other.merge(result_queue.get())
        queue_size[0] -= 1
        merged_new = True
    # M-step trigger: final flush of the pass, or a full online batch ready.
    if (force and merged_new and queue_size[0] == 0) or (not self.batch and (other.numdocs >= updateafter)):
        self.do_mstep(rho(), other, pass_ > 0)
        other.reset()
    # Perplexity logging: only when evaluation is enabled; on the final
    # flush, or periodically every `eval_every` updates (when non-zero).
    # Evaluates on the most recently dispatched `chunk`.
    if self.eval_every is not None \
            and ((force and queue_size[0] == 0)
                 or (self.eval_every != 0 and (self.num_updates / updateafter) % self.eval_every == 0)):
        self.log_perplexity(chunk, total_docs=lencorpus)

chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=chunks_as_numpy)
for chunk_no, chunk in enumerate(chunk_stream):
reallen += len(chunk) # keep track of how many documents we've processed so far

# put the chunk into the workers' input job queue
chunk_put = False
while not chunk_put:
while True:
try:
job_queue.put((chunk_no, chunk, self), block=False, timeout=0.1)
chunk_put = True
job_queue.put((chunk_no, chunk, self), block=False)
queue_size[0] += 1
logger.info(
"PROGRESS: pass %i, dispatched chunk #%i = documents up to #%i/%i, "
"outstanding queue size %i",
pass_, chunk_no, chunk_no * self.chunksize + len(chunk), lencorpus, queue_size[0]
)
break
except queue.Full:
# in case the input job queue is full, keep clearing the
# result queue, to make sure we don't deadlock
Expand Down