Skip to content

Commit

Permalink
Merge pull request #175 from NREL/gb/parallel_qdm
Browse files Browse the repository at this point in the history
Gb/parallel qdm
  • Loading branch information
grantbuster authored Jul 25, 2024
2 parents 264c225 + 27bf971 commit 6ccf1de
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 30 deletions.
162 changes: 132 additions & 30 deletions rex/utilities/bc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
"""
rex bias correction utilities.
"""
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging

import numpy as np
import scipy

Expand Down Expand Up @@ -170,7 +171,8 @@ def _clean_kwarg(inp):

return inp

def _clean_params(self, params, arr_shape):
@staticmethod
def _clean_params(params, arr_shape, scipy_dist):
"""Verify and clean 2D parameter arrays for passing into empirical
distribution or scipy continuous distribution functions.
Expand All @@ -181,6 +183,10 @@ def _clean_params(self, params, arr_shape):
parameters for the distribution.
arr_shape : tuple
Array shape should be (time, space).
scipy_dist : scipy.stats.rv_continuous | None
Any continuous distribution class from ``scipy.stats`` or None if
using an empirical distribution (taken from attribute
``QuantileDeltaMapping.scipy_dist``)
Returns
-------
Expand All @@ -202,78 +208,122 @@ def _clean_params(self, params, arr_shape):
assert len(params.shape) == 2, msg
assert params.shape[0] == arr_shape[1], msg

if self.scipy_dist is not None:
if scipy_dist is not None:
params = [params[:, i] for i in range(params.shape[1])]

return params

def _get_quantiles(self, n_samples):
@staticmethod
def _get_quantiles(n_samples, sampling, log_base):
"""If dist='empirical', this will get the quantile values for the CDF
x-values specified in the input params"""

if self.sampling == 'linear':
if sampling == 'linear':
quantiles = sample_q_linear(n_samples)
elif self.sampling == 'log':
quantiles = sample_q_log(n_samples, self.log_base)
elif self.sampling == 'invlog':
quantiles = sample_q_invlog(n_samples, self.log_base)
elif sampling == 'log':
quantiles = sample_q_log(n_samples, log_base)
elif sampling == 'invlog':
quantiles = sample_q_invlog(n_samples, log_base)
else:
msg = ('sampling option must be linear, log, or invlog, but '
'received: {}'.format(self.sampling))
'received: {}'.format(sampling))
logger.error(msg)
raise KeyError(msg)

return quantiles

def cdf(self, x, params):
@classmethod
def cdf(cls, x, params, scipy_dist, sampling, log_base):
"""Run the CDF function e.g., convert physical variable to quantile"""

if self.scipy_dist is None:
if scipy_dist is None:
p = np.zeros_like(x)
for idx in range(x.shape[1]):
xp = params[idx, :]
fp = self._get_quantiles(len(xp))
fp = cls._get_quantiles(len(xp), sampling, log_base)
p[:, idx] = np.interp(x[:, idx], xp, fp)
else:
p = self.scipy_dist.cdf(x, *params)
p = scipy_dist.cdf(x, *params)

return p

def ppf(self, p, params):
@classmethod
def ppf(cls, p, params, scipy_dist, sampling, log_base):
"""Run the inverse CDF function (percent point function) e.g., convert
quantile to physical variable"""

if self.scipy_dist is None:
if scipy_dist is None:
x = np.zeros_like(p)
for idx in range(p.shape[1]):
fp = params[idx, :]
xp = self._get_quantiles(len(fp))
xp = cls._get_quantiles(len(fp), sampling, log_base)
x[:, idx] = np.interp(p[:, idx], xp, fp)
else:
x = self.scipy_dist.ppf(p, *params)
x = scipy_dist.ppf(p, *params)

return x

def __call__(self, arr):
"""Run the QDM function to bias correct an array
@classmethod
def run_qdm(cls, arr, params_oh, params_mh, params_mf,
scipy_dist, relative, sampling, log_base):
"""Run the actual QDM operation from args without initializing the
``QuantileDeltaMapping`` object
Parameters
----------
arr : np.ndarray
2D array of values in shape (time, space)
params_oh : np.ndarray
2D array of **observed historical** distribution parameters created
from a multi-year set of data where the shape is (space, N). This
can be the output of a parametric distribution fit like
``scipy.stats.weibull_min.fit()`` where N is the number of
parameters for that distribution, or this can define the x-values
of N points from an empirical CDF that will be linearly
interpolated between. If this is an empirical CDF, this must
include the 0th and 100th percentile values and have even
percentile spacing between values.
params_mh : np.ndarray
Same requirements as params_oh. This input arg is for the **modeled
historical distribution**.
params_mf : np.ndarray
Same requirements as params_oh. This input arg is for the **modeled
future distribution**.
scipy_dist : scipy.stats.rv_continuous | None
Any continuous distribution class from ``scipy.stats`` or None if
using an empirical distribution (taken from attribute
``QuantileDeltaMapping.scipy_dist``)
relative : bool | np.ndarray
Flag to preserve relative rather than absolute changes in
quantiles. relative=False (default) will multiply by the change in
quantiles while relative=True will add. See Equations 4-6 from
Cannon et al., 2015 for more details. Can also be a 1D array of
dist inputs if being used from reV, but they must all be the same
option.
sampling : str | np.ndarray
If dist="empirical", this is an option for how the quantiles were
sampled to produce the params inputs, e.g., how to sample the
y-axis of the distribution (see sampling functions in
``rex.utilities.bc_utils``). "linear" will do even spacing, "log"
will concentrate samples near quantile=0, and "invlog" will
concentrate samples near quantile=1. Can also be a 1D array of dist
inputs if being used from reV, but they must all be the same
option.
log_base : int | float | np.ndarray
Log base value if sampling is "log" or "invlog". A higher value
will concentrate more samples at the extreme sides of the
distribution. Can also be a 1D array of dist inputs if being used
from reV, but they must all be the same option.
Returns
-------
arr : np.ndarray
Bias corrected copy of the input array with same shape.
"""

if len(arr.shape) == 1:
arr = np.expand_dims(arr, 1)

params_oh = self._clean_params(self.params_oh, arr.shape)
params_mh = self._clean_params(self.params_mh, arr.shape)
params_mf = self._clean_params(self.params_mf, arr.shape)
params_oh = cls._clean_params(params_oh, arr.shape, scipy_dist)
params_mh = cls._clean_params(params_mh, arr.shape, scipy_dist)
params_mf = cls._clean_params(params_mf, arr.shape, scipy_dist)

# Equation references are from Section 3 of Cannon et al 2015:
# Cannon, A. J., Sobie, S. R. & Murdock, T. Q. Bias Correction of GCM
Expand All @@ -282,21 +332,73 @@ def __call__(self, arr):
# (2015).

logger.debug('Computing CDF on modeled future data')
q_mf = self.cdf(arr, params_mf) # Eq.3: Tau_m_p = F_m_p(x_m_p)
# Eq.3: Tau_m_p = F_m_p(x_m_p)
q_mf = cls.cdf(arr, params_mf, scipy_dist, sampling, log_base)

logger.debug('Computing PPF on observed historical data')
x_oh = self.ppf(q_mf, params_oh) # Eq.5: x^_o:m_h:p = F-1_o_h(Tau_m_p)
# Eq.5: x^_o:m_h:p = F-1_o_h(Tau_m_p)
x_oh = cls.ppf(q_mf, params_oh, scipy_dist, sampling, log_base)

logger.debug('Computing PPF on modeled historical data')
x_mh_mf = self.ppf(q_mf, params_mh) # Eq.4 denom: F-1_m_h(Tau_m_p)
# Eq.4 denom: F-1_m_h(Tau_m_p)
x_mh_mf = cls.ppf(q_mf, params_mh, scipy_dist, sampling, log_base)

logger.debug('Finished computing distributions.')

if self.relative:
if relative:
x_mh_mf[x_mh_mf == 0] = 0.001 # arbitrary limit to prevent div 0
delta = arr / x_mh_mf # Eq.4: x_m_p / F-1_m_h(Tau_m_p)
arr_bc = x_oh * delta # Eq.6: x^_m_p = x^_o:m_h:p * delta
else:
delta = arr - x_mh_mf # Eq.4: x_m_p - F-1_m_h(Tau_m_p)
arr_bc = x_oh + delta # Eq.6: x^_m_p = x^_o:m_h:p + delta

return arr_bc

def __call__(self, arr, max_workers=1):
"""Run the QDM function to bias correct an array
Parameters
----------
arr : np.ndarray
2D array of values in shape (time, space)
max_workers : int, None
Number of parallel workers to use in QDM bias correction. 1 will
run in serial (default), None will use all available cores.
Returns
-------
arr : np.ndarray
Bias corrected copy of the input array with same shape.
"""

if len(arr.shape) == 1:
arr = np.expand_dims(arr, 1)

if max_workers == 1:
arr_bc = self.run_qdm(arr, self.params_oh, self.params_mh,
self.params_mf, self.scipy_dist,
self.relative, self.sampling, self.log_base)
else:
max_workers = max_workers or os.cpu_count()
sslices = np.array_split(np.arange(arr.shape[1]), arr.shape[1])
sslices = [slice(idx[0], idx[-1] + 1) for idx in sslices]
arr_bc = arr.copy()
futures = {}
with ProcessPoolExecutor(max_workers=max_workers) as exe:
for idx in range(arr.shape[1]):
idx = slice(idx, idx + 1)
fut = exe.submit(self.run_qdm, arr[:, idx],
self.params_oh[idx],
self.params_mh[idx],
self.params_mf[idx], self.scipy_dist,
self.relative, self.sampling,
self.log_base)
futures[fut] = idx
for future in as_completed(futures):
idx = futures[future]
arr_bc[:, idx] = future.result()

msg = ('Input shape {} does not match QDM bias corrected output '
'shape {}!'.format(arr.shape, arr_bc.shape))
assert arr.shape == arr_bc.shape, msg
Expand Down
98 changes: 98 additions & 0 deletions tests/test_bc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
"""
pytests for bias correction utilities
"""

import numpy as np

from rex.temporal_stats.temporal_stats import cdf
from rex.utilities.bc_utils import QuantileDeltaMapping


def test_qdm():
"""Test basic QuantileDeltaMapping functionality with dummy distributions
Note that because we use a sampled empirical distribution the bias
corrected data will not be perfectly precise, hence the rtol values
"""

ntime = int(1e4)

arr_oh = np.random.gamma(0.5, scale=1.0, size=ntime)
arr_mh = np.random.gamma(0.75, scale=1.0, size=ntime)
arr_mf = np.random.gamma(1, scale=1.0, size=ntime)

params_oh = cdf(arr_oh, n_samples=100, sampling='invlog', log_base=10)
params_mh = cdf(arr_mh, n_samples=100, sampling='invlog', log_base=10)
params_mf = cdf(arr_mf, n_samples=100, sampling='invlog', log_base=10)
params_oh = params_oh[np.newaxis] # qdm expects (space, time) for params
params_mh = params_mh[np.newaxis] # qdm expects (space, time) for params
params_mf = params_mf[np.newaxis] # qdm expects (space, time) for params

qdm_abs_hist = QuantileDeltaMapping(params_oh, params_mh, params_mf=None,
dist='empirical', relative=False,
sampling='invlog', log_base=10)
qdm_rel_hist = QuantileDeltaMapping(params_oh, params_mh, params_mf=None,
dist='empirical', relative=True,
sampling='invlog', log_base=10)
qdm_abs_fut = QuantileDeltaMapping(params_oh, params_mh, params_mf,
dist='empirical', relative=False,
sampling='invlog', log_base=10)
qdm_rel_fut = QuantileDeltaMapping(params_oh, params_mh, params_mf,
dist='empirical', relative=True,
sampling='invlog', log_base=10)

# absolute changes
arr_mh_bc = qdm_abs_hist(arr_mh[:, np.newaxis])
arr_mf_bc = qdm_abs_fut(arr_mf[:, np.newaxis])

# check mean/max values for historical only (simple quantile mapping)
assert not np.allclose(arr_mh.mean(), arr_oh.mean(), rtol=1e-2)
assert not np.allclose(arr_mh.max(), arr_oh.max(), rtol=1e-2)
assert np.allclose(arr_mh_bc.mean(), arr_oh.mean(), rtol=1e-2)
assert np.allclose(arr_mh_bc.max(), arr_oh.max(), rtol=1e-2)

# check trend
diff_raw = arr_mf.mean() - arr_mh.mean()
diff_bc = arr_mf_bc.mean() - arr_mh_bc.mean()
assert np.allclose(diff_raw, diff_bc, rtol=1e-2)

# relative changes
arr_mh_bc = qdm_rel_hist(arr_mh[:, np.newaxis])
arr_mf_bc = qdm_rel_fut(arr_mf[:, np.newaxis])

# check trend
diff_raw = arr_mf.mean() / arr_mh.mean()
diff_bc = arr_mf_bc.mean() / arr_mh_bc.mean()
assert np.allclose(diff_raw, diff_bc, rtol=5e-2)


def test_qdm_parallel():
"""Test parallelization of QuantileDeltaMapping"""

ntime = int(1e4)
nspace = 1000
size = (ntime, nspace)

arr_oh = np.random.gamma(0.5, scale=1.0, size=size)
arr_mh = np.random.gamma(0.75, scale=1.0, size=size)
arr_mf = np.random.gamma(1, scale=1.0, size=size)

params_oh = []
params_mh = []
params_mf = []
for idx in range(nspace):
params_oh.append(cdf(arr_oh[:, idx], n_samples=100)[np.newaxis])
params_mh.append(cdf(arr_mh[:, idx], n_samples=100)[np.newaxis])
params_mf.append(cdf(arr_mf[:, idx], n_samples=100)[np.newaxis])

params_oh = np.concatenate(params_oh, axis=0)
params_mh = np.concatenate(params_mh, axis=0)
params_mf = np.concatenate(params_mf, axis=0)

qdm_rel_fut = QuantileDeltaMapping(params_oh, params_mh, params_mf,
dist='empirical', relative=True)

arr_mf_bc_ser = qdm_rel_fut(arr_mf, max_workers=1)
arr_mf_bc_par = qdm_rel_fut(arr_mf, max_workers=2)
assert np.allclose(arr_mf_bc_ser, arr_mf_bc_par)

0 comments on commit 6ccf1de

Please sign in to comment.