Forecasting Plugin #90

Merged (19 commits) on Feb 13, 2024
36 changes: 36 additions & 0 deletions docs/Plugins/Forecasting.md
@@ -0,0 +1,36 @@
# RAT-Forecasting

The forecasting plugin adds functionality to generate short-term forecasts of the reservoir state for up to 15 days. It uses forecasted weather data to forecast the inflow to reservoirs. Other reservoir-operations fluxes, specifically the storage change over the forecast window and the resulting outflow from the reservoir, are estimated under different scenarios, which are described in detail below.

## How to Use
To run the forecast plugin, set the `forecast` option in the `PLUGINS` section of the configuration file to `True`:

```
PLUGINS:
  forecast: True
  forecast_lead_time: 15
  forecast_start_time: end_date  # either "end_date" or a date in YYYY-MM-DD format
```

The `forecast_start_time` option controls when the forecast begins. If set to `end_date`, the forecast begins on the end date of RAT's normal (nowcast) run, which is controlled by the `start_date` and `end_date` options in the BASIN section. Alternatively, a date in YYYY-MM-DD format can be provided to start the forecast from that date. The forecast window, i.e., the number of days ahead for which the forecast is generated, is controlled by the `forecast_lead_time` option, up to a maximum of 15 days.
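For example, to generate a one-week forecast starting from a fixed date rather than from the nowcast end date (the date and lead time below are illustrative, not defaults):

```
PLUGINS:
  forecast: True
  forecast_lead_time: 7
  forecast_start_time: 2018-08-10
```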

## Forecasted Inflow and Evaporation
- The inflow to the reservoir is simulated using forecasted precipitation from the Climate Hazards Center InfraRed Precipitation with Stations-Global Ensemble Forecast System [(CHIRPS-GEFS)](https://chc.ucsb.edu/data/chirps-gefs) and forecasted temperature and wind data from the Global Forecast System [(GFS)](https://www.ncei.noaa.gov/products/weather-climate-models/global-forecast). CHIRPS-GEFS uses satellite and station observations of precipitation (CHIRPS) to bias-correct and downscale Global Ensemble Forecast System (GEFS) precipitation forecasts. GFS is an operational numerical weather prediction system that forecasts meteorological variables, including temperature and wind.
- The forecasted meteorology is used to run the hydrological model component of RAT (MetSim + VIC + VIC routing) to obtain forecasted streamflow and evaporation for each reservoir; a sketch of how the wind forcing is assembled follows this list.
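As an illustration of the forcing preparation, the plugin derives a single wind speed from the GFS u and v wind components, mirroring `read_forecast` in `metsim_input_processing.py` below. A minimal sketch; the file paths are hypothetical and the 0.75 factor is taken from the existing implementation:

```
import numpy as np
import rasterio as rio

def daily_wind_speed(uwnd_path, vwnd_path):
    """Combine GFS u/v wind component grids into a single wind-speed grid.

    The 0.75 scaling mirrors the factor applied in RAT's
    metsim_input_processing; the file paths are hypothetical examples.
    """
    uwnd = rio.open(uwnd_path).read(1, masked=True).astype(np.float32).filled(np.nan)
    vwnd = rio.open(vwnd_path).read(1, masked=True).astype(np.float32).filled(np.nan)
    return 0.75 * np.sqrt(uwnd**2 + vwnd**2)

# Example paths follow the plugin's forecast directory layout
wind = daily_wind_speed(
    'forecast/gfs/processed/20240213/uwnd/20240214.asc',
    'forecast/gfs/processed/20240213/vwnd/20240214.asc',
)
```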

## Reservoir Storage Change and Outflow Scenarios
The reservoir state (storage change, outflow, water surface elevation, and water surface area) is estimated under different scenarios of possible reservoir operations over the forecast window. The scenarios used to estimate reservoir storage change are as follows (a minimal sketch of the shared water balance appears after the list):

- Target reservoir water level: maintains a target water level by storing and releasing the necessary amount of water.

- Fraction of maximum reservoir storage: storage change is estimated as a fraction of the maximum reservoir storage.

- Historical operations-based outflow: storage change is estimated based on historical operations of the reservoir, obtained from Biswas et al. (2021), who infer reservoir operations from long-term observations of reservoir surface area.

- Gates Closed/Open: the reservoir state is simulated with the dam gates either fully closed or fully open.

- User-defined storage change: users can directly input the expected volume of storage change over the forecast window to simulate the reservoir state.
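All of these scenarios share a simple water balance: the forecasted inflow and evaporation are fixed, the scenario prescribes the storage change, and the outflow follows. A minimal sketch of that balance (not the plugin's actual code; names, units, and values are illustrative):

```
import numpy as np

def scenario_outflow(inflow, evap, dS):
    """Outflow implied by a prescribed storage change: O = I - E - dS, floored at zero."""
    return np.maximum(inflow - evap - dS, 0.0)

# Illustrative daily volumes (units assumed consistent, e.g. million m^3/day)
inflow = np.array([1.20, 1.50, 2.00])  # forecasted inflow
evap = np.array([0.05, 0.05, 0.06])    # forecasted evaporation

# Gates closed: everything net of evaporation is stored, nothing released
print(scenario_outflow(inflow, evap, inflow - evap))   # -> [0. 0. 0.]

# Gates open: no storage change, inflow is passed through
print(scenario_outflow(inflow, evap, 0.0))

# Fraction of maximum storage, spread uniformly over the forecast window
S_max, frac, n_days = 100.0, 0.01, 3
print(scenario_outflow(inflow, evap, frac * S_max / n_days))
```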

## Publication

We used this forecasting plugin in a forensic study of the devastating 2018 floods caused by extreme precipitation across the state of Kerala, India. The forecasted data generated for the study can be accessed here: [RAT-Forecasting-Kerala-2018](forecasting-data.zip). It contains the inflow forecasts, inflow nowcasts, and release scenarios.
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -57,6 +57,8 @@ nav:
- 'Configuration File': 'Configuration/rat_config.md'
- 'Secrets File': 'Configuration/secrets.md'
- 'Multiple Basin Run': 'Configuration/basins_metadata.md'
- Plugins:
- 'Forecasting': 'Plugins/Forecasting.md'
- Data Description:
- 'Directory Structure': 'RAT_Data/DirectoryStructure.md'
- 'Global Database': 'RAT_Data/GlobalDatabase.md'
47 changes: 30 additions & 17 deletions src/rat/core/run_postprocessing.py
@@ -48,7 +48,7 @@ def calc_dels(aecpath, sareapath, savepath):

df.to_csv(savepath, index=False)

def calc_E(res_data, start_date, end_date, forcings_path, vic_res_path, sarea_path, savepath):
def calc_E(res_data, start_date, end_date, forcings_path, vic_res_path, sarea, savepath):
ds = xr.open_dataset(vic_res_path)
forcings_ds = xr.open_mfdataset(forcings_path, engine='netcdf4')
## Slicing the latest run time period
@@ -71,15 +71,16 @@ def calc_E(res_data, start_date, end_date, forcings_path, vic_res_path, sarea_pa
reqvars_clipped = ds_clipped[['OUT_EVAP', 'OUT_R_NET', 'OUT_VP', 'OUT_WIND', 'OUT_AIR_TEMP']]
reqvars = reqvars_clipped.load()

# get sarea
log.debug("Getting surface areas")
sarea = pd.read_csv(sarea_path, parse_dates=['date']).rename({'date': 'time'}, axis=1)[['time', 'area']]
sarea = sarea.set_index('time')
upsampled_sarea = sarea.resample('D').mean()
sarea_interpolated = upsampled_sarea.interpolate(method='linear')

## Slicing the latest run time period
sarea_interpolated = sarea_interpolated[start_date:end_date]
# get sarea - if string, read from file, else use same surface area value for all time steps
log.debug("Getting surface areas", sarea)
if isinstance(sarea, str):
sarea = pd.read_csv(sarea, parse_dates=['date']).rename({'date': 'time'}, axis=1)[['time', 'area']]
sarea = sarea.set_index('time')
upsampled_sarea = sarea.resample('D').mean()
sarea_interpolated = upsampled_sarea.interpolate(method='linear')

## Slicing the latest run time period
sarea_interpolated = sarea_interpolated[start_date:end_date]

log.debug("Checking if grid cells lie inside reservoir")
last_layer = reqvars.isel(time=-1).to_dataframe().reset_index()
@@ -103,7 +104,10 @@

P = forcings.sel(lat=np.array(points_within.y), lon=np.array(points_within.x), method='nearest').resample({'time':'1D'}).mean().to_dataframe().groupby('time').mean()[1:]

data['area'] = sarea_interpolated['area']
# sarea was converted to a DataFrame above when a file path was given;
# otherwise it is a single surface-area value used for all time steps
if isinstance(sarea, pd.DataFrame):
data['area'] = sarea_interpolated['area']
else:
data['area'] = sarea
data['P'] = P
data = data.dropna()
if (data.empty):
@@ -171,7 +175,7 @@ def calc_outflow(inflowpath, dspath, epath, area, savepath):


def run_postprocessing(basin_name, basin_data_dir, reservoir_shpfile, reservoir_shpfile_column_dict, aec_dir_path, start_date, end_date, rout_init_state_save_file, use_rout_state,
evap_datadir, dels_savedir, outflow_savedir, vic_status, routing_status, gee_status):
evap_datadir, dels_savedir, outflow_savedir, vic_status, routing_status, gee_status, forecast_mode=False):
# read file defining mapped reservoirs
# reservoirs_fn = os.path.join(project_dir, 'backend/data/ancillary/RAT-Reservoirs.geojson')
reservoirs = gpd.read_file(reservoir_shpfile)
@@ -226,20 +230,29 @@
if(use_rout_state):
vic_results_path = rout_init_state_save_file
else:
vic_results_path = os.path.join(basin_data_dir,'vic', "vic_outputs/nc_fluxes."+start_date_str+".nc")
forcings_path = os.path.join(basin_data_dir,'vic', "vic_inputs/*.nc")
if forecast_mode:
vic_results_path = os.path.join(basin_data_dir,'vic', 'forecast_vic_outputs', "nc_fluxes."+start_date_str+".nc")
else:
vic_results_path = os.path.join(basin_data_dir,'vic', "vic_outputs/nc_fluxes."+start_date_str+".nc")
if forecast_mode:
forcings_path = os.path.join(basin_data_dir,'vic', 'forecast_vic_inputs/*.nc')
else:
forcings_path = os.path.join(basin_data_dir,'vic', "vic_inputs/*.nc")

for reservoir_no,reservoir in reservoirs.iterrows():
try:
# Reading reservoir information
reservoir_name = str(reservoir[reservoir_shpfile_column_dict['unique_identifier']])
sarea_path = os.path.join(sarea_raw_dir, reservoir_name + ".csv")
if not os.path.isfile(sarea_path):
sarea = float(reservoir[reservoir_shpfile_column_dict['area_column']])
else:
sarea = sarea_path
e_path = os.path.join(evap_datadir, reservoir_name + ".csv")

if os.path.isfile(sarea_path):
# check the float case first so os.path.isfile is never called with a float
if isinstance(sarea, float) or os.path.isfile(sarea):
log.debug(f"Calculating Evaporation for {reservoir_name}")
# calc_E(e_path, respath, vic_results_path)
calc_E(reservoir, start_date_str_evap, end_date_str, forcings_path, vic_results_path, sarea_path, e_path)
calc_E(reservoir, start_date_str_evap, end_date_str, forcings_path, vic_results_path, sarea, e_path)
else:
raise Exception("Surface area file not found; skipping evaporation calculation")
except:
23 changes: 15 additions & 8 deletions src/rat/core/run_routing.py
@@ -164,7 +164,7 @@ def generate_inflow(src_dir, dst_dir):
log_level1.warning(f"Inflow was not calculated for {no_failed_files} reservoirs. Please check Level-2 log file for more details.")

@dask.delayed(pure=True)
def run_for_station(station_name, config, start, end, basin_flow_direction_file, rout_input_path_prefix, inflow_dir, station_path_latlon, route_workspace_dir, clean=False):
def run_for_station(station_name, config, start, end, basin_flow_direction_file, rout_input_path_prefix, inflow_dir, station_path_latlon, route_workspace_dir, forecast_mode=False, clean=False):
if isinstance(station_path_latlon, pd.Series):
station_path_latlon = station_path_latlon.to_frame().T
log.debug("Running routing for station: %s", station_name)
@@ -186,7 +186,10 @@ def run_for_station(station_name, config, start, end, basin_flow_direction_file,
input_files_glob = input_files_dst / Path(rout_input_path_prefix).stem

# output files
output_files_dst = route_workspace_dir / 'ou'
if forecast_mode:
output_files_dst = route_workspace_dir / 'forecast_ou'
else:
output_files_dst = route_workspace_dir / 'ou'
output_files_dst.mkdir(parents=True, exist_ok=True)

# flow direction file
@@ -229,7 +232,7 @@
clean=False
) as r:
log.debug("Routing Parameter file: %s", r.route_param_path)
route = RoutingRunner(
route = RoutingRunner(
project_dir = config['GLOBAL']['project_dir'],
result_dir = str(output_files_dst),
inflow_dir = str(input_files_dst),
Expand All @@ -247,7 +250,7 @@ def run_for_station(station_name, config, start, end, basin_flow_direction_file,
output_path = Path(r.params['output_dir']) / f"{station_name}.day"
return output_path, basin_station_xy_path, ret_code, station_name

def run_routing(config, start, end, basin_flow_direction_file, rout_input_path_prefix, inflow_dir, station_path_latlon, route_dir, route_output_dir, clean=False):
def run_routing(config, start, end, basin_flow_direction_file, rout_input_path_prefix, inflow_dir, station_path_latlon, route_dir, route_output_dir, forecast_mode=False, clean=False):
start = pd.to_datetime(start)
end = pd.to_datetime(end)

@@ -265,7 +268,7 @@ def run_routing(config, start, end, basin_flow_direction_file, rout_input_path_p

futures = []
for i, station in stations.iterrows():
future = run_for_station(station['name'], config, start, end, basin_flow_direction_file, rout_input_path_prefix, inflow_dir, station, route_workspace_dir, clean)
future = run_for_station(station['name'], config, start, end, basin_flow_direction_file, rout_input_path_prefix, inflow_dir, station, route_workspace_dir, forecast_mode, clean)
futures.append(future)
routing_results = dask.compute(*futures)

@@ -282,7 +285,7 @@ def run_routing(config, start, end, basin_flow_direction_file, rout_input_path_p

# Gathering all the outputs of routing in route_output_dir
route_workspace_dir = route_dir / 'wkspc'
gathering_ro_ou(route_workspace_dir, route_output_dir, forecast_mode=forecast_mode)
gathering_ro_ou(route_workspace_dir,route_output_dir, forecast_mode=forecast_mode)

# Tracking number of stations (no_failed_files) for which routing has failed to execute.
no_failed_files = len(routing_statuses[routing_statuses["routing_status"]!=0])
@@ -292,7 +295,7 @@ def run_routing(config, start, end, basin_flow_direction_file, rout_input_path_p
return output_paths, station_xy_path, routing_statuses

# Gathers all the workspace routing outputs into single output directory and save them by their complete station name.
def gathering_ro_ou(wkspc_dir, gathered_ro_ou_dir):
def gathering_ro_ou(wkspc_dir, gathered_ro_ou_dir, forecast_mode=False):
'''Gathers all the workspace routing outputs into single output directory and save them by their complete station name.

Parameters:
@@ -302,7 +305,11 @@ def gathering_ro_ou(wkspc_dir, gathered_ro_ou_dir):
assert wkspc_dir.exists()
gathered_ro_ou_dir.mkdir(parents=True, exist_ok=True)
# for all files with this pattern in wkspc dir
for f in list(wkspc_dir.glob('**/ou/*.day')):
if forecast_mode:
glob_pattern = '**/forecast_ou/*.day'
else:
glob_pattern = '**/ou/*.day'
for f in list(wkspc_dir.glob(glob_pattern)):
try:
# grab station name
station_name = f.parent.parent.name
73 changes: 61 additions & 12 deletions src/rat/data_processing/metsim_input_processing.py
@@ -9,14 +9,16 @@
import os
import pandas as pd
import shutil
import dask
from pathlib import Path

from rat.utils.logging import LOG_NAME, LOG_LEVEL, NOTIFICATION

log = getLogger(LOG_NAME)
log.setLevel(LOG_LEVEL)

class CombinedNC:
def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, climatological_data=None, z_lim=3):
def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, forecast_dir=None, forecast_basedate=None, climatological_data=None, z_lim=3):
"""
Parameters:
start: Start date in YYYY-MM-DD format
@@ -39,13 +41,10 @@ def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous,
self._longitudes1d, self._latitudes1d = self._get_lat_long_1d()
self._latitudes, self._longitudes = self._get_lat_long_meshgrid()

self.precips = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmaxes = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmins = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.winds = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.dates = pd.date_range(start, end)

self._read()
if forecast_dir:
self.read_forecast(forecast_dir, forecast_basedate)
else:
self._read()
self._write()
# If climatological data is passed, then run the climatological data correction
if climatological_data:
@@ -126,16 +125,66 @@

return latitudes, longitudes

def read_forecast(self, forecast_dir, basedate):
forecast_dir = Path(forecast_dir)
basedate = pd.to_datetime(basedate)

# define data arrays sized for the maximum 15-day forecast lead time
self.precips = np.zeros((15, self._rast.height, self._rast.width))
self.tmaxes = np.zeros((15, self._rast.height, self._rast.width))
self.tmins = np.zeros((15, self._rast.height, self._rast.width))
self.winds = np.zeros((15, self._rast.height, self._rast.width))

self.dates = pd.date_range(basedate + datetime.timedelta(days=1), basedate + datetime.timedelta(days=15))

for day, date in enumerate(self.dates):

precipfilepath = forecast_dir / 'gefs-chirps/processed' / f'{basedate:%Y%m%d}' / f'{date:%Y%m%d}.asc'
precipitation = rio.open(precipfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading Maximum Temperature ASCII file contents
tmaxfilepath = forecast_dir / f'gfs/processed/{basedate:%Y%m%d}/tmax/{date:%Y%m%d}.asc'
tmax = rio.open(tmaxfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading Minimum Temperature ASCII file contents
tminfilepath = forecast_dir / f'gfs/processed/{basedate:%Y%m%d}/tmin/{date:%Y%m%d}.asc'
tmin = rio.open(tminfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading u-component of Wind Speed ASCII file contents
uwndfilepath = forecast_dir / f'gfs/processed/{basedate:%Y%m%d}/uwnd/{date:%Y%m%d}.asc'
uwnd = rio.open(uwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)

#Reading v-component of Wind Speed ASCII file contents
vwndfilepath = forecast_dir / f'gfs/processed/{basedate:%Y%m%d}/vwnd/{date:%Y%m%d}.asc'
vwnd = rio.open(vwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)
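# combine u/v components into wind speed; the 0.75 factor matches RAT's nowcast processing in _read below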
wind = (0.75*np.sqrt(uwnd**2 + vwnd**2))#.flatten()[self.gridvalue==0.0]

# self.dates.append(fileDate)
self.precips[day, :, :] = precipitation
self.tmaxes[day, :, :] = tmax
self.tmins[day, :, :] = tmin
self.winds[day, :, :] = wind
# pbar.update(1)

def _read(self):
self.precips = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmaxes = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmins = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.winds = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.dates = pd.date_range(self._start, self._end)

for day, date in enumerate(self.dates):
fileDate = date
reqDate = fileDate.strftime("%Y-%m-%d")
log.debug("Combining data: %s", reqDate)
# pbar.set_description(reqDate)

precipfilepath = os.path.join(self._datadir, f'precipitation/{reqDate}_IMERG.asc')
precipitation = rio.open(precipfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading Maximum Temperature ASCII file contents
tmaxfilepath = os.path.join(self._datadir, f'tmax/{reqDate}_TMAX.asc')
tmax = rio.open(tmaxfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]
@@ -147,12 +196,12 @@ def _read(self):
#Reading u-component of Wind Speed ASCII file contents
uwndfilepath = os.path.join(self._datadir, f'uwnd/{reqDate}_UWND.asc')
uwnd = rio.open(uwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)

#Reading v-component of Wind Speed ASCII file contents
vwndfilepath = os.path.join(self._datadir, f'vwnd/{reqDate}_VWND.asc')
vwnd = rio.open(vwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)
wind = (0.75*np.sqrt(uwnd**2 + vwnd**2))#.flatten()[self.gridvalue==0.0]

# self.dates.append(fileDate)
self.precips[day, :, :] = precipitation
self.tmaxes[day, :, :] = tmax