Skip to content

Commit

Permalink
Change variable names (#3971)
Browse files Browse the repository at this point in the history
  • Loading branch information
ColCarroll authored Jun 17, 2020
1 parent ed66bc4 commit 3b765a5
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions pymc3/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ def __init__(self, steppers, parallelize, progressbar=True):
"""
self.nchains = len(steppers)
self.is_parallelized = False
self._master_ends = []
self._primary_ends = []
self._processes = []
self._steppers = steppers
if parallelize:
Expand All @@ -1017,11 +1017,11 @@ def __init__(self, steppers, parallelize, progressbar=True):
for c, stepper in (
enumerate(progress_bar(steppers)) if progressbar else enumerate(steppers)
):
slave_end, master_end = multiprocessing.Pipe()
secondary_end, primary_end = multiprocessing.Pipe()
stepper_dumps = pickle.dumps(stepper, protocol=4)
process = multiprocessing.Process(
target=self.__class__._run_slave,
args=(c, stepper_dumps, slave_end),
target=self.__class__._run_secondary,
args=(c, stepper_dumps, secondary_end),
name="ChainWalker{}".format(c),
)
# we want the child process to exit if the parent is terminated
Expand All @@ -1030,7 +1030,7 @@ def __init__(self, steppers, parallelize, progressbar=True):
# By doing it in the constructor, the sampling progress bar
# will not be confused by the process start.
process.start()
self._master_ends.append(master_end)
self._primary_ends.append(primary_end)
self._processes.append(process)
self.is_parallelized = True
except Exception:
Expand All @@ -1053,16 +1053,16 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
if len(self._processes) > 0:
try:
for master_end in self._master_ends:
master_end.send(None)
for primary_end in self._primary_ends:
primary_end.send(None)
for process in self._processes:
process.join(timeout=3)
except Exception:
_log.warning("Termination failed.")
return

@staticmethod
def _run_slave(c, stepper_dumps, slave_end):
def _run_secondary(c, stepper_dumps, secondary_end):
"""This method is started on a separate process to perform stepping of a chain.
Parameters
Expand All @@ -1071,7 +1071,7 @@ def _run_slave(c, stepper_dumps, slave_end):
number of this chain
stepper : BlockedStep
a step method such as CompoundStep
slave_end : multiprocessing.connection.PipeConnection
secondary_end : multiprocessing.connection.PipeConnection
This is our connection to the main process
"""
# re-seed each child process to make them unique
Expand All @@ -1086,7 +1086,7 @@ def _run_slave(c, stepper_dumps, slave_end):
if isinstance(sm, arraystep.PopulationArrayStepShared):
population_steppers.append(sm)
while True:
incoming = slave_end.recv()
incoming = secondary_end.recv()
# receiving a None is the signal to exit
if incoming is None:
break
Expand All @@ -1099,7 +1099,7 @@ def _run_slave(c, stepper_dumps, slave_end):
for popstep in population_steppers:
popstep.population = population
update = stepper.step(population[c])
slave_end.send(update)
secondary_end.send(update)
except Exception:
_log.exception("ChainWalker{}".format(c))
return
Expand All @@ -1122,10 +1122,10 @@ def step(self, tune_stop, population):
updates = [None] * self.nchains
if self.is_parallelized:
for c in range(self.nchains):
self._master_ends[c].send((tune_stop, population))
self._primary_ends[c].send((tune_stop, population))
# Blockingly get the step outcomes
for c in range(self.nchains):
updates[c] = self._master_ends[c].recv()
updates[c] = self._primary_ends[c].recv()
else:
for c in range(self.nchains):
if tune_stop:
Expand Down

0 comments on commit 3b765a5

Please sign in to comment.