ensemble model parallelism #1342

Merged: 27 commits, Oct 22, 2020
4 changes: 2 additions & 2 deletions dependencies.xml
@@ -38,7 +38,7 @@ Note all install methods after "main" take
<xarray>0.12.1</xarray>
<netcdf4>1.4.2</netcdf4>
<matplotlib>3.1.1</matplotlib>
<statsmodels>0.9.0</statsmodels>
<statsmodels>0.12.0</statsmodels>
<cloudpickle>1.1.1</cloudpickle>
<tensorflow>1.13.1</tensorflow>
<python skip_check='True'>3</python>
@@ -53,7 +53,7 @@ Note all install methods after "main" take
<pyside2 source='forge'/>
<nomkl os='linux' skip_check='True'/>
<numexpr os='linux'/>
<ray os="mac,linux" source="pip"/>
<ray os="mac,linux" source="pip">1.0.0</ray>
<pillow optional='True'>6.0.0</pillow>
<line_profiler optional='True'/>
</main>
4 changes: 2 additions & 2 deletions framework/JobHandler.py
@@ -198,7 +198,7 @@ def __initializeRay(self):
servers = self.__runRemoteListeningSockets(self.rayServer['redis_address'])
else:
self.rayServer = ray.init(num_cpus=int(self.runInfoDict['totalNumCoresUsed'])) if _rayAvail else \
pp.Server(ncpus=int(self.runInfoDict['totalNumCoresUsed']))
pp.Server(ncpus=int(self.runInfoDict['totalNumCoresUsed']))
if _rayAvail:
self.raiseADebug("Head node IP address: ", self.rayServer['node_ip_address'])
self.raiseADebug("Redis address : ", self.rayServer['redis_address'])
@@ -725,7 +725,7 @@ def shutdown(self):
@ Out, None
"""
self.completed = True
if _rayAvail:
if _rayAvail and self.rayServer:
ray.shutdown()


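The shutdown fix above guards ray.shutdown() behind both the import check and an actually started server, so a handler that fell back to pp.Server (or never initialized) no longer touches Ray. A minimal sketch of that lifecycle, assuming Ray is an optional dependency; the helper names below are illustrative, not RAVEN's:

# Illustrative sketch only -- function and variable names are not RAVEN's.
try:
  import ray
  _rayAvail = True
except ImportError:
  _rayAvail = False

_rayServer = None

def startParallel(numCpus):
  global _rayServer
  if _rayAvail:
    _rayServer = ray.init(num_cpus=int(numCpus))  # returns the cluster/address info used above
  # else: fall back to another backend (pp.Server in the diff above)
  return _rayServer

def stopParallel():
  # mirrors the fix: only shut Ray down if this process actually started it
  if _rayAvail and _rayServer:
    ray.shutdown()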
4 changes: 2 additions & 2 deletions framework/Models/Code.py
@@ -488,7 +488,7 @@ def evaluateSample(self, myInput, samplerType, kwargs):

precommand = kwargs['precommand']
postcommand = kwargs['postcommand']
bufferSize = kwargs['bufferSize']
bufferSize = kwargs['logfileBuffer']
fileExtensionsToDelete = kwargs['deleteOutExtension']
deleteSuccessfulLogFiles = kwargs['delSucLogFiles']

@@ -881,7 +881,7 @@ def submit(self, myInput, samplerType, jobHandler, **kwargs):
## we copy this dictionary (Caught this when running an ensemble model)
## -- DPM 4/11/17
nodesList = jobHandler.runInfoDict.get('Nodes',[])
kwargs['bufferSize' ] = jobHandler.runInfoDict['logfileBuffer']
kwargs['logfileBuffer' ] = jobHandler.runInfoDict['logfileBuffer']
kwargs['precommand' ] = jobHandler.runInfoDict['precommand']
kwargs['postcommand' ] = jobHandler.runInfoDict['postcommand']
kwargs['delSucLogFiles' ] = jobHandler.runInfoDict['delSucLogFiles']
207 changes: 129 additions & 78 deletions framework/Models/EnsembleModel.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion framework/Models/ExternalModel.py
@@ -305,6 +305,6 @@ def collectOutput(self,finishedJob,output,options=None):
if outputSize == -1:
outputSize = len(np.atleast_1d(evaluation[key]))
if not mathUtils.sizeMatch(evaluation[key],outputSize):
self.raiseAnError(Exception,"the time series size needs to be the same for the output space in a HistorySet! Variable:"+key+". Size in the HistorySet="+str(outputSize)+".Size outputed="+str(len(np.atleast_1d(outcomes[key]))))
self.raiseAnError(Exception,"the time series size needs to be the same for the output space in a HistorySet! Variable:"+key+". Size in the HistorySet="+str(outputSize)+".Size outputed="+str(outputSize))

Dummy.collectOutput(self, finishedJob, output, options)
5 changes: 5 additions & 0 deletions framework/Models/ROM.py
@@ -24,10 +24,12 @@
import itertools
import numpy as np
import functools
import os
#External Modules End--------------------------------------------------------------------------------

#Internal Modules------------------------------------------------------------------------------------
from .Dummy import Dummy
import Decorators
import SupervisedLearning
from utils import utils
from utils import xmlUtils
@@ -37,6 +39,8 @@
import LearningGate
#Internal Modules End--------------------------------------------------------------------------------

# set environment variable to avoid parallelism degradation in some surrogate models
os.environ["MKL_NUM_THREADS"]="1"

class ROM(Dummy):
"""
@@ -1419,6 +1423,7 @@ def confidence(self,request,target = None):
confidenceDict = self.supervisedEngine.confidence(inputToROM)
return confidenceDict

@Decorators.timingProfile
def evaluate(self, request):
"""
When the ROM is used directly without need of having the sampler passing in the new values evaluate instead of run should be used
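Two additions here: MKL is capped at one thread per process before any heavy numerical work, so many ROM evaluations running in parallel do not oversubscribe cores, and evaluate is wrapped with Decorators.timingProfile. That decorator is not shown in this diff; a minimal sketch of what a decorator of that shape might do, as an assumption rather than RAVEN's implementation:

import functools
import os
import time

# Assumption: this must run before MKL-backed libraries create their thread pools.
os.environ.setdefault("MKL_NUM_THREADS", "1")

def timingProfile(func):
  """Hypothetical stand-in for Decorators.timingProfile: report wall-clock time per call."""
  @functools.wraps(func)
  def wrapper(*args, **kwargs):
    start = time.time()
    try:
      return func(*args, **kwargs)
    finally:
      print(f"{func.__qualname__} took {time.time() - start:.4f} s")
  return wrapper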
2 changes: 1 addition & 1 deletion framework/Optimizers/SimulatedAnnealing.py
@@ -678,7 +678,7 @@ def _nextNeighbour(self, rlz,fraction=1):
delta = (-amp/2.)+ amp * r
elif self._coolingMethod == 'boltzmann':
amp = min(np.sqrt(self.T), 1/3.0/alpha)
delta = randomUtils.randomNormal(dim=D, samples=1)*alpha*amp
delta = randomUtils.randomNormal(size=D)*alpha*amp
elif self._coolingMethod == 'veryfast':
amp = randomUtils.random(dim=D, samples=1)
delta = np.sign(amp-0.5)*self.T*((1+1.0/self.T)**abs(2*amp-1)-1.0)
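The Boltzmann branch draws a normally distributed step whose amplitude shrinks with the temperature, amp = min(sqrt(T), 1/(3*alpha)); the call is updated to the refactored randomUtils signature shown in the next file. A stand-alone sketch of that step, assuming a plain numpy generator in place of RAVEN's random engine:

import numpy as np

def boltzmannStep(T, alpha, D, rng=None):
  """Sketch of the Boltzmann neighbour perturbation above (numpy stands in for randomUtils)."""
  rng = rng or np.random.default_rng()
  amp = min(np.sqrt(T), 1.0 / (3.0 * alpha))
  return rng.standard_normal(D) * alpha * amp  # D-dimensional normal step, scaled by temperature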
10 changes: 6 additions & 4 deletions framework/SupervisedLearning/ARMA.py
@@ -33,6 +33,7 @@
from scipy import stats
from scipy.signal import find_peaks
from scipy.stats import rv_histogram

#External Modules End--------------------------------------------------------------------------------

#Internal Modules------------------------------------------------------------------------------------
@@ -716,17 +717,18 @@ def _generateARMASignal(self, model, numSamples=None,randEngine=None):
numSamples = len(self.pivotParameterValues)
if randEngine is None:
randEngine=self.randomEng
import statsmodels.api
hist = statsmodels.api.tsa.arma_generate_sample(ar = np.append(1., -model.arparams),
import statsmodels.tsa
hist = statsmodels.tsa.arima_process.arma_generate_sample(ar = np.append(1., -model.arparams),
ma = np.append(1., model.maparams),
nsample = numSamples,
distrvs = functools.partial(randomUtils.randomNormal,engine=randEngine),
# functools.partial provides the random number generator as a function
# with normal distribution, binding engine as a keyword argument.
sigma = np.sqrt(model.sigma2),
burnin = 2*max(self.P,self.Q)) # @epinas, 2018
scale = np.sqrt(model.sigma2),
burnin = 2*max(self.P,self.Q)) # @alfoa, 2020
return hist


def _generateFourierSignal(self, pivots, periods):
"""
Generate fourier signal as specified by the input file
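With statsmodels 0.12 (the version pinned in dependencies.xml above), the generator is reached through statsmodels.tsa.arima_process and the innovation standard deviation is passed as scale instead of the older sigma keyword. A self-contained sketch of the same call, assuming a plain numpy generator where RAVEN supplies its randomUtils engine via functools.partial:

import numpy as np
from statsmodels.tsa.arima_process import arma_generate_sample

rng = np.random.default_rng(42)
# AR/MA polynomials are [1, -phi_1, ...] and [1, theta_1, ...], matching the
# np.append(1., -model.arparams) / np.append(1., model.maparams) calls above.
ar = np.array([1.0, -0.5])
ma = np.array([1.0, 0.3])
signal = arma_generate_sample(ar=ar, ma=ma, nsample=200,
                              distrvs=rng.standard_normal,  # innovation generator, called with a size
                              scale=1.0,                     # innovation std dev (formerly `sigma`)
                              burnin=20)                     # discard the start-up transient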
70 changes: 42 additions & 28 deletions framework/utils/randomUtils.py
@@ -149,39 +149,39 @@ def random(dim=1, samples=1, keepMatrix=False, engine=None):
if keepMatrix:
return vals
else:
return _reduceRedundantListing(vals, dim, samples)
return _reduceRedundantListing(vals, (samples, dim))

def randomNormal(dim=1, samples=1, keepMatrix=False, engine=None):
def randomNormal(size=(1,), keepMatrix=False, engine=None):
"""
Function to get a single random value, an array of random values, or a matrix of random values, normally distributed
@ In, dim, int, optional, dimensionality of samples
@ In, samples, int, optional, number of arrays to deliver
@ In, size, int or tuple, optional, shape of the samples to return
(if an int is given it is treated as a 1-D shape; a single float is returned when the total size is 1 and keepMatrix is False)
@ In, keepMatrix, bool, optional, if True then will always return np.array(np.array(float))
@ In, engine, instance, optional, random number generator
@ Out, vals, float or np.array, random normal number(s): a float if np.prod(size) == 1 and keepMatrix is False, otherwise an np.array shaped according to size
"""
engine = getEngine(engine)
dim = int(dim)
samples = int(samples)
if isinstance(size, int):
size = (size, )
if isinstance(engine, np.random.RandomState):
vals = engine.randn(samples,dim)
vals = engine.randn(*size)
elif isinstance(engine, findCrowModule('randomENG').RandomClass):
vals = np.zeros([samples,dim])
vals = np.zeros(np.prod(size))
for i in range(len(vals)):
for j in range(len(vals[0])):
vals[i,j] = boxMullerGen.generate(engine=engine)
vals[i] = boxMullerGen.generate(engine=engine)
vals.shape = size
if keepMatrix:
return vals
else:
return _reduceRedundantListing(vals,dim,samples)
return _reduceRedundantListing(vals,size)

def randomIntegers(low, high, caller, engine=None):
def randomIntegers(low, high, caller=None, engine=None):
"""
Function to get a random integer
@ In, low, int, low boundary
@ In, high, int, upper boundary
@ In, caller, instance, object requesting the random integers
@ In, engine, instance, optional, random number generator
@ In, caller, instance, optional, object requesting the random integers
@ In, engine, instance, optional, random number generator
@ Out, rawInt, int, random int
"""
engine = getEngine(engine)
@@ -192,12 +192,28 @@ def randomIntegers(low, high, caller, engine=None):
rawNum = low + random(engine=engine)*intRange
rawInt = int(round(rawNum))
if rawInt < low or rawInt > high:
caller.raiseAMessage("Random int out of range")
if caller:
caller.raiseAMessage("Random int out of range")
rawInt = max(low, min(rawInt, high))
return rawInt
else:
raise TypeError('Engine type not recognized! {}'.format(type(engine)))

def randomChoice(array, engine=None):
"""
Generates a random sample from a given array-like (list or such) or N-D array
This is equivalent to np.random.choice but extends the functionality to N-D arrays
@ In, array, list or np.ndarray, the array from which to pick
@ In, engine, instance, optional, random number generator
@ Out, randomChoice, object, the random choice (1 element)
"""
assert(hasattr(array,"shape") or isinstance(array,list))
if hasattr(array,"shape"):
coord = tuple([randomIntegers(0, dim-1, engine=engine) for dim in array.shape])
return array[coord]
else:
return array[randomIntegers(0, len(array)-1, engine=engine)]

def randomPermutation(l,caller,engine=None):
"""
Function to get a random permutation
Expand Down Expand Up @@ -230,7 +246,7 @@ def randPointsOnHypersphere(dim,samples=1,r=1,keepMatrix=False,engine=None):
"""
engine=getEngine(engine)
## first fill random samples
pts = randomNormal(dim,samples=samples,keepMatrix=True,engine=engine)
pts = randomNormal(size=(samples, dim),keepMatrix=True,engine=engine)
## extend radius, place inside sphere through normalization
rnorm = float(r)/np.linalg.norm(pts,axis=1)
pts *= rnorm[:,np.newaxis]
@@ -240,7 +256,7 @@ def randPointsOnHypersphere(dim,samples=1,r=1,keepMatrix=False,engine=None):
if keepMatrix:
return pts
else:
return _reduceRedundantListing(pts,dim,samples)
return _reduceRedundantListing(pts,(samples, dim))
return pts

def randPointsInHypersphere(dim,samples=1,r=1,keepMatrix=False,engine=None):
@@ -260,7 +276,7 @@ def randPointsInHypersphere(dim,samples=1,r=1,keepMatrix=False,engine=None):
if keepMatrix:
return pts
else:
return _reduceRedundantListing(pts,dim,samples)
return _reduceRedundantListing(pts,(samples, dim))
return pts

def newRNG(env=None):
Expand All @@ -279,23 +295,21 @@ def newRNG(env=None):

### internal utilities ###

def _reduceRedundantListing(data,dim,samples):
def _reduceRedundantListing(data,size):
"""
Adjusts data to be intuitive for developers.
- if dim = samples = 1: returns a float
- if dim > 1 but samples = 1: returns a 1D numpy array of floats
- otherwise: returns a 2D numpy array indexed by [sample][dim]
@ In, data, numpy.array, two-dimensional array indexed by [sample][dim]
- if np.prod(size) == 1 (i.e. dim = samples = 1): returns a float
- if size[1,...,n] > 1 but size[0] (samples) = 1: returns a 1D numpy array of floats
- otherwise: returns a numpy array indexed by the original shape
@ In, data, numpy.array, n-dimensional array indexed by [sample, :, ...,n]
@ In, dim, int, dimensionality of each sample
@ In, samples, int, number of samples taken
@ Out, data, np.array, shape and size described above in method description.
"""
if dim==1 and samples==1: #user expects single float
return data[0][0]
elif samples==1: #user expects array of floats
if np.prod(size) == 1: #user expects single float
return data.flatten()[0]
elif size[0]==1: #user expects array of floats (or matrix)
return data[0]
#elif dim==1: #potentially user expects array of floats, but probably wants array of single-entry arrays
# return data[:,0]
else:
return data

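The net effect of the refactor: randomNormal now takes a numpy-style size instead of dim/samples, randomIntegers no longer requires a caller, randomChoice picks a single element from a list or an N-D array, and _reduceRedundantListing collapses results by shape (a float for a single draw, a 1-D array when only one sample is requested, the full array otherwise). A short usage sketch, assuming framework/utils is importable as it is in the rest of the framework:

import numpy as np
from utils import randomUtils  # assumption: framework/utils is on sys.path

x = randomUtils.randomNormal()             # np.prod(size) == 1  -> plain float
v = randomUtils.randomNormal(size=5)       # 1-D array of 5 normal draws
m = randomUtils.randomNormal(size=(3, 4))  # [samples, dim] matrix, returned as-is
i = randomUtils.randomIntegers(0, 9)       # caller is now optional
a = np.arange(12).reshape(3, 4)
c = randomUtils.randomChoice(a)            # one element drawn from an N-D array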
2 changes: 1 addition & 1 deletion scripts/establish_conda_env.sh
@@ -428,7 +428,7 @@ then
# if it doesn't exist, make some noise.
else
echo ${INSTALL_MANAGER} environment ${RAVEN_LIBS_NAME} not found!
echo Please run "raven/establish_conda_env.sh" with argument "--install" "--installation-manager $INSTALL_MANAGER".
echo Please run "raven/scripts/establish_conda_env.sh" with argument "--install" "--installation-manager $INSTALL_MANAGER".
exit 1
fi
fi
1 change: 1 addition & 0 deletions scripts/library_handler.py
@@ -585,6 +585,7 @@ def _readDependencies(initFile):
elif args.subset == 'pip':
src = ''
installer = 'pip'
equals = '=='
actionArgs = ''
addOptional = False
limit = ['pip']
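The added equals = '==' means that, for the pip subset, pinned versions from dependencies.xml (such as the <ray source="pip">1.0.0</ray> entry above) are rendered with pip's exact-match operator rather than conda's single '='. A hedged sketch of that rendering step, with hypothetical names; the real logic lives elsewhere in library_handler.py:

def renderRequirement(name, version, installer):
  """Illustrative only: turn a dependencies.xml entry into an installer-specific spec."""
  equals = '==' if installer == 'pip' else '='  # the one-line change above
  return f"{name}{equals}{version}" if version else name

print(renderRequirement('ray', '1.0.0', 'pip'))    # ray==1.0.0
print(renderRequirement('ray', '1.0.0', 'conda'))  # ray=1.0.0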

This file was deleted.
