Fast MBCn (a la groupies) (#1580)
### Pull Request Checklist:
- [x] This PR addresses an already opened issue (for bug fixes /
features)
    - This PR fixes #xyz
- [x] Tests for the changes have been added (for bug fixes / features)
- [x] (If applicable) Documentation has been added / updated (for bug
fixes / features)
- [x] CHANGES.rst has been updated (with summary of main changes)
- [x] Link to issue (:issue:`number`) and pull request (:pull:`number`)
has been added

### What kind of change does this PR introduce?

New `MBCn` TrainAdjust class. The `train` step finds the adjustment factors
for the npdf transform; the `adjust` step performs the rest (applying the npdf
transform, then the univariate bias adjustment and the final reordering). A
minimal usage sketch is shown below.
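
For reference, a minimal usage sketch of the new class, mirroring the API used in the updated `sdba.ipynb` notebook further down. The toy data here is a placeholder, not part of the PR:

```python
import numpy as np
import pandas as pd
import xarray as xr
from xclim import sdba

# Toy data (placeholder): Datasets with `tasmax` and `pr` over four years of daily data.
time = pd.date_range("2000-01-01", periods=4 * 365, freq="D")
rng = np.random.default_rng(42)


def toy_ds(shift=0.0):
    tasmax = xr.DataArray(
        290 + 10 * rng.standard_normal(time.size) + shift,
        dims="time", coords={"time": time}, attrs={"units": "K"},
    )
    pr = xr.DataArray(
        1e-5 * rng.gamma(2.0, size=time.size),
        dims="time", coords={"time": time}, attrs={"units": "kg m-2 s-1"},
    )
    return xr.Dataset({"tasmax": tasmax, "pr": pr})


dref, dhist, dsim = toy_ds(), toy_ds(1.0), toy_ds(2.0)

# Stack variables: Dataset -> DataArray with a `multivar` dimension.
ref, hist, sim = (sdba.stack_variables(ds) for ds in (dref, dhist, dsim))

# Train: find the adjustment factors used in the npdf transform.
ADJ = sdba.MBCn.train(
    ref,
    hist,
    base_kws={"nquantiles": 20, "group": "time"},
    adj_kws={"interp": "nearest", "extrapolation": "constant"},
    n_iter=20,
)

# Adjust: npdf rotations, univariate bias adjustment (QDM here) and reordering.
scen = sdba.unstack_variables(
    ADJ.adjust(sim=sim, ref=ref, hist=hist, base=sdba.QuantileDeltaMapping)
)
```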

* A single numpy function to perform all rotations of the npdf_transform
makes the process faster
* Grouping is handled using the same logic as in numpy_groupies. I initially
tried to stop using map_blocks with what I call the Big Dataset (BD) solution:
a dataset containing the group-windowed blocks. This worked well but sometimes
caused dask workers to die; better chunking might have solved that problem.
Instead of constructing a BD, we now loop over blocks and specify the time
indices of each (windowed) group directly in the original datasets (à la
groupies). The resulting code is a bit messier, but it performs well. A minimal
sketch of this indexing idea follows this list.
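
An illustrative NumPy sketch of the "à la groupies" idea (not the actual xclim code): for each group, compute the integer time indices of its windowed block and slice the original array directly, instead of materializing grouped/windowed copies of the data.

```python
import numpy as np

# Illustrative only: per-group time indices taken in the original, ungrouped array.
rng = np.random.default_rng(0)
ntime = 4 * 365
doy = np.tile(np.arange(1, 366), 4)          # day-of-year of each time step
values = rng.standard_normal(ntime)          # stand-in for one variable's time series

window = 31
half = window // 2

for group_doy in range(1, 366):
    # Distance (in days of year) to the group's centre, wrapping around the year.
    dist = np.abs(doy - group_doy)
    dist = np.minimum(dist, 365 - dist)
    idx = np.flatnonzero(dist <= half)       # time indices of the windowed group block

    block = values[idx]                      # the block, sliced from the original array
    block = block - block.mean()             # e.g. a per-block standardization step
    # ... the per-block adjustment would happen here, then results are written back ...
```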

The new implementation also changes how windowed group blocks are handled
throughout the computation. A block now keeps its form from the beginning to
the end of the MBCn computation.
* This is in contrast to the previous approach, which grouped and ungrouped
blocks between each iteration of the NpdfTransform.
* The standardization is performed per block.
* The univariate bias correction is maintained as blocks, reordered, *then* the
blocks are ungrouped.
* The sdba notebook suggested feeding the univariate bias-corrected datasets to
the npdf transform. Following Cannon (2018), however, the raw datasets should
be the input of the npdf transform. In practice this should not matter much,
but the change is necessary to perform MBCn exactly as presented by Cannon.

All these changes result in a different output for `window > 1`, and our
implementation should now match Cannon's. A conceptual sketch of the npdf core
is given below.
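
To make the flow above concrete, here is a small, self-contained NumPy sketch of the npdf core on one block with two variables. It is illustrative only: the names and the simplified quantile mapping are not the actual implementation, which performs all rotations in a single numpy function and works per windowed group block.

```python
import numpy as np

rng = np.random.default_rng(0)
ntime = 2000

# Toy 2-variable data (rows = variables, columns = time): `hist` has a different
# joint distribution than `ref`. In MBCn this would be one windowed group block.
ref = rng.multivariate_normal([0.0, 0.0], [[1.0, 0.7], [0.7, 1.0]], size=ntime).T
hist = rng.multivariate_normal([1.0, -1.0], [[1.5, 0.2], [0.2, 0.8]], size=ntime).T


def qm_additive(ref_1d, sim_1d, nq=50):
    """Very simplified additive quantile mapping of `sim_1d` toward `ref_1d`."""
    q = np.linspace(0.01, 0.99, nq)
    sim_q = np.quantile(sim_1d, q)
    af = np.quantile(ref_1d, q) - sim_q                  # adjustment factors per quantile
    pos = np.searchsorted(sim_q, sim_1d).clip(0, nq - 1)
    return sim_1d + af[pos]


# Standardize once, per block.
ref_s = (ref - ref.mean(1, keepdims=True)) / ref.std(1, keepdims=True)
sim_s = (hist - hist.mean(1, keepdims=True)) / hist.std(1, keepdims=True)

# N-pdf transform: rotate the variables, adjust each rotated variable, rotate back.
for _ in range(20):                                      # n_iter iterations
    theta = rng.uniform(0, 2 * np.pi)                    # random 2-D rotation
    R = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
    ref_r, sim_r = R @ ref_s, R @ sim_s
    sim_r = np.stack([qm_additive(r, s) for r, s in zip(ref_r, sim_r)])
    sim_s = R.T @ sim_r                                  # ref's rotation is undone by R.T @ R

# Final MBCn step (not shown): the raw block gets a univariate bias correction, kept
# as a block, and is reordered following the rank structure of `sim_s`, then ungrouped.
print(np.corrcoef(ref), np.corrcoef(sim_s), sep="\n")    # joint structure now close to ref's
```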

### Does this PR introduce a breaking change?
No

### Other information

* It might be worthwhile to retest `map_blocks` to see whether, combined with
the rest of the changes, it can offer good performance; it would make for
cleaner code.
* Using the BD approach would also simplify many things; it is worth
re-exploring whether it can maintain the current performance.
coxipi authored Jul 24, 2024
2 parents bf1db0a + 02f3c55 commit 1d91900
Showing 8 changed files with 975 additions and 224 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -9,6 +9,7 @@ Contributors to this version: David Huard (:user:`huard`), Trevor James Smith (:
New features and enhancements
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``xclim.sdba.nbutils.quantile`` and its child functions are now faster. If the module `fastnanquantile` is installed, it is used as the backend for the computation of quantiles and yields even faster results. (:issue:`1255`, :pull:`1513`).
* New multivariate bias adjustment class `MBCn`, giving a faster and more accurate implementation of the 'MBCn' algorithm (:issue:`1551`, :pull:`1580`).

Bug fixes
^^^^^^^^^
174 changes: 43 additions & 131 deletions docs/notebooks/sdba.ipynb
@@ -439,7 +439,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Fourth example : Multivariate bias-adjustment with multiple steps (Cannon, 2018)\n",
"### Fourth example : Multivariate bias-adjustment (Cannon, 2018)\n",
"\n",
"This section replicates the \"MBCn\" algorithm described by [Cannon (2018)](https://doi.org/10.1007/s00382-017-3580-6). The method relies on some univariate algorithm, an adaption of the N-pdf transform of [Pitié et al. (2005)](https://ieeexplore.ieee.org/document/1544887/) and a final reordering step.\n",
"\n",
@@ -474,55 +474,16 @@
"\n",
"dhist = dsim.sel(time=slice(\"1981\", \"2010\"))\n",
"dsim = dsim.sel(time=slice(\"2041\", \"2070\"))\n",
"dref"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Perform an initial univariate adjustment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# additive for tasmax\n",
"QDMtx = sdba.QuantileDeltaMapping.train(\n",
" dref.tasmax, dhist.tasmax, nquantiles=20, kind=\"+\", group=\"time\"\n",
")\n",
"# Adjust both hist and sim, we'll feed both to the Npdf transform.\n",
"scenh_tx = QDMtx.adjust(dhist.tasmax)\n",
"scens_tx = QDMtx.adjust(dsim.tasmax)\n",
"\n",
"# remove == 0 values in pr:\n",
"dref[\"pr\"] = sdba.processing.jitter_under_thresh(dref.pr, \"0.01 mm d-1\")\n",
"dhist[\"pr\"] = sdba.processing.jitter_under_thresh(dhist.pr, \"0.01 mm d-1\")\n",
"dsim[\"pr\"] = sdba.processing.jitter_under_thresh(dsim.pr, \"0.01 mm d-1\")\n",
"\n",
"# multiplicative for pr\n",
"QDMpr = sdba.QuantileDeltaMapping.train(\n",
" dref.pr, dhist.pr, nquantiles=20, kind=\"*\", group=\"time\"\n",
")\n",
"# Adjust both hist and sim, we'll feed both to the Npdf transform.\n",
"scenh_pr = QDMpr.adjust(dhist.pr)\n",
"scens_pr = QDMpr.adjust(dsim.pr)\n",
"\n",
"scenh = xr.Dataset(dict(tasmax=scenh_tx, pr=scenh_pr))\n",
"scens = xr.Dataset(dict(tasmax=scens_tx, pr=scens_pr))"
"# Stack variables : Dataset -> DataArray with `multivar` dimension\n",
"dref, dhist, dsim = (sdba.stack_variables(da) for da in (dref, dhist, dsim))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Stack the variables to multivariate arrays and standardize them\n",
"The standardization process ensure the mean and standard deviation of each column (variable) is 0 and 1 respectively.\n",
"\n",
"`scenh` and `scens` are standardized together, so the two series are coherent. As we'll see further, we do not need to keep the mean and standard deviation, as we only keep the rank order information from the `NpdfTransform` output."
"##### Perform the multivariate adjustment (MBCn)."
]
},
{
@@ -531,91 +492,42 @@
"metadata": {},
"outputs": [],
"source": [
"# Stack the variables (tasmax and pr)\n",
"ref = sdba.processing.stack_variables(dref)\n",
"scenh = sdba.processing.stack_variables(scenh)\n",
"scens = sdba.processing.stack_variables(scens)\n",
"\n",
"# Standardize\n",
"ref, _, _ = sdba.processing.standardize(ref)\n",
"\n",
"allsim_std, _, _ = sdba.processing.standardize(xr.concat((scenh, scens), \"time\"))\n",
"scenh_std = allsim_std.sel(time=scenh.time)\n",
"scens_std = allsim_std.sel(time=scens.time)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Perform the N-dimensional probability density function transform\n",
"ADJ = sdba.MBCn.train(\n",
" dref,\n",
" dhist,\n",
" base_kws={\"nquantiles\": 20, \"group\": \"time\"},\n",
" adj_kws={\"interp\": \"nearest\", \"extrapolation\": \"constant\"},\n",
" n_iter=20, # perform 20 iteration\n",
" n_escore=1000, # only send 1000 points to the escore metric\n",
")\n",
"\n",
"The NpdfTransform will iteratively randomly rotate our arrays in the \"variables\" space and apply the univariate adjustment before rotating it back. In Cannon (2018) and Pitié et al. (2005), it can be seen that the source array's joint distribution converges toward the target's joint distribution when a large number of iterations is done."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from xclim import set_options\n",
"\n",
"# See the advanced notebook for details on how this option work\n",
"with set_options(sdba_extra_output=True):\n",
" out = sdba.adjustment.NpdfTransform.adjust(\n",
" ref,\n",
" scenh_std,\n",
" scens_std,\n",
" base=sdba.QuantileDeltaMapping, # Use QDM as the univariate adjustment.\n",
" base_kws={\"nquantiles\": 20, \"group\": \"time\"},\n",
" n_iter=20, # perform 20 iteration\n",
" n_escore=1000, # only send 1000 points to the escore metric (it is really slow)\n",
"scenh, scens = (\n",
" ADJ.adjust(\n",
" sim=ds,\n",
" ref=dref,\n",
" hist=dhist,\n",
" base=sdba.QuantileDeltaMapping,\n",
" base_kws_vars={\n",
" \"pr\": {\n",
" \"kind\": \"*\",\n",
" \"jitter_under_thresh_value\": \"0.01 mm d-1\",\n",
" \"adapt_freq_thresh\": \"0.1 mm d-1\",\n",
" },\n",
" \"tasmax\": {\"kind\": \"+\"},\n",
" },\n",
" adj_kws={\"interp\": \"nearest\", \"extrapolation\": \"constant\"},\n",
" )\n",
"\n",
"scenh_npdft = out.scenh.rename(time_hist=\"time\") # Bias-adjusted historical period\n",
"scens_npdft = out.scen # Bias-adjusted future period\n",
"extra = out.drop_vars([\"scenh\", \"scen\"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Restoring the trend\n",
"\n",
"The NpdfT has given us new \"hist\" and \"sim\" arrays with a correct rank structure. However, the trend is lost in this process. We reorder the result of the initial adjustment according to the rank structure of the NpdfT outputs to get our final bias-adjusted series.\n",
"\n",
"`sdba.processing.reordering`: 'ref' the argument that provides the order, 'sim' is the argument to reorder."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scenh = sdba.processing.reordering(scenh_npdft, scenh, group=\"time\")\n",
"scens = sdba.processing.reordering(scens_npdft, scens, group=\"time\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scenh = sdba.processing.unstack_variables(scenh)\n",
"scens = sdba.processing.unstack_variables(scens)"
" for ds in (dhist, dsim)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### There we are!\n",
"##### Let's trigger all the computations.\n",
"\n",
"Let's trigger all the computations. The use of `dask.compute` allows the three DataArrays to be computed at the same time, avoiding repeating the common steps."
"The use of `dask.compute` allows the three DataArrays to be computed at the same time, avoiding repeating the common steps."
]
},
{
Expand All @@ -628,9 +540,7 @@
"from dask.diagnostics import ProgressBar\n",
"\n",
"with ProgressBar():\n",
" scenh, scens, escores = compute(\n",
" scenh.isel(location=2), scens.isel(location=2), extra.escores.isel(location=2)\n",
" )"
" scenh, scens, escores = compute(scenh, scens, ADJ.ds.escores)"
]
},
{
Expand All @@ -646,13 +556,15 @@
"metadata": {},
"outputs": [],
"source": [
"fig, ax = plt.subplots()\n",
"\n",
"dref.isel(location=2).tasmax.plot(ax=ax, label=\"Reference\")\n",
"scenh.tasmax.plot(ax=ax, label=\"Adjusted\", alpha=0.65)\n",
"dhist.isel(location=2).tasmax.plot(ax=ax, label=\"Simulated\")\n",
"\n",
"ax.legend()"
"fig, axs = plt.subplots(1, 2, figsize=(16, 4))\n",
"for da, label in zip((dref, scenh, dhist), (\"Reference\", \"Adjusted\", \"Simulated\")):\n",
" ds = sdba.unstack_variables(da).isel(location=2)\n",
" # time series - tasmax\n",
" ds.tasmax.plot(ax=axs[0], label=label, alpha=0.65 if label == \"Adjusted\" else 1)\n",
" # scatter plot\n",
" ds.plot.scatter(x=\"pr\", y=\"tasmax\", ax=axs[1], label=label)\n",
"axs[0].legend()\n",
"axs[1].legend()"
]
},
{
Expand All @@ -661,7 +573,7 @@
"metadata": {},
"outputs": [],
"source": [
"escores.plot()\n",
"escores.isel(location=2).plot()\n",
"plt.title(\"E-scores for each iteration.\")\n",
"plt.xlabel(\"iteration\")\n",
"plt.ylabel(\"E-score\")"
@@ -686,7 +598,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.2"
"version": "3.12.3"
},
"toc": {
"base_numbering": 1,
99 changes: 85 additions & 14 deletions tests/test_sdba/test_adjustment.py
@@ -6,14 +6,17 @@
import xarray as xr
from scipy.stats import genpareto, norm, uniform

from xclim.core.calendar import stack_periods
from xclim.core.options import set_options
from xclim.core.units import convert_units_to
from xclim.sdba import adjustment
from xclim.sdba.adjustment import (
LOCI,
BaseAdjustment,
DetrendedQuantileMapping,
EmpiricalQuantileMapping,
ExtremeValues,
MBCn,
PrincipalComponents,
QuantileDeltaMapping,
Scaling,
@@ -35,6 +38,33 @@
from xclim.testing.sdba_utils import nancov # noqa


class TestBaseAdjustment:
def test_harmonize_units(self, series, random):
n = 10
u = random.random(n)
da = series(u, "tas")
da2 = da.copy()
da2 = convert_units_to(da2, "degC")
(da, da2), _ = BaseAdjustment._harmonize_units(da, da2)
assert da.units == da2.units

@pytest.mark.parametrize("use_dask", [True, False])
def test_harmonize_units_multivariate(self, series, random, use_dask):
n = 10
u = random.random(n)
ds = xr.merge([series(u, "tas"), series(u * 100, "pr")])
ds2 = ds.copy()
ds2["tas"] = convert_units_to(ds2["tas"], "degC")
ds2["pr"] = convert_units_to(ds2["pr"], "nm/day")
da, da2 = stack_variables(ds), stack_variables(ds2)
if use_dask:
da, da2 = da.chunk({"multivar": 1}), da2.chunk({"multivar": 1})

(da, da2), _ = BaseAdjustment._harmonize_units(da, da2)
ds, ds2 = unstack_variables(da), unstack_variables(da2)
assert (ds.tas.units == ds2.tas.units) & (ds.pr.units == ds2.pr.units)


class TestLoci:
@pytest.mark.parametrize("group,dec", (["time", 2], ["time.month", 1]))
def test_time_and_from_ds(self, series, group, dec, tmp_path, random):
@@ -554,6 +584,50 @@ def test_add_dims(self, use_dask, open_dataset):
assert scen2.sel(location=["Kugluktuk", "Vancouver"]).isnull().all()


@pytest.mark.slow
class TestMBCn:
@pytest.mark.parametrize("use_dask", [True, False])
@pytest.mark.parametrize("group, window", [["time", 1], ["time.dayofyear", 31]])
@pytest.mark.parametrize("period_dim", [None, "period"])
def test_simple(self, open_dataset, use_dask, group, window, period_dim):
with set_options(sdba_encode_cf=use_dask):
if use_dask:
chunks = {"location": -1}
else:
chunks = None
ref, dsim = (
open_dataset(
f"sdba/{file}",
chunks=chunks,
drop_variables=["lat", "lon"],
)
.isel(location=1, drop=True)
.expand_dims(location=["Amos"])
for file in ["ahccd_1950-2013.nc", "CanESM2_1950-2100.nc"]
)
ref, hist = (
ds.sel(time=slice("1981", "2010")).isel(time=slice(365 * 4))
for ds in [ref, dsim]
)
dsim = dsim.sel(time=slice("1981", None))
sim = (stack_periods(dsim).isel(period=slice(1, 2))).isel(
time=slice(365 * 4)
)

ref, hist, sim = (stack_variables(ds) for ds in [ref, hist, sim])

MBCN = MBCn.train(
ref,
hist,
base_kws=dict(nquantiles=50, group=Grouper(group, window)),
adj_kws=dict(interp="linear"),
)
p = MBCN.adjust(sim=sim, ref=ref, hist=hist, period_dim=period_dim)
# 'does it run' test
p.load()


class TestPrincipalComponents:
@pytest.mark.parametrize(
"group", (Grouper("time.month"), Grouper("time", add_dims=["lon"]))
@@ -601,20 +675,17 @@ def _group_assert(ds, dim):
@pytest.mark.parametrize("use_dask", [True, False])
@pytest.mark.parametrize("pcorient", ["full", "simple"])
def test_real_data(self, atmosds, use_dask, pcorient):
ref = stack_variables(
xr.Dataset(
{"tasmax": atmosds.tasmax, "tasmin": atmosds.tasmin, "tas": atmosds.tas}
)
).isel(location=3)
hist = stack_variables(
xr.Dataset(
{
"tasmax": 1.001 * atmosds.tasmax,
"tasmin": atmosds.tasmin - 0.25,
"tas": atmosds.tas + 1,
}
)
).isel(location=3)
ds0 = xr.Dataset(
{"tasmax": atmosds.tasmax, "tasmin": atmosds.tasmin, "tas": atmosds.tas}
)
ref = stack_variables(ds0).isel(location=3)
hist0 = ds0
with xr.set_options(keep_attrs=True):
hist0["tasmax"] = 1.001 * hist0.tasmax
hist0["tasmin"] = hist0.tasmin - 0.25
hist0["tas"] = hist0.tas + 1

hist = stack_variables(hist0).isel(location=3)
with xr.set_options(keep_attrs=True):
sim = hist + 5
sim["time"] = sim.time + np.timedelta64(10, "Y").astype("<m8[ns]")
