diff --git a/py/redrock/archetypes.py b/py/redrock/archetypes.py index d1c810eb..0b6fc006 100644 --- a/py/redrock/archetypes.py +++ b/py/redrock/archetypes.py @@ -11,14 +11,13 @@ from scipy.interpolate import interp1d import scipy.special -from .zscan import calc_zchi2_one +from .zscan import calc_zchi2_one, calc_zchi2_batch from .rebin import trapz_rebin from .utils import transmission_Lyman -from .zscan import per_camera_coeff_with_least_square - +from .zscan import per_camera_coeff_with_least_square, per_camera_coeff_with_least_square_batch class Archetype(): """Class to store all different archetypes from the same spectype. @@ -47,6 +46,8 @@ def __init__(self, filename): self.wave = np.asarray(hdr['CRVAL1'] + hdr['CDELT1']*np.arange(self.flux.shape[1])) if hdr['LOGLAM']: self.wave = 10**self.wave + self.minwave = self.wave[0] + self.maxwave = self.wave[-1] self._archetype = {} self._archetype['INTERP'] = np.array([None]*self._narch) @@ -55,7 +56,28 @@ def __init__(self, filename): h.close() + #It is much more efficient to calculate edges once if rebinning + #and copy to GPU and store here rather than doing this every time + self._gpuwave = None + self._gpuflux = None return + + @property + def gpuwave(self): + if (self._gpuwave is None): + #Copy to GPU once + import cupy as cp + self._gpuwave = cp.asarray(self.wave) + return self._gpuwave + + @property + def gpuflux(self): + if (self._gpuflux is None): + #Copy to GPU once + import cupy as cp + self._gpuflux = cp.asarray(self.flux) + return self._gpuflux + def rebin_template(self,index,z,dwave,trapz=True): """ """ @@ -64,6 +86,45 @@ def rebin_template(self,index,z,dwave,trapz=True): else: return {hs:self._archetype['INTERP'][index](wave/(1.+z)) for hs, wave in dwave.items()} + def rebin_template_batch(self,z,dwave,trapz=True,dedges=None,indx=None,use_gpu=False): + """ + """ + if (use_gpu): + import cupy as cp + xmin = self.minwave + xmax = self.maxwave + wave = self.gpuwave + flux = self.gpuflux + else: + wave = self.wave + flux = self.flux + + if (indx is not None): + flux = flux[indx] + minedge = None + maxedge = None + result = dict() + if (trapz and use_gpu and dedges is not None): + #Use GPU mode with bin edges already calculated + for hs, edges in dedges.items(): + #Check if edges is a 1-d array or a tuple also containing scalar min/max values + if type(edges) is tuple: + (edges, minedge, maxedge) = edges + result[hs] = trapz_rebin(wave, flux, edges=edges, use_gpu=use_gpu, myz=cp.array([z]), xmin=xmin, xmax=xmax, edge_min=minedge, edge_max=maxedge)[0,:,:] + return result + if trapz: + #Use batch mode of trapz_rebin + return {hs:trapz_rebin((1.+z)*wave, flux, w, use_gpu=use_gpu) for hs, w in dwave.items()} + else: + for hs, w in dwave.items(): + result[hs] = np.empty((len(w), self._narch)) + for i in range(self._narch): + result[hs][:,i] = self._archetype['INTERP'][i](w/(1.+z)) + if (use_gpu): + result[hs] = cp.asarray(result[hs]) + #return {hs:self._archetype['INTERP'](wave/(1.+z)) for hs, wave in dwave.items()} + return result + def eval(self, subtype, dwave, coeff, wave, z): """ @@ -82,23 +143,26 @@ def eval(self, subtype, dwave, coeff, wave, z): return flux - def nearest_neighbour_model(self, spectra,weights,flux,wflux,dwave,z, legendre, n_nearest, zzchi2, trans, per_camera): + def nearest_neighbour_model(self, target,weights,flux,wflux,dwave,z, n_nearest, zzchi2, trans, per_camera, dedges=None, binned=None, use_gpu=False): """ Parameters: ------------------ - spectra (list): list of Spectrum objects. + target (Target): the target object that contains spectra weights (array): concatenated spectral weights (ivar). flux (array): concatenated flux values. wflux (array): concatenated weighted flux values. dwave (dict): dictionary of wavelength grids z (float): best redshift - legendre (dict): legendre polynomial n_nearest (int): number of nearest neighbours to be used in chi2 space (including best archetype) zchi2 (array); chi2 array for all archetypes trans (dict); dictionary of transmission Ly-a arrays per_camera (bool): If True model will be solved in each camera + dedges (dict): in GPU mode, use pre-computed dict of wavelength bin edges, already on GPU + binned (dict): already computed dictionary of rebinned fluxes + use_gpu (bool): use GPU or not + Returns: ----------------- @@ -109,32 +173,48 @@ def nearest_neighbour_model(self, spectra,weights,flux,wflux,dwave,z, legendre, """ + #trans and dedges only need to be passed if binned is not as binned already + #is multiplied by trans in get_best_archetype + spectra = target.spectra + nleg = target.nleg + legendre = target.legendre(nleg=nleg, use_gpu=False) #Get previously calculated legendre + nleg = legendre[list(legendre.keys())[0]].shape[0] iBest = np.argsort(zzchi2)[0:n_nearest] - binned_best = self.rebin_template(iBest[0], z, dwave,trapz=True) - binned_best = { hs:trans[hs]*binned_best[hs] for hs, w in dwave.items() } - tdata = {hs: binned_best[hs][:,None] for hs, wave in dwave.items()} - if len(iBest)>1: - for ib in iBest[1:]: - binned = self.rebin_template(ib, z, dwave,trapz=True) - binned = { hs:trans[hs]*binned[hs] for hs, w in dwave.items() } - tdata = { hs:np.append(tdata[hs], binned[hs][:,None], axis=1) for hs, wave in dwave.items()} - if nleg>0: - tdata = { hs:np.append(tdata[hs],legendre[hs].transpose(), axis=1 ) for hs, wave in dwave.items() } - - if per_camera: - zzchi2, zzcoeff= per_camera_coeff_with_least_square(spectra, tdata, nleg, method='bvls', n_nbh=n_nearest) + tdata = dict() + if (binned is not None): + if (use_gpu): + binned = { hs:binned[hs][:,iBest].get() for hs in binned } else: - zzchi2, zzcoeff= calc_zchi2_one(spectra, weights, flux, wflux, tdata) + binned = { hs:binned[hs][:,iBest] for hs in binned } + else: + binned = self.rebin_template_batch(z, dwave, trapz=True, dedges=dedges, indx=iBest, use_gpu=use_gpu) + for hs, w in dwave.items(): + if (use_gpu): + binned[hs] = binned[hs].get() + if (trans[hs] is not None): + #Only multiply if trans[hs] is not None + #Both arrays are on CPU so no need to wrap with asarray + binned[hs] *= trans[hs][:,None] + for hs, w in dwave.items(): + tdata[hs] = binned[hs][None,:,:] + if (nleg > 0): + tdata[hs] = np.append(tdata[hs], legendre[hs].transpose()[None,:,:], axis=2) + nbasis = tdata[hs].shape[2] + if per_camera: + #Batch placeholder that right now loops over each arch but will be GPU accelerated + (zzchi2, zzcoeff) = per_camera_coeff_with_least_square_batch(spectra, tdata, nleg, method='bvls', n_nbh=1, use_gpu=use_gpu) + else: + #Use CPU mode for calc_zchi2 since small tdata + (zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, weights, flux, wflux, 1, nbasis, use_gpu=False) - sstype = ['%s'%(self._subtype[k]) for k in iBest] # subtypes of best archetypes - fsstype = '_'.join(sstype) - #print(sstype) - #print(z, zzchi2, zzcoeff, fsstype) - return zzchi2, zzcoeff, self._rrtype+':::%s'%(fsstype) + sstype = ['%s'%(self._subtype[k]) for k in iBest] # subtypes of best archetypes + fsstype = '_'.join(sstype) + #print(sstype) + #print(z, zzchi2, zzcoeff, fsstype) + return zzchi2[0], zzcoeff[0], self._rrtype+':::%s'%(fsstype) - - def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_camera, n_nearest): + def get_best_archetype(self,target,weights,flux,wflux,dwave,z, per_camera, n_nearest, trans=None, use_gpu=False): """Get the best archetype for the given redshift and spectype. @@ -145,9 +225,10 @@ def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_cam wflux (array): concatenated weighted flux values. dwave (dict): dictionary of wavelength grids z (float): best redshift - legendre (dict): legendre polynomial per_camera (bool): True if fitting needs to be done in each camera n_nearest (int): number of nearest neighbours to be used in chi2 space (including best archetype) + trans (dict): pass previously calcualated Lyman transmission instead of recalculating + use_gpu (bool): use GPU or not Returns: chi2 (float): chi2 of best archetype @@ -155,6 +236,24 @@ def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_cam fulltype (str): fulltype of best archetype """ + spectra = target.spectra + nleg = target.nleg + legendre = target.legendre(nleg=nleg, use_gpu=use_gpu) #Get previously calculated legendre + + #Select np or cp for operations as arrtype + if (use_gpu): + import cupy as cp + arrtype = cp + #Get CuPy arrays of weights, flux, wflux + #These are created on the first call of gpu_spectral_data() for a + #target and stored. They are retrieved on subsequent calls. + (gpuweights, gpuflux, gpuwflux) = target.gpu_spectral_data() + # Build dictionaries of wavelength bin edges, min/max, and centers + dedges = { s.wavehash:(s.gpuedges, s.minedge, s.maxedge) for s in spectra } + else: + arrtype = np + dedges = None + if per_camera: ncam=3 # b, r, z cameras else: @@ -172,22 +271,41 @@ def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_cam #TODO: return best fit model as well #zzmodel = np.zeros((self._narch, obs_wave.size), dtype=np.float64) - trans = { hs:transmission_Lyman(z,w) for hs, w in dwave.items() } - - for i in range(self._narch): - binned = self.rebin_template(i, z, dwave,trapz=True) - binned = { hs:trans[hs]*binned[hs] for hs, w in dwave.items() } - if nleg>0: - tdata = { hs:np.append(binned[hs][:,None],legendre[hs].transpose(), axis=1 ) for hs, wave in dwave.items() } + if (trans is None): + #Calculate Lyman transmission if not passed as dict + trans = { hs:transmission_Lyman(z,w, use_gpu=False) for hs, w in dwave.items() } + else: + #Use previously calculated Lyman transmission + for hs in trans: + if (trans[hs] is not None): + trans[hs] = trans[hs][0,:] + + #Rebin in batch + binned = self.rebin_template_batch(z, dwave, trapz=True, dedges=dedges, use_gpu=use_gpu) + + tdata = dict() + nbasis = 1 + for hs, wave in dwave.items(): + if (trans[hs] is not None): + #Only multiply if trans[hs] is not None + binned[hs] *= arrtype.asarray(trans[hs][:,None]) + #Create 3-d tdata with narch x nwave x nbasis where nbasis = 1+nleg + if nleg > 0: + tdata[hs] = arrtype.append(binned[hs].transpose()[:,:,None], arrtype.tile(arrtype.asarray(legendre[hs]).transpose()[None,:,:], (self._narch, 1, 1)), axis=2) else: - tdata = {hs:binned[hs][:,None] for hs, wave in dwave.items()} - if per_camera: - zzchi2[i], zzcoeff[i]= per_camera_coeff_with_least_square(spectra, tdata, nleg, method='bvls', n_nbh=1) + tdata[hs] = binned[hs].transpose()[:,:,None] + nbasis = tdata[hs].shape[2] + if per_camera: + #Batch placeholder that right now loops over each arch but will be GPU accelerated + (zzchi2, zzcoeff) = per_camera_coeff_with_least_square_batch(spectra, tdata, nleg, self._narch, method='bvls', n_nbh=1, use_gpu=use_gpu) + else: + if (use_gpu): + (zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, gpuweights, gpuflux, gpuwflux, self._narch, nbasis, use_gpu=use_gpu) else: - zzchi2[i], zzcoeff[i] = calc_zchi2_one(spectra, weights, flux, wflux, tdata) - + (zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, weights, flux, wflux, self._narch, nbasis, use_gpu=use_gpu) + if n_nearest is not None: - best_chi2, best_coeff, best_fulltype = self.nearest_neighbour_model(spectra,weights,flux,wflux,dwave,z, legendre, n_nearest, zzchi2, trans, per_camera) + best_chi2, best_coeff, best_fulltype = self.nearest_neighbour_model(target,weights,flux,wflux,dwave,z, n_nearest, zzchi2, trans, per_camera, dedges=dedges, binned=binned, use_gpu=use_gpu) return best_chi2, best_coeff, best_fulltype else: iBest = np.argmin(zzchi2) diff --git a/py/redrock/fitz.py b/py/redrock/fitz.py index 230feb96..e868bd7a 100644 --- a/py/redrock/fitz.py +++ b/py/redrock/fitz.py @@ -109,7 +109,6 @@ def minfit(x, y): def legendre_calculate(nleg, dwave): - wave = np.concatenate([ w for w in dwave.values() ]) wmin = wave.min() wmax = wave.max() @@ -117,7 +116,7 @@ def legendre_calculate(nleg, dwave): return legendre -def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu=False, deg_legendre=None, nz=None, per_camera=False, n_nearest=None): +def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu=False, deg_legendre=None, nz=15, per_camera=False, n_nearest=None): """Refines redshift measurement around up to nminima minima. TODO: @@ -132,7 +131,7 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu= nminima (int): the number of minima to consider. use_gpu (bool): use GPU or not deg_legendre (int): in archetype mode polynomials upto deg_legendre-1 will be used - nz (int): number of finer redshift pixels to search for final redshift + nz (int): number of finer redshift pixels to search for final redshift - default 15 per_camera: (bool): True if fitting needs to be done in each camera for archetype mode n_nearest: (int): number of nearest neighbours to be used in chi2 space (including best archetype) @@ -151,10 +150,6 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu= # Build dictionary of wavelength grids dwave = { s.wavehash:s.wave for s in spectra } - if not archetype is None: - legendre = legendre_calculate(deg_legendre, dwave=dwave) - - (weights, flux, wflux) = spectral_data(spectra) if (use_gpu): @@ -166,9 +161,14 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu= gpuedges = { s.wavehash:(s.gpuedges, s.minedge, s.maxedge) for s in spectra } gpudwave = { s.wavehash:s.gpuwave for s in spectra } + if not archetype is None: + #legendre = legendre_calculate(deg_legendre, dwave=dwave) + legendre = target.legendre(deg_legendre) + results = list() - #Define nz here instead of hard-coding length 15 and then defining nz as - #length of zz list + #Moved default nz to arg list + if (nz is None): + nz = 15 for imin in find_minima(zchi2): if len(results) == nminima: @@ -226,6 +226,7 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu= i = min(max(np.argmin(zzchi2),1), len(zz)-2) zmin, sigma, chi2min, zwarn = minfit(zz[i-1:i+2], zzchi2[i-1:i+2]) + trans = dict() try: #Calculate xmin and xmax from template and pass as scalars xmin = template.minwave*(1+zmin) @@ -242,6 +243,7 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu= binned[k] = binned[k].get() #Use CPU always T = transmission_Lyman(np.array([zmin]),dwave[k], use_gpu=False) + trans[k] = T if (T is None): #Return value of None means that wavelenght regime #does not overlap Lyman transmission - continue here @@ -289,7 +291,8 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu= chi2=chi2min, zz=zz, zzchi2=zzchi2, coeff=coeff)) else: - chi2min, coeff, fulltype = archetype.get_best_archetype(spectra,weights,flux,wflux,dwave,zbest,legendre, per_camera, n_nearest) + chi2min, coeff, fulltype = archetype.get_best_archetype(target,weights,flux,wflux,dwave,zbest, per_camera, n_nearest, trans=trans, use_gpu=use_gpu) + del trans results.append(dict(z=zbest, zerr=zerr, zwarn=zwarn, chi2=chi2min, zz=zz, zzchi2=zzchi2, diff --git a/py/redrock/targets.py b/py/redrock/targets.py index 08e37493..5b06a860 100644 --- a/py/redrock/targets.py +++ b/py/redrock/targets.py @@ -165,6 +165,31 @@ def __init__(self, targetid, spectra, coadd=False, cosmics_nsig=0., meta=None): self.gpuweights = None self.gpuflux = None self.gpuwflux = None + self._legendre = None + self._gpulegendre = None + self.nleg = 0 + + def legendre(self, nleg, use_gpu=False): + if (use_gpu and self.nleg == nleg): + if (self._gpulegendre is not None): + return self._gpulegendre + elif (self._legendre is not None): + import cupy as cp + self._gpulegendre = { hs:cp.asarray(self._legendre[hs]) for hs in self._legendre } + return self._gpulegendre + if (self._legendre is not None and self.nleg == nleg): + return self._legendre + dwave = { s.wavehash:s.wave for s in self.spectra } + wave = np.concatenate([ w for w in dwave.values() ]) + wmin = wave.min() + wmax = wave.max() + self._legendre = { hs:np.array([scipy.special.legendre(i)( (w-wmin)/(wmax-wmin)*2.) for i in range(nleg)]) for hs, w in dwave.items() } + self.nleg = nleg + if (use_gpu): + import cupy as cp + self._gpulegendre = { hs:cp.asarray(self._legendre[hs]) for hs in self._legendre } + return self._gpulegendre + return self._legendre def gpu_spectral_data(self): if self.gpuweights is None: diff --git a/py/redrock/utils.py b/py/redrock/utils.py index 91c1a111..089f1078 100644 --- a/py/redrock/utils.py +++ b/py/redrock/utils.py @@ -255,6 +255,11 @@ def transmission_Lyman(zObj,lObs, use_gpu=False): min_wave = 0 if (np.isscalar(zObj)): #zObj is a float + min_wave = lObs.min()/(1+zObj) + if (min_wave > Lyman_series['Lya']['line']): + #Return None if wavelength range doesn't overlap with Lyman series + #No need to perform any calculations in this case + return None lRF = lObs/(1.+zObj) else: if (len(zObj) == 0): diff --git a/py/redrock/zscan.py b/py/redrock/zscan.py index c7590674..c3b44b7f 100644 --- a/py/redrock/zscan.py +++ b/py/redrock/zscan.py @@ -307,6 +307,7 @@ def per_camera_coeff_with_least_square(spectra, tdata, nleg, method=None, n_nbh= Tb = Tb_for_archetype(spectra, tdata, nbasis, n_nbh, nleg) M = Tb.T.dot(np.multiply(weights[:,None], Tb)) y = Tb.T.dot(wflux) + ret_zcoeff= {'alpha':[], 'b':[], 'r':[], 'z':[]} # PCA method will use numpy Linear Algebra method to solve the best fit linear equation @@ -350,6 +351,21 @@ def per_camera_coeff_with_least_square(spectra, tdata, nleg, method=None, n_nbh= #print(f'{time.time()-start} [sec] took for per camera BVLS method\n') return zchi2, coeff +def per_camera_coeff_with_least_square_batch(spectra, tdata, nleg, narch, method=None, n_nbh=None, use_gpu=False): + ### PLACEHOLDER for algorithm that will be GPU accelerated + ncam = 3 # number of cameras in DESI: b, r, z + zzchi2 = np.zeros(narch, dtype=np.float64) + zzcoeff = np.zeros((narch, 1+ncam*(nleg)), dtype=np.float64) + + tdata_one = dict() + for i in range(narch): + for hs in tdata: + tdata_one[hs] = tdata[hs][i,:,:] + if (use_gpu): + tdata_one[hs] = tdata_one[hs].get() + zzchi2[i], zzcoeff[i]= per_camera_coeff_with_least_square(spectra, tdata_one, nleg, method='bvls', n_nbh=1) + return zzchi2, zzcoeff + def batch_dot_product_sparse(spectra, tdata, nz, use_gpu): """Calculate a batch dot product of the 3 sparse matrices in spectra with every template in tdata. Sparse matrix libraries are used