
Added ability to run at a low latency of less than 3 days (0-3) #112

Merged
merged 28 commits into from Sep 9, 2024
Changes from 26 commits
Commits
28 commits
9feb42b
Changed default behaviour of RAT:
SanchitMinocha Aug 24, 2024
595b04c
Updated comments
SanchitMinocha Aug 24, 2024
9447b6e
All IMERG data has been revised to V07B
SanchitMinocha Aug 24, 2024
f0d3767
Bug fix in read_write_chunk & updated CombinedNC
SanchitMinocha Aug 24, 2024
a08f89d
Dwnlod of new low latency data using GFS & GEFS
SanchitMinocha Aug 24, 2024
bb2b4a5
Divide forecasting into 2 to avoid circular import
SanchitMinocha Aug 24, 2024
e4dcb59
Vic_init_state_finder to run operationally
SanchitMinocha Aug 24, 2024
f67e024
Vic save date is no longer end date
SanchitMinocha Aug 24, 2024
e393b5f
divide forecasting into 2 to avoid circular import
SanchitMinocha Aug 24, 2024
c864ebf
Added low latency feature in RAT
SanchitMinocha Aug 24, 2024
79cb63b
updated storage scenario
pritamd47 Aug 25, 2024
c741d36
updated storage based scenario in forecasting
pritamd47 Aug 25, 2024
3394b9a
Removed duplicate of forecasting.py
SanchitMinocha Aug 25, 2024
f113700
Renamed imerg_latency as low_latency_limit
SanchitMinocha Aug 25, 2024
21de4dd
Added additional util functions for usage
SanchitMinocha Aug 25, 2024
d85a724
Added function to estimate quantile of res. area
SanchitMinocha Aug 30, 2024
c1b3077
Updated basin reservoir shpfile create fn
SanchitMinocha Aug 30, 2024
7c2ccd7
conversion of all inflow files to final outputs
SanchitMinocha Aug 30, 2024
7dea06d
forecast outfl. scenario by user & actual storage
SanchitMinocha Aug 30, 2024
d8a4e91
Easy forecasting for multiple dates in the past.
SanchitMinocha Aug 30, 2024
fb16a94
Updated forecasting docs
SanchitMinocha Aug 30, 2024
411eb72
Bug fix: rout init & added basedate parameter
SanchitMinocha Aug 30, 2024
7c11d59
added low_latency as parameter
SanchitMinocha Aug 30, 2024
fd9a9f6
Bug fix: loading of netcdf file to overwrite
SanchitMinocha Aug 30, 2024
17d67b6
added generic fn to create metsim inputs
SanchitMinocha Aug 30, 2024
08b5eba
Added resolution for review comments
SanchitMinocha Aug 30, 2024
8faea6c
Clearification of forecast_vic_init_state in the docs
SanchitMinocha Sep 9, 2024
240269a
Updated docs to describe changes in new version
SanchitMinocha Sep 9, 2024
2 changes: 1 addition & 1 deletion docs/Configuration/rat_config.md
@@ -808,7 +808,7 @@ This section of the configuration file describes the parameters defined by `rout
If `clean_previous_outputs` is `True`, the previous outputs are cleaned before executing any step in `steps`.

!!! tip_note "Tip"
You should use `clean_previous_outputs` if you want to have fresh outputs of RAT for a river basin. Otherwise, by default RAT will keep appending the new outputs to the same files and will concatenate data by calendar dates.
You should use `clean_previous_outputs` if you want to have fresh outputs of RAT for a river basin. Otherwise, by default RAT will keep appending the new outputs to the same files and will concatenate data by calendar dates. If the new outputs and the previous outputs have coinciding dates, RAT will replace the previous outputs with the new outputs for those dates.

### Confidential
* <h6 class="parameter_heading">*`secrets:`* :</h6>
11 changes: 8 additions & 3 deletions docs/Plugins/Forecasting.md
@@ -9,12 +9,17 @@ To run the forecast plugin, set the value of the `forecast` option in the PLUGIN
PLUGINS:
forecast: True
forecast_lead_time: 15
forecast_start_time: end_date # can either be “end_date” or a date in YYYY-MM-DD format
forecast_rule_curve_dir: /path/to/rule_curve
forecast_gen_start_date: end_date # can either be “end_date” or a date in YYYY-MM-DD format
forecast_gen_end_date: # a date in YYYY-MM-DD format (Optional)
forecast_rule_curve_dir: /path/to/rule_curve (Optional)
forecast_reservoir_shpfile_column_dict: {column_id: GRAND_ID, column_capacity: CAP_MCM}
forecast_vic_init_state: # path of VIC state file or date in YYYY-MM-DD format (Optional)
forecast_rout_init_state: # path of Routing state file or date in YYYY-MM-DD format (Optional)
forecast_storage_scenario: [ST, GO, CO]
forecast_storage_change_percent_of_smax: [20, 10, 5]
```

The `forecast_start_time` option controls when the forecast will begin. If the value is set to `end_date`, the forecast will begin on the end date of RAT’s normal mode of running, i.e., in nowcast mode, which are controlled by `start_date` and `end_date` options in the BASIN section. Alternatively, a date in the YYYY-MM-DD format can also be provided to start the forecast from that date. The forecast window or the number of days ahead for which the forecast is generated is controlled by the `forecast_lead_time` option, with a maximum of 15 days ahead. The `forecast_rule_curve_dir` option should point to the directory containing the rule curve files for the reservoir. The `forecast_reservoir_shpfile_column_dict` option specifies the names of the columns that correspond to the ID and capacity of the reservoir, named `column_id` and `column_capacity`. These columns should be present in the [`reservoir_vector_file` in the GEE section](../../Configuration/rat_config/#gee) in the configuration file for running the forecasting plugin.
The `forecast_gen_start_date` option controls when forecast generation begins. If the value is set to `end_date`, forecasts are generated starting from the end date of RAT's normal (nowcast) mode of running, which is controlled by the `start_date` and `end_date` options in the BASIN section. Alternatively, a date in YYYY-MM-DD format can be provided to start generating forecasts from that date. The `forecast_gen_end_date` option defines the last date for which forecasts will be generated. The forecast window for each generation date, i.e. the number of days ahead for which a forecast is produced, is controlled by the `forecast_lead_time` option, with a maximum of 15 days. The `forecast_rule_curve_dir` option should point to the directory containing the rule curve files for the reservoirs. The `forecast_reservoir_shpfile_column_dict` option specifies the names of the columns that correspond to the ID and capacity of the reservoir, named `column_id` and `column_capacity`; these columns must be present in the [`reservoir_vector_file` in the GEE section](../../Configuration/rat_config/#gee) of the configuration file for the forecasting plugin to run. The `forecast_vic_init_state` option can be used to provide either the date of an existing VIC init state file or the path to a VIC init state file. If a path is provided, it is assumed to be the state file corresponding to `forecast_gen_start_date`, and `forecast_rout_init_state` becomes a required parameter.
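As an illustrative sketch (the dates are placeholders and only a subset of the options above is shown), a configuration that generates a 15-day forecast for each date in a past window might look like:

```
PLUGINS:
  forecast: True
  forecast_lead_time: 15
  forecast_gen_start_date: 2023-06-01
  forecast_gen_end_date: 2023-06-15
  forecast_vic_init_state: 2023-06-01
  forecast_reservoir_shpfile_column_dict: {column_id: GRAND_ID, column_capacity: CAP_MCM}
```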

!!!note
The files in the `forecast_rule_curve_dir` should be named according to their IDs, corresponding to the values in `column_id`. For instance, if the id of a reservoir is 7001, then the file name of the reservoir's rule curve should be `7001.txt`. The rule curve files should be in `.txt` format with the following columns - `Month,S/Smax`, corresponding to the month and the storage level as a fraction of the maximum storage level. Rule curve files for reservoirs in the [GRAND](https://www.globaldamwatch.org/grand) database can be downloaded [here](https://www.dropbox.com/scl/fi/jtquzasjdv2tz1vtupgq5/rc.zip?rlkey=4svbutjd3aup255pnrlgnxbkl&dl=0).
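For illustration, a rule curve file named `7001.txt` could look like the following; the `S/Smax` values below are made up for the example and do not correspond to any real reservoir:

```
Month,S/Smax
1,0.68
2,0.64
3,0.60
4,0.57
5,0.55
6,0.58
7,0.65
8,0.74
9,0.82
10,0.85
11,0.80
12,0.73
```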
31 changes: 21 additions & 10 deletions src/rat/core/run_metsim.py
@@ -38,17 +38,28 @@ def convert_to_vic_forcings(self, forcings_dir):
log.debug(f"Will create {len(years)} forcing files")
for year, ds, p in zip(years, dataset, paths):
if os.path.isfile(p):
existing = xr.open_dataset(p)#.load()
existing.close()

log.debug(f"Writing file for year {year}: {p} -- Updating existing")
# xr.merge([existing, ds], compat='override', join='outer').to_netcdf(p)
# xr.concat([existing, ds], dim='time').to_netcdf(p)
last_existing_time = existing.time[-1]
log.debug("Existing data: %s", last_existing_time)
# Open the existing NetCDF file
with xr.open_dataset(p) as existing:
last_existing_time = existing.time[-1]
log.debug(f"Writing file for year {year}: {p} -- Updating existing")
log.debug("Existing data: %s", last_existing_time)

# Load the data into memory before closing the dataset
existing_data = existing.sel(
time=slice(None, last_existing_time)
).load() # Load the data into memory

# Select the new data that needs to be appended
ds = ds.sel(time=slice(last_existing_time + np.timedelta64(6,'h'), ds.time[-1]))
#ds = ds.isel(time=slice(1, None))
xr.merge([existing, ds]).to_netcdf(p)

# Merge the existing data with the new data and save it back to the file
xr.merge([existing_data, ds]).to_netcdf(p)

# # Explicitly delete variables to free up memory
# del existing_data, ds

# # Manually trigger garbage collection to ensure memory is freed
# gc.collect()
else:
log.debug(f"Writing file for year {year}: {p} -- Updating new")
ds.to_netcdf(p)
2 changes: 1 addition & 1 deletion src/rat/core/run_postprocessing.py
@@ -129,7 +129,7 @@ def calc_E(res_data, start_date, end_date, forcings_path, vic_res_path, sarea, s
# Concat the two dataframes into a new dataframe holding all the data (memory intensive):
complement = pd.concat([existing_data, new_data], ignore_index=True)
# Remove all duplicates:
complement.drop_duplicates(subset=['time'],inplace=True, keep='first')
complement.drop_duplicates(subset=['time'],inplace=True, keep='last')
complement.to_csv(savepath, index=False)
else:
data[['time', 'penman_E']].rename({'penman_E': 'OUT_EVAP'}, axis=1).to_csv(savepath, index=False)
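The switch from `keep='first'` to `keep='last'` above is what makes re-runs overwrite previously saved values on coinciding dates, matching the updated `clean_previous_outputs` documentation. A minimal, self-contained pandas sketch of the pattern (with made-up values, not RAT data):

```python
import pandas as pd

# Existing output on disk and newly computed output share the date 2024-08-02.
existing_data = pd.DataFrame({"time": ["2024-08-01", "2024-08-02"], "OUT_EVAP": [1.0, 1.1]})
new_data = pd.DataFrame({"time": ["2024-08-02", "2024-08-03"], "OUT_EVAP": [2.2, 2.3]})

# Concatenate, then drop duplicate dates keeping the last occurrence (the new data).
complement = pd.concat([existing_data, new_data], ignore_index=True)
complement.drop_duplicates(subset=["time"], inplace=True, keep="last")
print(complement)  # 2024-08-02 now carries the new value 2.2
```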
2 changes: 1 addition & 1 deletion src/rat/core/run_routing.py
@@ -152,7 +152,7 @@ def generate_inflow(src_dir, dst_dir):
# Concat the two dataframes into a new dataframe holding all the data (memory intensive):
complement = pd.concat([existing_data, new_data], ignore_index=True)
# Remove all duplicates:
complement.drop_duplicates(subset=['date'], inplace=True, keep='first')
complement.drop_duplicates(subset=['date'], inplace=True, keep='last')
complement.sort_values(by='date', inplace=True)
complement.to_csv(outpath, index=False)
else:
4 changes: 2 additions & 2 deletions src/rat/core/run_vic.py
@@ -45,7 +45,7 @@ def generate_routing_input_state(self, ndays, rout_input_state_file, save_path,
first_existing_time = new_vic_output.time[0]
new_vic_output.close()

#Preprocessing function for merging netcdf files
#Preprocessing function for merging netcdf files: coinciding dates are dropped from the first (state) file so that their values are taken from the latest file
def _remove_coinciding_days(ds, cutoff_time, ndays):
file_name = ds.encoding["source"]
file_stem = Path(file_name).stem
@@ -55,7 +55,7 @@ def _remove_coinciding_days(ds, cutoff_time, ndays):
return ds
remove_coinciding_days_func = partial(_remove_coinciding_days, cutoff_time=first_existing_time, ndays=ndays)

# Merging previous and new vic outputs
# Merging previous and new VIC outputs: take up to 365 days of data from the state file (previous VIC outputs) before the first date in the new VIC output, so coinciding dates are removed from the state file
try:
save_vic_output = xr.open_mfdataset([rout_input_state_file,self.vic_result],{'time':365}, preprocess=remove_coinciding_days_func)
save_vic_output.to_netcdf(save_path)
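For reference, a minimal sketch of the `open_mfdataset` + `preprocess` pattern used here; the file names, cutoff date, and stem check below are placeholders, not the actual RAT logic:

```python
from functools import partial
from pathlib import Path

import numpy as np
import xarray as xr

def drop_overlap(ds, cutoff_time):
    # Trim only the older (state) file so that dates it shares with the new
    # VIC output are taken from the new output instead.
    if Path(ds.encoding["source"]).stem == "rout_input_state":  # hypothetical stem
        ds = ds.sel(time=slice(None, cutoff_time - np.timedelta64(1, "D")))
    return ds

preprocess_func = partial(drop_overlap, cutoff_time=np.datetime64("2024-08-01"))
merged = xr.open_mfdataset(
    ["rout_input_state.nc", "new_vic_output.nc"],  # placeholder paths
    chunks={"time": 365},
    preprocess=preprocess_func,
)
merged.to_netcdf("combined_vic_output.nc")
```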
137 changes: 130 additions & 7 deletions src/rat/data_processing/metsim_input_processing.py
@@ -19,7 +19,7 @@
log.setLevel(LOG_LEVEL)

class CombinedNC:
def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, forecast_dir=None, forecast_basedate=None, climatological_data=None, z_lim=3):
def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, forecast_dir=None, forecast_basedate=None, climatological_data=None, z_lim=3, low_latency_dir=None):
"""
Parameters:
start: Start date in YYYY-MM-DD format
@@ -43,8 +43,13 @@ def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous,
self._latitudes, self._longitudes = self._get_lat_long_meshgrid()

if forecast_dir:
log.debug("Combining forcing data for forecasting mode.")
self.read_forecast(forecast_dir, forecast_basedate)
self._write()
elif low_latency_dir:
log.debug("Combining forcing data for low latency mode.")
self.read_low_latency(low_latency_dir)
self._write()
else:
self._read_and_write_in_chunks()
# self._read()
Expand Down Expand Up @@ -173,6 +178,52 @@ def read_forecast(self, forecast_dir, basedate):
self.winds[day, :, :] = wind
# pbar.update(1)

def read_low_latency(self, low_latency_dir):
low_latency_dir = Path(low_latency_dir)
start = self._start
end = self._end

# define data arrays
self.precips = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmaxes = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmins = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.winds = np.zeros((self._total_days+1, self._rast.height, self._rast.width))

self.dates = pd.date_range(start, end)

for day, date in enumerate(self.dates):
fileDate = date
reqDate = fileDate.strftime("%Y-%m-%d")
log.debug("Combining data: %s", reqDate)
# pbar.set_description(reqDate)

precipfilepath = low_latency_dir / f'gefs-chirps/processed' / f'{date:%Y%m%d}' / f'{date:%Y%m%d}.asc'
precipitation = rio.open(precipfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading Maximum Temperature ASCII file contents
tmaxfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/tmax/{date:%Y%m%d}.asc'
tmax = rio.open(tmaxfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading Minimum Temperature ASCII file contents
tminfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/tmin/{date:%Y%m%d}.asc'
tmin = rio.open(tminfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading U wind component ASCII file contents
uwndfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/uwnd/{date:%Y%m%d}.asc'
uwnd = rio.open(uwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)

#Reading V wind component ASCII file contents
vwndfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/vwnd/{date:%Y%m%d}.asc'
vwnd = rio.open(vwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)
wind = (0.75*np.sqrt(uwnd**2 + vwnd**2))#.flatten()[self.gridvalue==0.0]

# self.dates.append(fileDate)
self.precips[day, :, :] = precipitation
self.tmaxes[day, :, :] = tmax
self.tmins[day, :, :] = tmin
self.winds[day, :, :] = wind
# pbar.update(1)
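The directory layout that `read_low_latency` expects, reconstructed from the path strings above (the date folder is illustrative), is roughly:

```
<low_latency_dir>/
    gefs-chirps/processed/20240901/20240901.asc        # precipitation
    gfs/processed/20240901/tmax/20240901.asc           # maximum temperature
    gfs/processed/20240901/tmin/20240901.asc           # minimum temperature
    gfs/processed/20240901/uwnd/20240901.asc           # U wind component
    gfs/processed/20240901/vwnd/20240901.asc           # V wind component
```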

# Imputes missing data by interpolation in the order of dimensions time, lon, lat.
def _impute_basin_missing_data(self, combined_data):
combine_nomiss_data = combined_data
Expand Down Expand Up @@ -350,7 +401,7 @@ def _write_chunk(self, ds_list, existing_to_append=None, last_existing_time=None
# Separate the non-time-dependent variable
if 'extent' in existing_to_append.data_vars:
# Drop the extent variable from existing_to_append if it already exists in the file for concat operation with other chunks. It will be added after writing all chunks in _apply_dataset_operations
ds_chunk = ds_chunk.drop_vars('extent')
existing_to_append = existing_to_append.drop_vars('extent')
write_ds_chunk = xr.concat([existing_to_append, ds_chunk_sel], dim='time')
write_ds_chunk.to_netcdf(self._outputpath, mode='w', unlimited_dims=['time'])
# Else just write the chunk in file (by creating new if first chunk, otherwise append)
Expand Down Expand Up @@ -397,11 +448,17 @@ def _read_and_write_in_chunks(self):
log.debug(f"Found existing file at {self._outputpath} -- Updating in-place")
# Assuming the existing file structure is same as the one generated now. Basically
# assuming that the previous file was also created by MetSimRunner
existing = xr.open_dataset(self._outputpath)
existing.close()
last_existing_time = existing.time[-1]
log.debug("Existing data: %s", last_existing_time)
existing_to_append = existing.sel(time=slice(start_date_pd.to_datetime64() - np.timedelta64(120,'D') , last_existing_time))

# Open the existing NetCDF file
with xr.open_dataset(self._outputpath) as existing:
last_existing_time = existing.time[-1]
log.debug("Existing data: %s", last_existing_time)

# Select the time slice you need before closing the dataset
existing_to_append = existing.sel(
time=slice(start_date_pd.to_datetime64() - np.timedelta64(120, 'D'), last_existing_time)
).load() # Load the data into memory
# Now that the dataset is closed, pass the sliced data to the write function
self._write_chunk(ds_list, existing_to_append, last_existing_time, first_loop=True)
else:
raise Exception('Previous combined dataset not found. Please run RAT with spin-up or use state file paths.')
Expand Down Expand Up @@ -456,4 +513,70 @@ def generate_state_and_inputs(forcings_startdate, forcings_enddate, combined_dat
log.debug(f"Saving forcings: {forcings_outpath}")
forcings_ds.to_netcdf(forcings_outpath)

return state_outpath, forcings_outpath

def generate_metsim_state_and_inputs_with_multiple_nc_files(
nc_file_paths, # List of paths to NetCDF files to concatenate
start_dates, # List of start dates for each subsequent file in nc_file_paths
out_dir, # Output directory
forcings_start_date, # Start date for forcings
forcings_end_date, # End date for forcings
forecast_mode=False # Flag to indicate forecast mode
):
"""
Generate MetSim state and input files by concatenating multiple NetCDF files along the time dimension.

Parameters:
- nc_file_paths (list of str): Paths to the NetCDF files to be concatenated.
- start_dates (list of datetime.datetime): Start dates for each file in nc_file_paths from the second file onwards.
The list should have one fewer element than nc_file_paths.
- out_dir (str): Directory where the output NetCDF files will be saved.
- forcings_start_date (datetime.datetime): Start date for the forcings time slice.
- forcings_end_date (datetime.datetime): End date for the forcings time slice.
- forecast_mode (bool): If True, output files are prefixed with 'forecast_'.

Returns:
- state_outpath (Path): Path to the generated state NetCDF file.
- forcings_outpath (Path): Path to the generated MetSim input NetCDF file.
"""

out_dir = Path(out_dir)
nc_datasets = []

# Load and slice the datasets
for i, nc_file_path in enumerate(nc_file_paths):
ds = xr.open_dataset(nc_file_path)

if i == 0:
# First dataset: Slice from the start to the first start_date
ds = ds.sel(time=slice(None, start_dates[0] - pd.Timedelta(days=1)))
else:
# Subsequent datasets: Slice from the day after the previous start_date
if i < len(start_dates):
ds = ds.sel(time=slice(start_dates[i-1], start_dates[i] - pd.Timedelta(days=1)))
else:
ds = ds.sel(time=slice(start_dates[i-1], None))

nc_datasets.append(ds)

# Concatenate all datasets along the time dimension
combined_data = xr.concat(nc_datasets, dim="time")

# Generate state NetCDF file
state_startdate = forcings_start_date - pd.Timedelta(days=90)
state_enddate = forcings_start_date - pd.Timedelta(days=1)
state_ds = combined_data.sel(time=slice(state_startdate, state_enddate))

state_filename = "forecast_state.nc" if forecast_mode else "state.nc"
state_outpath = out_dir / state_filename
state_ds.to_netcdf(state_outpath)

# Generate MetSim input NetCDF file
forcings_ds = combined_data.sel(time=slice(forcings_start_date, forcings_end_date))

forcings_filename = "forecast_metsim_input.nc" if forecast_mode else "metsim_input.nc"
forcings_outpath = out_dir / forcings_filename
print(f"Saving forcings: {forcings_outpath}")
forcings_ds.to_netcdf(forcings_outpath)

return state_outpath, forcings_outpath
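A hypothetical usage sketch of this function (paths and dates are placeholders): stitch a nowcast combined file and a low-latency combined file together, then cut out the MetSim state (previous 90 days) and input files:

```python
import pandas as pd

state_path, forcings_path = generate_metsim_state_and_inputs_with_multiple_nc_files(
    nc_file_paths=["combined_nowcast.nc", "combined_low_latency.nc"],
    start_dates=[pd.Timestamp("2024-08-20")],  # second file supplies data from this date onwards
    out_dir="metsim_inputs",
    forcings_start_date=pd.Timestamp("2024-08-25"),
    forcings_end_date=pd.Timestamp("2024-09-08"),
    forecast_mode=False,
)
```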