diff --git a/docs/Configuration/rat_config.md b/docs/Configuration/rat_config.md index f9cfa60..2d0695b 100644 --- a/docs/Configuration/rat_config.md +++ b/docs/Configuration/rat_config.md @@ -808,7 +808,7 @@ This section of the configuration file describes the parameters defined by `rout If `clean_previous_outputs` is `True`, the previous outputs are cleaned before executing any step in `steps`. !!! tip_note "Tip" - You should use `clean_previous_outputs` if you want to have fresh outputs of RAT for a river basin. Otherwise, by default RAT will keep appending the new outputs to the same files and will concatenate data by calendar dates. + You should use `clean_previous_outputs` if you want to have fresh outputs of RAT for a river basin. Otherwise, by default RAT will keep appending the new outputs to the same files and will concatenate data by calendar dates. In case the new outputs and previous outputs have some coinciding dates, RAT will replace the previous outputs with the new outputs for these dates. ### Confidential *
*`secrets:`* :
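The tip above describes how RAT appends new outputs to existing output files and, for coinciding dates, keeps the newly computed values. Below is a minimal, hypothetical pandas sketch of that append-and-replace behaviour, mirroring the `pd.concat` + `drop_duplicates(keep='last')` pattern introduced later in this patch (the column names and values are illustrative, not RAT's exact file format):

```python
import pandas as pd

# Previously saved output with three dates (illustrative values)
existing_data = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=3),
    "inflow (m3/d)": [100.0, 110.0, 120.0],
})

# Newly computed output that overlaps on 2024-01-03
new_data = pd.DataFrame({
    "date": pd.date_range("2024-01-03", periods=2),
    "inflow (m3/d)": [125.0, 130.0],
})

# Concatenate, then keep the *new* value for any coinciding date
complement = pd.concat([existing_data, new_data], ignore_index=True)
complement.drop_duplicates(subset=["date"], inplace=True, keep="last")
complement.sort_values(by="date", inplace=True)
print(complement)  # 2024-01-03 now holds 125.0, the newly computed value
```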
diff --git a/docs/Development/PatchNotes.md b/docs/Development/PatchNotes.md index 667d7bb..cba0232 100644 --- a/docs/Development/PatchNotes.md +++ b/docs/Development/PatchNotes.md @@ -1,5 +1,17 @@ # Patch Notes +### v3.0.14 +In this release, we have: + +1. Enhanced Low-Latency Functionality: RAT can now run in operational mode with significantly reduced latency, as low as 0 days. +2. Updated Forecasting Plugin: The forecasting plugin has been upgraded to allow forecast generation for multiple past dates. +3. Updated IMERG Precipitation Web Links: The web links for downloading historical IMERG data (prior to 2024) have been updated to match those for current data. This change reflects the revision of the IMERG product version for historical data to V07B, which is now the same as the version for recent IMERG data. These updates were implemented on June 1, 2024, on the IMERG web servers. +4. Updated RAT Documentation: The documentation now reflects the changes in the forecasting plugin and the option to run in operational mode with low latency. + +!!!note + 1. Previously, a latency of 3 or more days was recommended due to delays in retrieving meteorological data from servers. However, RAT can now operate with latencies of less than 3 days, including real-time data (0-day latency). This is a major improvement over earlier versions, enabling users to generate data for the current day and produce forecasts up to 15 days ahead from the current day. + 2. Previously, forecasts could only be generated for the final date of the RAT run, which worked well for operational use. Now, for case studies and research purposes, users can generate forecasts for several historical dates, offering greater flexibility and utility. + ### v3.0.13 In this release, we have: @@ -29,7 +41,9 @@ In this release, we have: ### v3.0.8 -In this release, we have added rat.toolbox module to contain helpful and utility functions of user. Right now it has one function related to config and that is to update an existing config. Future work will be to add more functions to this module like to create config, or to plot outputs etc. +In this release, we have: + +1. Added the rat.toolbox module to hold helpful utility functions for users. It currently contains one config-related function, which updates an existing config file. Future work will add more functions to this module, such as creating a config or plotting outputs. ### v3.0.7 diff --git a/docs/Development/RecentAdjustments.md b/docs/Development/RecentAdjustments.md index 58952d2..854362a 100644 --- a/docs/Development/RecentAdjustments.md +++ b/docs/Development/RecentAdjustments.md @@ -1,5 +1,10 @@ # Recent Adjustments +### September 8, 2024 +As of June 1, 2024, all historical IMERG data has been revised to Version 07B, the same version as the current IMERG data generated daily. In the last patch we updated the link for downloading recent and current IMERG data, but the link for past historical data was not updated to V07B. This raised a 'Connection Reset' error when running RAT over a period spanning, say, 2020 to 2024. The problem has been resolved in the [developer version](../../Development/DeveloperVersion/) of RAT and released in RAT [v3.0.14](../../Development/PatchNotes/#v3014). To [update](https://conda.io/projects/conda/en/latest/commands/update.html) RAT, please use `conda update rat` in your RAT environment.
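As a quick illustration of the link revision described above, the sketch below builds the IMERG Late-run URL for an arbitrary pre-2024 date using the V07B pattern now used in `newdata.py` (the date is only an example; the URL template comes from this patch):

```python
from datetime import datetime

date = datetime(2020, 7, 15)  # example historical date, now also served as V07B
link = (
    "https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/"
    f"{date.strftime('%Y')}/{date.strftime('%m')}/"
    f"3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}"
    "-S233000-E235959.1410.V07B.1day.tif"
)
print(link)
```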
+ +Due to the non-availability of IMERG data for April and May 2024 on the IMERG web server, users may encounter issues when running RAT for time periods that include these months. This issue is on the IMERG data provider's side, and we are hopeful it will be resolved soon. + ### June 30, 2024 There is a change in the required permissions for Google Service Accounts to access Earth Engine API. diff --git a/docs/Plugins/Forecasting.md b/docs/Plugins/Forecasting.md index bffda8f..391206d 100644 --- a/docs/Plugins/Forecasting.md +++ b/docs/Plugins/Forecasting.md @@ -9,12 +9,17 @@ To run the forecast plugin, set the value of the `forecast` option in the PLUGIN PLUGINS: forecast: True forecast_lead_time: 15 - forecast_start_time: end_date # can either be “end_date” or a date in YYYY-MM-DD format - forecast_rule_curve_dir: /path/to/rule_curve + forecast_gen_start_date: end_date # can either be “end_date” or a date in YYYY-MM-DD format + forecast_gen_end_date: # a date in YYYY-MM-DD format (Optional) + forecast_rule_curve_dir: /path/to/rule_curve (Optional) forecast_reservoir_shpfile_column_dict: {column_id: GRAND_ID, column_capacity: CAP_MCM} + forecast_vic_init_state: # path of VIC state file or date in YYYY-MM-DD format (Optional) + forecast_rout_init_state: # path of Routing state file or date in YYYY-MM-DD format (Optional) + forecast_storage_scenario: [ST, GO, CO] + forecast_storage_change_percent_of_smax: [20, 10, 5] ``` -The `forecast_start_time` option controls when the forecast will begin. If the value is set to `end_date`, the forecast will begin on the end date of RAT’s normal mode of running, i.e., in nowcast mode, which are controlled by `start_date` and `end_date` options in the BASIN section. Alternatively, a date in the YYYY-MM-DD format can also be provided to start the forecast from that date. The forecast window or the number of days ahead for which the forecast is generated is controlled by the `forecast_lead_time` option, with a maximum of 15 days ahead. The `forecast_rule_curve_dir` option should point to the directory containing the rule curve files for the reservoir. The `forecast_reservoir_shpfile_column_dict` option specifies the names of the columns that correspond to the ID and capacity of the reservoir, named `column_id` and `column_capacity`. These columns should be present in the [`reservoir_vector_file` in the GEE section](../../Configuration/rat_config/#gee) in the configuration file for running the forecasting plugin. +The `forecast_gen_start_date` option controls when forecast generation begins. If the value is set to `end_date`, forecasts are generated starting from the end date of RAT’s normal (nowcast) mode of running, which is controlled by the `start_date` and `end_date` options in the BASIN section. Alternatively, a date in YYYY-MM-DD format can be provided to start forecast generation from that date. The `forecast_gen_end_date` option defines the last date for which forecasts will be generated. For each generation date, the forecast window, i.e. the number of days ahead for which the forecast is generated, is controlled by the `forecast_lead_time` option, with a maximum of 15 days ahead. The `forecast_rule_curve_dir` option should point to the directory containing the rule curve files for the reservoirs. The `forecast_reservoir_shpfile_column_dict` option specifies the names of the columns that correspond to the ID and capacity of the reservoir, named `column_id` and `column_capacity`. 
These columns should be present in the [`reservoir_vector_file` in the GEE section](../../Configuration/rat_config/#gee) of the configuration file for running the forecasting plugin. The `forecast_vic_init_state` option specifies the VIC init state file to be used, either as a date in 'YYYY-MM-DD' format or as the path of the state file itself. If a path is provided instead of a date, the state file is assumed to correspond to `forecast_gen_start_date`; in that case, `forecast_rout_init_state` becomes a required parameter and must be the actual path of the routing init state file. !!!note The files in the `forecast_rule_curve_dir` should be named according to their IDs, corresponding to the values in `column_id`. For instance, if the id of a reservoir is 7001, then the file name of the reservoir's rule curve should be `7001.txt`. The rule curve files should be in `.txt` format with the following columns - `Month,S/Smax`, corresponding to the month and the storage level as a fraction of the maximum storage level. Rule curve files for reservoirs in the [GRAND](https://www.globaldamwatch.org/grand) database can be downloaded [here](https://www.dropbox.com/scl/fi/jtquzasjdv2tz1vtupgq5/rc.zip?rlkey=4svbutjd3aup255pnrlgnxbkl&dl=0). diff --git a/src/rat/core/run_metsim.py b/src/rat/core/run_metsim.py index 517775e..14755fd 100644 --- a/src/rat/core/run_metsim.py +++ b/src/rat/core/run_metsim.py @@ -38,17 +38,28 @@ def convert_to_vic_forcings(self, forcings_dir): log.debug(f"Will create {len(years)} forcing files") for year, ds, p in zip(years, dataset, paths): if os.path.isfile(p): - existing = xr.open_dataset(p)#.load() - existing.close() - - log.debug(f"Writing file for year {year}: {p} -- Updating existing") - # xr.merge([existing, ds], compat='override', join='outer').to_netcdf(p) - # xr.concat([existing, ds], dim='time').to_netcdf(p) - last_existing_time = existing.time[-1] - log.debug("Existing data: %s", last_existing_time) + # Open the existing NetCDF file + with xr.open_dataset(p) as existing: + last_existing_time = existing.time[-1] + log.debug(f"Writing file for year {year}: {p} -- Updating existing") + log.debug("Existing data: %s", last_existing_time) + + # Load the data into memory before closing the dataset + existing_data = existing.sel( + time=slice(None, last_existing_time) + ).load() # Load the data into memory + + # Select the new data that needs to be appended ds = ds.sel(time=slice(last_existing_time + np.timedelta64(6,'h'), ds.time[-1])) - #ds = ds.isel(time=slice(1, None)) - xr.merge([existing, ds]).to_netcdf(p) + + # Merge the existing data with the new data and save it back to the file + xr.merge([existing_data, ds]).to_netcdf(p) + + # # Explicitly delete variables to free up memory + # del existing_data, ds + + # # Manually trigger garbage collection to ensure memory is freed + # gc.collect() else: log.debug(f"Writing file for year {year}: {p} -- Updating new") ds.to_netcdf(p) \ No newline at end of file diff --git a/src/rat/core/run_postprocessing.py b/src/rat/core/run_postprocessing.py index 84aa8ff..48b3883 100644 --- a/src/rat/core/run_postprocessing.py +++ b/src/rat/core/run_postprocessing.py @@ -129,7 +129,7 @@ def calc_E(res_data, start_date, end_date, forcings_path, vic_res_path, sarea, s # Concat the two dataframes into a new dataframe holding all the data (memory intensive): complement = pd.concat([existing_data, new_data], ignore_index=True) # Remove all duplicates: - 
complement.drop_duplicates(subset=['time'],inplace=True, keep='first') + complement.drop_duplicates(subset=['time'],inplace=True, keep='last') complement.to_csv(savepath, index=False) else: data[['time', 'penman_E']].rename({'penman_E': 'OUT_EVAP'}, axis=1).to_csv(savepath, index=False) diff --git a/src/rat/core/run_routing.py b/src/rat/core/run_routing.py index e3594f2..a86af26 100644 --- a/src/rat/core/run_routing.py +++ b/src/rat/core/run_routing.py @@ -152,7 +152,7 @@ def generate_inflow(src_dir, dst_dir): # Concat the two dataframes into a new dataframe holding all the data (memory intensive): complement = pd.concat([existing_data, new_data], ignore_index=True) # Remove all duplicates: - complement.drop_duplicates(subset=['date'], inplace=True, keep='first') + complement.drop_duplicates(subset=['date'], inplace=True, keep='last') complement.sort_values(by='date', inplace=True) complement.to_csv(outpath, index=False) else: diff --git a/src/rat/core/run_vic.py b/src/rat/core/run_vic.py index e00e20e..d9220a8 100644 --- a/src/rat/core/run_vic.py +++ b/src/rat/core/run_vic.py @@ -45,7 +45,7 @@ def generate_routing_input_state(self, ndays, rout_input_state_file, save_path, first_existing_time = new_vic_output.time[0] new_vic_output.close() - #Preprocessing function for merging netcdf files + #Preprocessing function for merging netcdf files by removing coinciding dates from the first and using those values from latest def _remove_coinciding_days(ds, cutoff_time, ndays): file_name = ds.encoding["source"] file_stem = Path(file_name).stem @@ -55,7 +55,7 @@ def _remove_coinciding_days(ds, cutoff_time, ndays): return ds remove_coinciding_days_func = partial(_remove_coinciding_days, cutoff_time=first_existing_time, ndays=ndays) - # Merging previous and new vic outputs + # Merging previous and new vic outputs by taking 365 days data in state file (previous vic outputs) before the first date in new vic output. So coinciding data will be removed from state file. 
try: save_vic_output = xr.open_mfdataset([rout_input_state_file,self.vic_result],{'time':365}, preprocess=remove_coinciding_days_func) save_vic_output.to_netcdf(save_path) diff --git a/src/rat/data_processing/metsim_input_processing.py b/src/rat/data_processing/metsim_input_processing.py index f8842a6..bd8b92d 100644 --- a/src/rat/data_processing/metsim_input_processing.py +++ b/src/rat/data_processing/metsim_input_processing.py @@ -19,7 +19,7 @@ log.setLevel(LOG_LEVEL) class CombinedNC: - def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, forecast_dir=None, forecast_basedate=None, climatological_data=None, z_lim=3): + def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, forecast_dir=None, forecast_basedate=None, climatological_data=None, z_lim=3, low_latency_dir=None): """ Parameters: start: Start date in YYYY-MM-DD format @@ -43,8 +43,13 @@ def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, self._latitudes, self._longitudes = self._get_lat_long_meshgrid() if forecast_dir: + log.debug("Combining forcing data for forecasting mode.") self.read_forecast(forecast_dir, forecast_basedate) self._write() + elif low_latency_dir: + log.debug("Combining forcing data for low latency mode.") + self.read_low_latency(low_latency_dir) + self._write() else: self._read_and_write_in_chunks() # self._read() @@ -173,6 +178,52 @@ def read_forecast(self, forecast_dir, basedate): self.winds[day, :, :] = wind # pbar.update(1) + def read_low_latency(self, low_latency_dir): + low_latency_dir = Path(low_latency_dir) + start = self._start + end = self._end + + # define data arrays + self.precips = np.zeros((self._total_days+1, self._rast.height, self._rast.width)) + self.tmaxes = np.zeros((self._total_days+1, self._rast.height, self._rast.width)) + self.tmins = np.zeros((self._total_days+1, self._rast.height, self._rast.width)) + self.winds = np.zeros((self._total_days+1, self._rast.height, self._rast.width)) + + self.dates = pd.date_range(start, end) + + for day, date in enumerate(self.dates): + fileDate = date + reqDate = fileDate.strftime("%Y-%m-%d") + log.debug("Combining data: %s", reqDate) + # pbar.set_description(reqDate) + + precipfilepath = low_latency_dir / f'gefs-chirps/processed' / f'{date:%Y%m%d}' / f'{date:%Y%m%d}.asc' + precipitation = rio.open(precipfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0] + + #Reading Maximum Temperature ASCII file contents + tmaxfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/tmax/{date:%Y%m%d}.asc' + tmax = rio.open(tmaxfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0] + + #Reading Minimum Temperature ASCII file contents + tminfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/tmin/{date:%Y%m%d}.asc' + tmin = rio.open(tminfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0] + + #Reading Average Wind Speed ASCII file contents + uwndfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/uwnd/{date:%Y%m%d}.asc' + uwnd = rio.open(uwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan) + + # #Reading Average Wind Speed ASCII file contents + vwndfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/vwnd/{date:%Y%m%d}.asc' + vwnd = rio.open(vwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan) + wind = (0.75*np.sqrt(uwnd**2 + vwnd**2))#.flatten()[self.gridvalue==0.0] + + # self.dates.append(fileDate) + 
self.precips[day, :, :] = precipitation + self.tmaxes[day, :, :] = tmax + self.tmins[day, :, :] = tmin + self.winds[day, :, :] = wind + # pbar.update(1) + # Imputes missing data by interpolation in the order of dimensions time, lon, lat. def _impute_basin_missing_data(self, combined_data): combine_nomiss_data = combined_data @@ -350,7 +401,7 @@ def _write_chunk(self, ds_list, existing_to_append=None, last_existing_time=None # Separate the non-time-dependent variable if 'extent' in existing_to_append.data_vars: # Drop the extent variable from existing_to_append if it already exists in the file for concat operation with other chunks. It will be added after writing all chunks in _apply_dataset_operations - ds_chunk = ds_chunk.drop_vars('extent') + existing_to_append = existing_to_append.drop_vars('extent') write_ds_chunk = xr.concat([existing_to_append, ds_chunk_sel], dim='time') write_ds_chunk.to_netcdf(self._outputpath, mode='w', unlimited_dims=['time']) # Else just write the chunk in file (by creating new if first chunk, otherwise append) @@ -397,11 +448,17 @@ def _read_and_write_in_chunks(self): log.debug(f"Found existing file at {self._outputpath} -- Updating in-place") # Assuming the existing file structure is same as the one generated now. Basically # assuming that the previous file was also created by MetSimRunner - existing = xr.open_dataset(self._outputpath) - existing.close() - last_existing_time = existing.time[-1] - log.debug("Existing data: %s", last_existing_time) - existing_to_append = existing.sel(time=slice(start_date_pd.to_datetime64() - np.timedelta64(120,'D') , last_existing_time)) + + # Open the existing NetCDF file + with xr.open_dataset(self._outputpath) as existing: + last_existing_time = existing.time[-1] + log.debug("Existing data: %s", last_existing_time) + + # Select the time slice you need before closing the dataset + existing_to_append = existing.sel( + time=slice(start_date_pd.to_datetime64() - np.timedelta64(120, 'D'), last_existing_time) + ).load() # Load the data into memory + # Now that the dataset is closed, pass the sliced data to the write function self._write_chunk(ds_list, existing_to_append, last_existing_time, first_loop=True) else: raise Exception('Previous combined dataset not found. Please run RAT with spin-up or use state file paths.') @@ -456,4 +513,70 @@ def generate_state_and_inputs(forcings_startdate, forcings_enddate, combined_dat log.debug(f"Saving forcings: {forcings_outpath}") forcings_ds.to_netcdf(forcings_outpath) + return state_outpath, forcings_outpath + +def generate_metsim_state_and_inputs_with_multiple_nc_files( + nc_file_paths, # List of paths to NetCDF files to concatenate + start_dates, # List of start dates for each subsequent file in nc_file_paths + out_dir, # Output directory + forcings_start_date, # Start date for forcings + forcings_end_date, # End date for forcings + forecast_mode=False # Flag to indicate forecast mode + ): + """ + Generate MetSim state and input files by concatenating multiple NetCDF files along the time dimension. + + Parameters: + - nc_file_paths (list of str): Paths to the NetCDF files to be concatenated. + - start_dates (list of datetime.datetime): Start dates for each file in nc_file_paths from the second file onwards. + The list should have one fewer elements than nc_file_paths. + - out_dir (str): Directory where the output NetCDF files will be saved. + - forcings_start_date (datetime.datetime): Start date for the forcings time slice. 
+ - forcings_end_date (datetime.datetime): End date for the forcings time slice. + - forecast_mode (bool): If True, output files are prefixed with 'forecast_'. + + Returns: + - state_outpath (Path): Path to the generated state NetCDF file. + - forcings_outpath (Path): Path to the generated MetSim input NetCDF file. + """ + + out_dir = Path(out_dir) + nc_datasets = [] + + # Load and slice the datasets + for i, nc_file_path in enumerate(nc_file_paths): + ds = xr.open_dataset(nc_file_path) + + if i == 0: + # First dataset: Slice from the start to the first start_date + ds = ds.sel(time=slice(None, start_dates[0] - pd.Timedelta(days=1))) + else: + # Subsequent datasets: Slice from the day after the previous start_date + if i < len(start_dates): + ds = ds.sel(time=slice(start_dates[i-1], start_dates[i] - pd.Timedelta(days=1))) + else: + ds = ds.sel(time=slice(start_dates[i-1], None)) + + nc_datasets.append(ds) + + # Concatenate all datasets along the time dimension + combined_data = xr.concat(nc_datasets, dim="time") + + # Generate state NetCDF file + state_startdate = forcings_start_date - pd.Timedelta(days=90) + state_enddate = forcings_start_date - pd.Timedelta(days=1) + state_ds = combined_data.sel(time=slice(state_startdate, state_enddate)) + + state_filename = "forecast_state.nc" if forecast_mode else "state.nc" + state_outpath = out_dir / state_filename + state_ds.to_netcdf(state_outpath) + + # Generate MetSim input NetCDF file + forcings_ds = combined_data.sel(time=slice(forcings_start_date, forcings_end_date)) + + forcings_filename = "forecast_metsim_input.nc" if forecast_mode else "metsim_input.nc" + forcings_outpath = out_dir / forcings_filename + print(f"Saving forcings: {forcings_outpath}") + forcings_ds.to_netcdf(forcings_outpath) + return state_outpath, forcings_outpath \ No newline at end of file diff --git a/src/rat/data_processing/newdata.py b/src/rat/data_processing/newdata.py index e722d3b..0f846c0 100644 --- a/src/rat/data_processing/newdata.py +++ b/src/rat/data_processing/newdata.py @@ -56,27 +56,10 @@ def _determine_precip_link_and_version(date): if version == "IMERG-FINAL": link = f"ftp://arthurhou.pps.eosdis.nasa.gov/gpmdata/{date.strftime('%Y')}/{date.strftime('%m')}/{date.strftime('%d')}/gis/3B-DAY-GIS.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S000000-E235959.0000.V06A.tif" elif version == "IMERG-LATE": - if date >= datetime(2024, 6, 1): # Version was changed from V06E to V07B - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif" - elif date >= datetime(2023, 11, 8): # Version was changed from V06D to V06E - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06E.1day.tif" - elif date >= datetime(2023, 7, 1): # Version was changed from V06C to V06D - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06D.1day.tif" - elif date >= datetime(2022, 5, 8): # Version was changed from V06B to V06C - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06C.1day.tif" - else: - link = 
f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06B.1day.tif" + # if date >= datetime(2024, 6, 1): # Version was changed from V06E to V07B + link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif" else: - if date >= datetime(2024, 6, 1): # Version was changed from V06E to V07B - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif" - elif date >= datetime(2023, 11, 8): # Version was changed from V06D to V06E - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06E.1day.tif" - elif date >= datetime(2023, 7, 1): # Version was changed from V06C to V06D - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06D.1day.tif" - elif date >= datetime(2022, 5, 8): # Version was changed from V06B to V06C - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06C.1day.tif" - else: - link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06B.1day.tif" + link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif" return (link,version) def _get_cmd_precip_download(outputpath,link,version,secrets): diff --git a/src/rat/data_processing/newdata_low_latency.py b/src/rat/data_processing/newdata_low_latency.py new file mode 100644 index 0000000..4e1ce10 --- /dev/null +++ b/src/rat/data_processing/newdata_low_latency.py @@ -0,0 +1,63 @@ +import pandas as pd +from logging import getLogger + +from plugins.forecasting.forecasting import get_gefs_precip +from plugins.forecasting.forecasting import get_GFS_data +from rat.utils.logging import LOG_NAME, LOG_LEVEL, NOTIFICATION + +# Getting the log-level 2 +log = getLogger(LOG_NAME) +log.setLevel(LOG_LEVEL) + +def get_newdata_low_latency(start, end , basin_bounds, raw_low_latency_dir, processed_low_latency_dir, low_latency_gfs_dir): + """ + Downloads and processes low-latency weather data (precipitation, temperature, wind speed) for a given basin + over a specified date range. + + Args: + start (datetime.datetime): The start date for the data retrieval. + end (datetime.datetime): The end date for the data retrieval. + basin_bounds (tuple or dict): The geographical bounds of the basin for which data is to be processed. + This can be a tuple of coordinates or a dictionary with bounding box details. + raw_low_latency_dir (str): Directory path where raw low-latency GEFS data is stored. + processed_low_latency_dir (str): Directory path where processed low-latency GEFS data will be saved. + low_latency_gfs_dir (str): Directory path where GFS data (temperature, wind speed) is stored. 
+ + Returns: + None + + Description: + This function iterates over a date range defined by `start` and `end`. For each date, it: + 1. Downloads and processes GEFS-CHIRPS precipitation data for the given basin bounds. + 2. Downloads and processes GFS data (including temperature and wind speed) for the same date. + + The processed data is saved to the specified directories. + + Logging: + The function logs the status of the data download and processing for each date using `log.info`, + which is assumed to be configured in the broader application context. + """ + # Create array of dates + low_latency_dates = pd.date_range(start, end) + # Getting now cast for every date, so 0 lead time + lead_time = 0 + + for date in low_latency_dates: + #Download and process GEFS-CHIRPS data to the basin bounds (precipitation) + try: + log.info(f"Downloading low latency GEFS Precipitation for {date.strftime('%Y-%m-%d')}.") + get_gefs_precip(basin_bounds=basin_bounds, forecast_raw_dir=raw_low_latency_dir, forecast_processed_dir=processed_low_latency_dir,begin= date, lead_time=lead_time, low_latency=True) + except Exception as e: + log.error(f"Downloading of low latency GEFS Precipitation for {date.strftime('%Y-%m-%d')} failed.") + log.error(e) + #Download and process GFS data (tmin, tmax, wind speed) + try: + log.info(f"Downloading low latency GFS data for {date.strftime('%Y-%m-%d')}.") + get_GFS_data(basedate=date,lead_time=lead_time, basin_bounds=basin_bounds, gfs_dir=low_latency_gfs_dir, hour_of_day=12) + except Exception as e: + log.error(f"Downloading of low latency GFS data for {date.strftime('%Y-%m-%d')} failed.") + log.error(e) + + + + diff --git a/src/rat/plugins/forecasting/forecast_basin.py b/src/rat/plugins/forecasting/forecast_basin.py new file mode 100644 index 0000000..8702115 --- /dev/null +++ b/src/rat/plugins/forecasting/forecast_basin.py @@ -0,0 +1,226 @@ +import ruamel_yaml as ryaml +import pandas as pd +import geopandas as gpd +import numpy as np +from pathlib import Path +import shutil +import datetime +from logging import getLogger + +from rat.data_processing.metsim_input_processing import CombinedNC, generate_metsim_state_and_inputs_with_multiple_nc_files +from rat.toolbox.config import update_config +from rat.utils.vic_init_state_finder import get_vic_init_state_date +from rat.utils.utils import check_date_in_netcdf, get_first_date_from_netcdf +from rat.plugins.forecasting.forecasting import get_gefs_precip, get_GFS_data, forecast_outflow +from rat.plugins.forecasting.forecasting import convert_forecast_evaporation,convert_forecast_inflow, convert_forecast_outflow_states +from rat.rat_basin import rat_basin + + +def forecast(config, rat_logger, low_latency_limit): + """Function to run the forecasting plugin. + + Args: + config (dict): Dictionary containing the configuration parameters. 
+ rat_logger (Logger): Logger object + low_latency_limit (int): Maximum number of days that will be counted as low latency from today + """ + + print("Forecasting Plugin Started") + # read necessary parameters from config + basins_shapefile_path = config['GLOBAL']['basin_shpfile'] # Shapefile containg information of basin(s)- geometry and attributes + basins_shapefile = gpd.read_file(basins_shapefile_path) # Reading basins_shapefile_path to get basin polygons and their attributes + basins_shapefile_column_dict = config['GLOBAL']['basin_shpfile_column_dict'] # Dictionary of column names in basins_shapefile, Must contain 'id' field + region_name = config['BASIN']['region_name'] # Major basin name used to cluster multiple basins data in data-directory + basin_name = config['BASIN']['basin_name'] # Basin name used to save basin related data + basin_id = config['BASIN']['basin_id'] # Unique identifier for each basin used to map basin polygon in basins_shapefile + basin_data = basins_shapefile[basins_shapefile[basins_shapefile_column_dict['id']]==basin_id] # Getting the particular basin related information corresponding to basin_id + basin_bounds = basin_data.bounds # Obtaining bounds of the particular basin + basin_bounds = np.array(basin_bounds)[0] + data_dir = config['GLOBAL']['data_dir'] + basin_data_dir = Path(config['GLOBAL']['data_dir']) / region_name / 'basins' / basin_name + if config['PLUGINS'].get('forecast_rule_curve_dir'): + rule_curve_dir = Path(config['PLUGINS'].get('forecast_rule_curve_dir')) + else: + rule_curve_dir = None + reservoirs_gdf_column_dict = config['GEE']['reservoir_vector_file_columns_dict'] + forecast_reservoir_shpfile_column_dict = config['PLUGINS']['forecast_reservoir_shpfile_column_dict'] + if (config['ROUTING']['station_global_data']): + reservoirs_gdf_column_dict['unique_identifier'] = 'uniq_id' + else: + reservoirs_gdf_column_dict['unique_identifier'] = reservoirs_gdf_column_dict['dam_name_column'] + + # determine start date to generate forecast + if not config['PLUGINS'].get('forecast_gen_start_date'): + raise Exception("The start date for generating forecast is not provided. Please provide 'forecast_gen_start_date' in config file.") + elif config['PLUGINS']['forecast_gen_start_date'] == 'end_date': + forecast_gen_start_date = pd.to_datetime(config['BASIN']['end']) + else: + forecast_gen_start_date = pd.to_datetime(config['PLUGINS']['forecast_gen_start_date']) + + # determine end date to generate forecast if provided, otherwise same as forecast_gen_start_date + forecast_gen_end_date = config['PLUGINS'].get('forecast_gen_end_date') + if forecast_gen_end_date: + forecast_gen_end_date = pd.to_datetime(forecast_gen_end_date) + else: + forecast_gen_end_date = forecast_gen_start_date + + # Verify that forecast_gen start and end dates are less than RAT's end date for last run. + if not ((forecast_gen_end_date <= pd.to_datetime(config['BASIN']['end'])) and + (forecast_gen_start_date <= forecast_gen_end_date)): + raise Exception("forecast_gen_start_date should be less than or equal to forecast_gen_end_date and both should be less than or equal to RAT's end date for last run.") + + # determine lead time for each day to generate forecast + if not config['PLUGINS'].get('forecast_lead_time'): + raise Exception("The lead time for each day to generate forecast is not provided. 
Please provide 'forecast_lead_time' in config file.") + else: + lead_time = config['PLUGINS']['forecast_lead_time'] + + # Determine vic_init_state_date to be used + if config['PLUGINS'].get('forecast_vic_init_state'): + vic_init_state = config['PLUGINS']['forecast_vic_init_state'] + else: + vic_init_state = get_vic_init_state_date(forecast_gen_start_date,low_latency_limit,data_dir, region_name, basin_name) + if not vic_init_state: + raise Exception("No vic init state was found to execute RAT. You can provide path or date using 'forecast_vic_init_state' in config file.") + + # Determine rout_init_state_date to be used + if(isinstance(vic_init_state, datetime.date)): + rout_init_state = vic_init_state + else: + if config['PLUGINS'].get('forecast_rout_init_state'): + rout_init_state = config['PLUGINS']['forecast_rout_init_state'] + else: + rout_init_state = None + + # Determine RAT start date to be used (corresponding to vic init state) + if(isinstance(vic_init_state, datetime.date)): + rat_start_date = vic_init_state + # If vic init state path file is available, use forecast_gen_start_date as start date + else: + rat_start_date = forecast_gen_start_date + + # define and create directories + hindcast_nc_path = basin_data_dir / 'pre_processing' / 'nc' / 'combined_data.nc' + low_latency_nc_path = basin_data_dir / 'pre_processing' / 'nc' / 'low_latency_combined.nc' + combined_nc_path = basin_data_dir / 'pre_processing' / 'nc' / 'forecast_combined.nc' + metsim_inputs_dir = basin_data_dir / 'metsim' / 'metsim_inputs' + basingridfile_path = basin_data_dir / 'basin_grid_data' / f'{basin_name}_grid_mask.tif' + forecast_data_dir = basin_data_dir / 'forecast' + raw_gefs_chirps_dir = forecast_data_dir / 'gefs-chirps' / 'raw' + processed_gefs_chirps_dir = forecast_data_dir / 'gefs-chirps' / 'processed' + gfs_dir = forecast_data_dir / 'gfs' + raw_gfs_dir = gfs_dir / 'raw' + extracted_gfs_dir = gfs_dir / 'extracted' + processed_gfs_dir = gfs_dir / 'processed' + s_area_dir = basin_data_dir / 'final_outputs' / 'sarea_tmsos' + + # List of Dates on which to generate forecast + forecast_basedates = pd.date_range(forecast_gen_start_date, forecast_gen_end_date) + + # Get Storage scenarios + outflow_storage_scenarios = config['PLUGINS'].get('forecast_storage_scenario') + + # Get list of storage percent left if 'ST' in scenario. + if 'ST' in outflow_storage_scenarios: + percent_st_change = config['PLUGINS'].get('forecast_storage_change_percent_of_smax') + actual_storage_scenario = True + # If list is not available raise warning and remove the 'ST' scenario. + if not percent_st_change: + outflow_storage_scenarios.remove('ST') + actual_storage_scenario = False + raise Warning("List of storage change scenarios as percent of smax is not available. 
Please provide 'forecast_storage_change_percent_of_smax' in the RAT config file.") + else: + actual_storage_scenario = False + + # For each basedate, generate forecasting results by running RAT + for basedate in forecast_basedates: + forecast_lead_enddate = basedate + pd.Timedelta(days=lead_time) + forecast_lead_startdate = basedate + pd.Timedelta(days=1) + + # Determine whether to use low_latency_combined_nc ( if basedate is not in hindcast combinedNC) + low_latency_data_need = not check_date_in_netcdf(hindcast_nc_path, basedate) + + # Define directories for basedate + forecast_inflow_dst_dir = basin_data_dir / 'rat_outputs' / 'forecast_inflow' / f"{basedate:%Y%m%d}" + basin_reservoir_shpfile_path = Path(basin_data_dir) / 'gee' / 'gee_basin_params' / 'basin_reservoirs.shp' + final_inflow_out_dir = basin_data_dir / 'final_outputs' / 'forecast_inflow' / f"{basedate:%Y%m%d}" + final_evap_out_dir = basin_data_dir / 'final_outputs' / 'forecast_evaporation' / f"{basedate:%Y%m%d}" + evap_dir = basin_data_dir / 'rat_outputs' / 'forecast_evaporation' / f"{basedate:%Y%m%d}" + outflow_forecast_dir = basin_data_dir / 'rat_outputs' / 'forecast_outflow' / f'{basedate:%Y%m%d}' + final_outflow_out_dir = basin_data_dir / 'final_outputs' / 'forecast_outflow' / f'{basedate:%Y%m%d}' + final_dels_out_dir = basin_data_dir / 'final_outputs' / 'forecast_dels' / f'{basedate:%Y%m%d}' + final_sarea_out_dir = basin_data_dir / 'final_outputs' / 'forecast_sarea' / f'{basedate:%Y%m%d}' + + for d in [ + raw_gefs_chirps_dir, processed_gefs_chirps_dir, raw_gfs_dir, extracted_gfs_dir, processed_gfs_dir, outflow_forecast_dir, + final_evap_out_dir, final_inflow_out_dir, final_outflow_out_dir + ]: + d.mkdir(parents=True, exist_ok=True) + + # cleanup previous runs + vic_forecast_input_dir = basin_data_dir / 'vic' / 'forecast_vic_inputs' + [f.unlink() for f in vic_forecast_input_dir.glob("*") if f.is_file()] + vic_forecast_output_dir = basin_data_dir / 'vic' / 'forecast_vic_outputs' + [f.unlink() for f in vic_forecast_output_dir.glob("*") if f.is_file()] + vic_forecast_state_dir = basin_data_dir / 'vic' / 'forecast_vic_state' + [f.unlink() for f in vic_forecast_state_dir.glob("*") if f.is_file()] + combined_nc_path.unlink() if combined_nc_path.is_file() else None + rout_forecast_state_dir = basin_data_dir / 'rout' / 'forecast_rout_state_file' + [f.unlink() for f in rout_forecast_state_dir.glob("*") if f.is_file()] + + + # RAT STEP-1 (Forecasting) Download and process GEFS-CHIRPS data + get_gefs_precip(basin_bounds, raw_gefs_chirps_dir, processed_gefs_chirps_dir, basedate, lead_time) + + # RAT STEP-1 (Forecasting) Download and process GFS data + get_GFS_data(basedate, lead_time, basin_bounds, gfs_dir) + + # RAT STEP-2 (Forecasting) make combined nc + CombinedNC( + basedate, forecast_lead_enddate, None, + basingridfile_path, combined_nc_path, False, + forecast_data_dir, basedate + ) + + # RAT STEP-2 (Forecasting) generate metsim inputs + if low_latency_data_need: + file_paths_to_combine = [hindcast_nc_path, low_latency_nc_path, combined_nc_path] + low_latency_data_first_date = get_first_date_from_netcdf(low_latency_nc_path) + start_dates_to_combine = [low_latency_data_first_date,forecast_lead_startdate.to_pydatetime()] + else: + file_paths_to_combine = [hindcast_nc_path, combined_nc_path] + start_dates_to_combine = [forecast_lead_startdate.to_pydatetime()] + generate_metsim_state_and_inputs_with_multiple_nc_files( + nc_file_paths=file_paths_to_combine, + start_dates= start_dates_to_combine, + out_dir= metsim_inputs_dir, + 
forcings_start_date= rat_start_date, + forcings_end_date= forecast_lead_enddate, + forecast_mode= True + ) + + # change config to only run metsim-routing + config['BASIN']['vic_init_state'] = vic_init_state + config['BASIN']['rout_init_state'] = rout_init_state + config['GLOBAL']['steps'] = [3, 4, 5, 6, 7, 8, 13] # only run metsim-routing and inflow file generation + config['BASIN']['start'] = rat_start_date + config['BASIN']['end'] = forecast_lead_enddate + config['BASIN']['spin_up'] = False + + # run RAT with forecasting parameters + no_errors, _ = rat_basin(config, rat_logger, forecast_mode=True, forecast_basedate=basedate) + + # generate outflow forecast + forecast_outflow( + basedate, lead_time, basin_data_dir, basin_reservoir_shpfile_path, reservoirs_gdf_column_dict, forecast_reservoir_shpfile_column_dict, rule_curve_dir, + scenarios = outflow_storage_scenarios, + st_percSmaxes = percent_st_change[:], + actual_st_left=actual_storage_scenario + ) + + # RAT STEP-14 (Forecasting) convert forecast inflow and evaporation + convert_forecast_inflow(forecast_inflow_dst_dir, basin_reservoir_shpfile_path, reservoirs_gdf_column_dict, final_inflow_out_dir, basedate) + convert_forecast_evaporation(evap_dir, final_evap_out_dir) + convert_forecast_outflow_states(outflow_forecast_dir, final_outflow_out_dir, final_dels_out_dir, final_sarea_out_dir) + + return no_errors \ No newline at end of file diff --git a/src/rat/plugins/forecasting.py b/src/rat/plugins/forecasting/forecasting.py similarity index 74% rename from src/rat/plugins/forecasting.py rename to src/rat/plugins/forecasting/forecasting.py index e1bf28f..b34eaf8 100644 --- a/src/rat/plugins/forecasting.py +++ b/src/rat/plugins/forecasting/forecasting.py @@ -1,5 +1,4 @@ import xarray as xr -import rioxarray as rxr import numpy as np import requests import dask @@ -9,20 +8,14 @@ from pathlib import Path import tempfile import os -import subprocess import shutil import numpy as np from datetime import date from rat.utils.run_command import run_command -import ruamel_yaml as ryaml -# import cfgrib -from rat.data_processing.metsim_input_processing import CombinedNC -from rat.rat_basin import rat_basin -from rat.toolbox.config import update_config - +from rat.toolbox.MiscUtil import get_quantile_from_area # Obtain GEFS precip data -def download_gefs_chirps(basedate, lead_time, save_dir): +def download_gefs_chirps(basedate, lead_time, save_dir, low_latency=False): """Download forecast precipitation from GEFS-CHIRPS. Args: @@ -33,8 +26,12 @@ def download_gefs_chirps(basedate, lead_time, save_dir): assert lead_time <= 15, "Maximum lead time is 15 days for GEFS-CHIRPS." 
forecast_dates = pd.date_range(basedate, basedate + pd.Timedelta(days=lead_time)) + if low_latency: + start_index=0 + else: + start_index=1 - for forecast_date in forecast_dates[1:]: # Skip the first day, download forecast from next day + for forecast_date in forecast_dates[start_index:]: # Skip the first day and download from the next day onwards; in low-latency mode, start the download from the first day itself by = basedate.year bm = basedate.month bd = basedate.day @@ -119,10 +116,10 @@ return date, 'Precipitaion', STATUS -def get_gefs_precip(basin_bounds, forecast_raw_dir, forecast_processed_dir, begin, lead_time, temp_datadir=None): +def get_gefs_precip(basin_bounds, forecast_raw_dir, forecast_processed_dir, begin, lead_time, low_latency=False, temp_datadir=None): """Download and process forecast data for a day.""" # download data - download_gefs_chirps(begin, lead_time, forecast_raw_dir) + download_gefs_chirps(begin, lead_time, forecast_raw_dir, low_latency) # process data futures = [] @@ -145,12 +142,12 @@ def download_GFS_files( - basedate, lead_time, save_dir + basedate, lead_time, save_dir, hour_of_day=0 ): """Download GFS files for a day and saves it with a consistent naming format for further processing.""" save_dir = Path(save_dir) save_dir.mkdir(parents=True, exist_ok=True) - hours = range(24, 16*24+1, 24) + hours = range(bool(lead_time)*24+hour_of_day, lead_time*24+hour_of_day+1, 24) # 1 is added at the end of the range because the end is exclusive # determine where to download from. nomads has data for the last 10 days if basedate > pd.Timestamp(date.today()) - pd.Timedelta(days=10): @@ -300,14 +297,15 @@ def process_GFS_file(fn, basin_bounds, gfs_dir): res = run_command(cmd)#, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # print(f"{basedate:%Y-%m-%d} - {var} - (4) Converted to .asc") -def process_GFS_files(basedate, lead_time, basin_bounds, gfs_dir): +def process_GFS_files(basedate, lead_time, basin_bounds, gfs_dir, hour_of_day=0): """Extracts only the required meteorological variables and converts from GRIB2 format to netcdf Args: - fns (string): Path of GRIB2 GFS file + fns (string): Path of GRIB2 GFS file + hour_of_day (int): Hour of the day for which GFS data is used (default 0) """ gfs_dir = Path(gfs_dir) - hours = range(24, lead_time*24+1, 24) + hours = range(bool(lead_time)*24+hour_of_day, lead_time*24+hour_of_day+1, 24) forecasted_dates = [basedate + pd.Timedelta(h, 'hours') for h in hours] in_fns = [ @@ -318,7 +316,7 @@ for i, in_fn in enumerate(in_fns): process_GFS_file(in_fn, basin_bounds, gfs_dir) -def get_GFS_data(basedate, lead_time, basin_bounds, gfs_dir): +def get_GFS_data(basedate, lead_time, basin_bounds, gfs_dir, hour_of_day=0): """Extracts only the required meteorological variables and converts from GRIB2 format to netcdf Args: @@ -328,6 +326,7 @@ basedate, lead_time=lead_time, save_dir=gfs_dir / "raw", + hour_of_day=hour_of_day ) extract_gribs(gfs_dir, basedate) @@ -336,7 +335,8 @@ basedate, lead_time, basin_bounds, - gfs_dir + gfs_dir, + hour_of_day=hour_of_day ) def forecast_scenario_custom_wl(forecast_outflow, initial_sa, aec_data, cust_wl): @@ -491,7 +491,7 @@ def forecast_scenario_gates_closed(forecast_outflow, initial_sa, aec_data): def forecast_scenario_st(forecast_outflow, initial_sa, cust_st, 
s_max, aec_data, st_percSmax): #Creating positive and negative values for permissible storage cases st_percSmax = np.array(st_percSmax) - st_percSmax = np.append(st_percSmax, -st_percSmax) + # st_percSmax = np.append(st_percSmax, -st_percSmax) #Converting storage cases from %Smax to absolute volumes st_volumes = np.array(st_percSmax)/100*s_max*1E6 @@ -508,6 +508,8 @@ def forecast_scenario_st(forecast_outflow, initial_sa, cust_st, s_max, aec_data, curr_sa = initial_sa curr_elevation = np.interp(curr_sa, aec_data['area'], aec_data['elevation']) cum_netInflow = forecast_outflow['inflow'].sum() - forecast_outflow['evaporation'].sum() + dels_remaining = vol + dels_stored = 0 for iter,inflow in enumerate(forecast_outflow['inflow'].values): if(cum_netInflow < vol): delS.append(inflow - forecast_outflow['evaporation'].values[iter]) @@ -519,9 +521,22 @@ def forecast_scenario_st(forecast_outflow, initial_sa, cust_st, s_max, aec_data, sarea.append(curr_sa) outflow.append(0) else: - net_delS = cum_netInflow - vol - outflow.append(net_delS/15) - delS.append(inflow - outflow[-1]) + I_E = inflow - forecast_outflow['evaporation'].values[iter] + if dels_remaining > 0: + if I_E <= dels_remaining: + O = 0 + dels_stored = I_E + dels_remaining = dels_remaining - dels_stored + else: + O = I_E - dels_remaining + dels_stored = dels_remaining + dels_remaining = 0 + else: + # If no remaining storage capacity, release all incoming water minus evaporation + O = I_E + dels_stored = 0 # No water is stored + outflow.append(O) + delS.append(dels_stored) delH.append(delS[-1]/(curr_sa*1E6)) curr_elevation = np.interp(curr_sa, aec_data['area'], aec_data['elevation']) new_elevation = curr_elevation + delH[-1] @@ -560,7 +575,8 @@ def forecast_outflow_for_res( rule_curve = None, st_percSmax = [0.5, 1, 2.5], cust_st = None, - output_path = None + output_path = None, + actual_st_left = False ): ''' Generates forecasted outflow for RAT given forecasted inflow time series data based on specific delS scenarios. @@ -601,8 +617,14 @@ def forecast_outflow_for_res( output_path: str If provided, generates a netcdf file at the specified path containing the forecasted outflow results. 
''' + # If actual_st_left is true + if actual_st_left and 'ST' in dels_scenario: + actual_st_left_percent = np.round(100*(1-get_quantile_from_area(sarea_fp)),1) + st_percSmax.append(actual_st_left_percent) + # Reading forecasted inflow data, computing forecasting period as base date to base date + 15 days forecast_inflow = pd.read_csv(forecast_inflow_fp, parse_dates=['date']).set_index('date').rename({'streamflow': 'inflow'}, axis=1).to_xarray() + forecast_inflow = forecast_inflow*86400 # convert m3/s to m3/day base_date = pd.to_datetime(base_date) base_date_15lead = base_date + pd.Timedelta(days=forecast_lead_time) @@ -671,111 +693,73 @@ def forecast_outflow( forecast_reservoir_shpfile_column_dict, rule_curve_dir, scenarios = ['GC', 'GO', 'RC', 'ST'], st_percSmaxes = [0.5, 1, 2.5], + actual_st_left = False ): reservoirs = gpd.read_file(reservoir_shpfile) reservoirs['Inflow_filename'] = reservoirs[reservoir_shpfile_column_dict['unique_identifier']].astype(str) for res_name in reservoirs['Inflow_filename'].tolist(): res_scenarios = scenarios.copy() - forecast_inflow_fp = basin_data_dir / 'rat_outputs' / 'forecast_inflow' / f'{basedate:%Y%m%d}' / f'{res_name}.csv' - forecast_evap_fp = basin_data_dir / 'rat_outputs' / 'forecast_evaporation' / f'{basedate:%Y%m%d}' / f'{res_name}.csv' - sarea_fp = basin_data_dir / 'gee' / 'gee_sarea_tmsos' / f'{res_name}.csv' - aec_fp = basin_data_dir / 'final_outputs' / 'aec' / f'{res_name}.csv' - output_fp = basin_data_dir / 'rat_outputs' / 'forecast_outflow' / f'{basedate:%Y%m%d}' / f'{res_name}.csv' - output_fp.parent.mkdir(parents=True, exist_ok=True) - - s_max = reservoirs[reservoirs[reservoir_shpfile_column_dict['unique_identifier']] == res_name][forecast_reservoir_shpfile_column_dict['column_capacity']].values[0] - reservoir_id = reservoirs[reservoirs[reservoir_shpfile_column_dict['unique_identifier']] == res_name][forecast_reservoir_shpfile_column_dict['column_id']].values[0] - - if np.isnan(s_max): - res_scenarios.remove('ST') - - if np.isnan(reservoir_id): - res_scenarios.remove('RC') - rule_curve_fp = None - else: - rule_curve_fp = rule_curve_dir / f'{reservoir_id}.txt' - - for scenario in res_scenarios: - forecast_outflow_for_res( - base_date = basedate, - forecast_lead_time = lead_time, - forecast_inflow_fp = forecast_inflow_fp, - evap_fp = forecast_evap_fp, - sarea_fp = sarea_fp, - aec_fp = aec_fp, - dels_scenario = scenario, - s_max = s_max, - rule_curve = rule_curve_fp, - st_percSmax = st_percSmaxes, - output_path = output_fp - ) - - -def generate_forecast_state_and_inputs( - forecast_startdate, # forecast start date - forecast_enddate, # forecast end date - hindcast_combined_datapath, - forecast_combined_datapath, - out_dir - ): - out_dir = Path(out_dir) - hindcast_combined_data = xr.open_dataset(hindcast_combined_datapath) - forecast_combined_data = xr.open_dataset(forecast_combined_datapath) - - # check if tmin < tmax. 
If not, set tmin = tmax - forecast_combined_data['tmin'] = xr.where( - forecast_combined_data['tmin'] < forecast_combined_data['tmax'], - forecast_combined_data['tmin'], - forecast_combined_data['tmax'] - ) - - hindcast_combined_data = hindcast_combined_data.sel(time=slice(None, forecast_startdate)) - - combined_data = xr.concat([ - hindcast_combined_data, forecast_combined_data - ], dim="time") - - state_startdate = forecast_startdate - pd.Timedelta(days=90) - state_enddate = forecast_startdate - pd.Timedelta(days=1) - - state_ds = combined_data.sel(time=slice(state_startdate, state_enddate)) - state_outpath = out_dir / "forecast_state.nc" - state_ds.to_netcdf(state_outpath) - - # # Generate the metsim input - forcings_ds = combined_data.sel(time=slice(forecast_startdate, forecast_enddate)) - forcings_outpath = out_dir / "forecast_metsim_input.nc" - print(f"Saving forcings: {forcings_outpath}") - forcings_ds.to_netcdf(forcings_outpath) - - return state_outpath, forcings_outpath - - -def convert_forecast_inflow(inflow_dir, reservoir_shpfile, reservoir_shpfile_column_dict, final_out_inflow_dir, basedate): + observed_outflow_fp = basin_data_dir / 'final_outputs' / 'outflow' / f'{res_name}.csv' + # Calculate forecast outflow only if observed outflow exists. Will skip for guages. + if (observed_outflow_fp.exists()): + + forecast_inflow_fp = basin_data_dir / 'rat_outputs' / 'forecast_inflow' / f'{basedate:%Y%m%d}' / f'{res_name}.csv' + forecast_evap_fp = basin_data_dir / 'rat_outputs' / 'forecast_evaporation' / f'{basedate:%Y%m%d}' / f'{res_name}.csv' + sarea_fp = basin_data_dir / 'gee' / 'gee_sarea_tmsos' / f'{res_name}.csv' + aec_fp = basin_data_dir / 'final_outputs' / 'aec' / f'{res_name}.csv' + output_fp = basin_data_dir / 'rat_outputs' / 'forecast_outflow' / f'{basedate:%Y%m%d}' / f'{res_name}.csv' + output_fp.parent.mkdir(parents=True, exist_ok=True) + + s_max = reservoirs[reservoirs[reservoir_shpfile_column_dict['unique_identifier']] == res_name][forecast_reservoir_shpfile_column_dict['column_capacity']].values[0] + reservoir_id = reservoirs[reservoirs[reservoir_shpfile_column_dict['unique_identifier']] == res_name][forecast_reservoir_shpfile_column_dict['column_id']].values[0] + + if np.isnan(s_max) and 'ST' in res_scenarios: + res_scenarios.remove('ST') + + if (np.isnan(reservoir_id) or rule_curve_dir) and 'RC' in res_scenarios: + res_scenarios.remove('RC') + rule_curve_fp = None + else: + rule_curve_fp = rule_curve_dir / f'{reservoir_id}.txt' + + for scenario in res_scenarios: + forecast_outflow_for_res( + base_date = basedate, + forecast_lead_time = lead_time, + forecast_inflow_fp = forecast_inflow_fp, + evap_fp = forecast_evap_fp, + sarea_fp = sarea_fp, + aec_fp = aec_fp, + dels_scenario = scenario, + s_max = s_max, + rule_curve = rule_curve_fp, + st_percSmax = st_percSmaxes[:], + output_path = output_fp, + actual_st_left = actual_st_left + ) + +def convert_forecast_inflow(forecast_inflow_dir, reservoir_shpfile, reservoir_shpfile_column_dict, final_out_forecast_inflow_dir, basedate): # Inflow reservoirs = gpd.read_file(reservoir_shpfile) reservoirs['Inflow_filename'] = reservoirs[reservoir_shpfile_column_dict['unique_identifier']].astype(str) - inflow_paths = list(Path(inflow_dir).glob('*.csv')) - final_out_inflow_dir.mkdir(exist_ok=True) + forecast_inflow_paths = list(Path(forecast_inflow_dir).glob('*.csv')) + final_out_forecast_inflow_dir.mkdir(exist_ok=True) - for inflow_path in inflow_paths: - res_name = os.path.splitext(os.path.split(inflow_path)[-1])[0] + for 
forecast_inflow_path in forecast_inflow_paths: + res_name = os.path.splitext(os.path.split(forecast_inflow_path)[-1])[0] - if res_name in reservoirs['Inflow_filename'].tolist(): - savepath = final_out_inflow_dir / inflow_path.name + savepath = final_out_forecast_inflow_dir / forecast_inflow_path.name - df = pd.read_csv(inflow_path, parse_dates=['date']) - df['inflow (m3/d)'] = df['streamflow'] * (24*60*60) # indicate units, convert from m3/s to m3/d - df = df[['date', 'inflow (m3/d)']] + forecast_df = pd.read_csv(forecast_inflow_path, parse_dates=['date']) + forecast_df['inflow (m3/d)'] = forecast_df['streamflow'] * (24*60*60) # indicate units, convert from m3/s to m3/d + forecast_df = forecast_df[['date', 'inflow (m3/d)']] - print(f"Converting [Inflow]: {res_name}") - df = df[df['date'] > basedate] - df.to_csv(savepath, index=False) - print(df.tail()) - else: - print(f"Skipping {res_name} as its inflow file is not available.") + print(f"Converting [Inflow]: {res_name}") + forecast_df = forecast_df[forecast_df['date'] >= basedate] + forecast_df.to_csv(savepath, index=False) + print(forecast_df.tail()) def convert_forecast_evaporation(evap_dir, final_evap_dir): @@ -832,7 +816,7 @@ def convert_forecast_outflow_states(outflow_dir, final_outflow_dir, final_dels_d converted_col_name = f'outflow (m3/d) [case: {outflow_case}]' col_names.append(converted_col_name) outflow_df.loc[outflow_df[outflow_col]<0, outflow_col] = 0 - outflow_df[converted_col_name] = outflow_df[outflow_col] * (24*60*60) # indicate units, convert from m3/s to m3/d + outflow_df[converted_col_name] = outflow_df[outflow_col] # Units are already in m3/d outflow_df = outflow_df[['date', *col_names]] final_outflow_dir.mkdir(parents=True, exist_ok=True) outflow_df.to_csv(outflow_savefp, index=False) @@ -874,119 +858,3 @@ def convert_forecast_outflow_states(outflow_dir, final_outflow_dir, final_dels_d sarea_df.to_csv(sarea_savefp, index=False) -def forecast(config, rat_logger): - """Function to run the forecasting plugin. - - Args: - config (dict): Dictionary containing the configuration parameters. 
- rat_logger (Logger): Logger object - """ - print("Forecasting Plugin Started") - # read necessary parameters from config - basins_shapefile_path = config['GLOBAL']['basin_shpfile'] # Shapefile containg information of basin(s)- geometry and attributes - basins_shapefile = gpd.read_file(basins_shapefile_path) # Reading basins_shapefile_path to get basin polygons and their attributes - basins_shapefile_column_dict = config['GLOBAL']['basin_shpfile_column_dict'] # Dictionary of column names in basins_shapefile, Must contain 'id' field - region_name = config['BASIN']['region_name'] # Major basin name used to cluster multiple basins data in data-directory - basin_name = config['BASIN']['basin_name'] # Basin name used to save basin related data - basin_id = config['BASIN']['basin_id'] # Unique identifier for each basin used to map basin polygon in basins_shapefile - basin_data = basins_shapefile[basins_shapefile[basins_shapefile_column_dict['id']]==basin_id] # Getting the particular basin related information corresponding to basin_id - basin_bounds = basin_data.bounds # Obtaining bounds of the particular basin - basin_bounds = np.array(basin_bounds)[0] - basin_data_dir = Path(config['GLOBAL']['data_dir']) / region_name / 'basins' / basin_name - rule_curve_dir = Path(config['PLUGINS']['forecast_rule_curve_dir']) - reservoirs_gdf_column_dict = config['GEE']['reservoir_vector_file_columns_dict'] - forecast_reservoir_shpfile_column_dict = config['PLUGINS']['forecast_reservoir_shpfile_column_dict'] - if (config['ROUTING']['station_global_data']): - reservoirs_gdf_column_dict['unique_identifier'] = 'uniq_id' - else: - reservoirs_gdf_column_dict['unique_identifier'] = reservoirs_gdf_column_dict['dam_name_column'] - - # determine forecast related dates - basedate, lead time and enddate - if config['PLUGINS']['forecast_start_date'] == 'end_date': - basedate = pd.to_datetime(config['BASIN']['end']) - else: - basedate = pd.to_datetime(config['PLUGINS']['forecast_start_date']) - lead_time = config['PLUGINS']['forecast_lead_time'] - forecast_enddate = basedate + pd.Timedelta(days=lead_time) - - # define and create directories - hindcast_nc_path = basin_data_dir / 'pre_processing' / 'nc' / 'combined_data.nc' - combined_nc_path = basin_data_dir / 'pre_processing' / 'nc' / 'forecast_combined.nc' - metsim_inputs_dir = basin_data_dir / 'metsim' / 'metsim_inputs' - basingridfile_path = basin_data_dir / 'basin_grid_data' / f'{basin_name}_grid_mask.tif' - forecast_data_dir = basin_data_dir / 'forecast' - raw_gefs_chirps_dir = forecast_data_dir / 'gefs-chirps' / 'raw' - processed_gefs_chirps_dir = forecast_data_dir / 'gefs-chirps' / 'processed' - gfs_dir = forecast_data_dir / 'gfs' - raw_gfs_dir = gfs_dir / 'raw' - extracted_gfs_dir = gfs_dir / 'extracted' - processed_gfs_dir = gfs_dir / 'processed' - inflow_dst_dir = basin_data_dir / 'rat_outputs' / 'forecast_inflow' / f"{basedate:%Y%m%d}" - basin_reservoir_shpfile_path = Path(basin_data_dir) / 'gee' / 'gee_basin_params' / 'basin_reservoirs.shp' - final_inflow_out_dir = basin_data_dir / 'final_outputs' / 'forecast_inflow' / f"{basedate:%Y%m%d}" - final_evap_out_dir = basin_data_dir / 'final_outputs' / 'forecast_evaporation' / f"{basedate:%Y%m%d}" - evap_dir = basin_data_dir / 'rat_outputs' / 'forecast_evaporation' / f"{basedate:%Y%m%d}" - outflow_forecast_dir = basin_data_dir / 'rat_outputs' / 'forecast_outflow' / f'{basedate:%Y%m%d}' - final_outflow_out_dir = basin_data_dir / 'final_outputs' / 'forecast_outflow' / f'{basedate:%Y%m%d}' - final_dels_out_dir = 
basin_data_dir / 'final_outputs' / 'forecast_dels' / f'{basedate:%Y%m%d}' - final_sarea_out_dir = basin_data_dir / 'final_outputs' / 'forecast_sarea' / f'{basedate:%Y%m%d}' - - for d in [ - raw_gefs_chirps_dir, processed_gefs_chirps_dir, raw_gfs_dir, extracted_gfs_dir, processed_gfs_dir, outflow_forecast_dir, - final_evap_out_dir, final_inflow_out_dir, final_outflow_out_dir - ]: - d.mkdir(parents=True, exist_ok=True) - - # cleanup previous runs - vic_forecast_input_dir = basin_data_dir / 'vic' / 'forecast_vic_inputs' - [f.unlink() for f in vic_forecast_input_dir.glob("*") if f.is_file()] - vic_forecast_state_dir = basin_data_dir / 'vic' / 'forecast_vic_state' - [f.unlink() for f in vic_forecast_state_dir.glob("*") if f.is_file()] - combined_nc_path.unlink() if combined_nc_path.is_file() else None - rout_forecast_state_dir = basin_data_dir / 'rout' / 'forecast_rout_state_file' - [f.unlink() for f in rout_forecast_state_dir.glob("*") if f.is_file()] - - - # RAT STEP-1 (Forecasting) Download and process GEFS-CHIRPS data - get_gefs_precip(basin_bounds, raw_gefs_chirps_dir, processed_gefs_chirps_dir, basedate, lead_time) - - # RAT STEP-1 (Forecasting) Download and process GFS data - get_GFS_data(basedate, lead_time, basin_bounds, gfs_dir) - - # RAT STEP-2 (Forecasting) make combined nc - CombinedNC( - basedate, forecast_enddate, None, - basingridfile_path, combined_nc_path, False, - forecast_data_dir, basedate - ) - - # RAT STEP-2 (Forecasting) generate metsim inputs - generate_forecast_state_and_inputs( - basedate, forecast_enddate, - hindcast_nc_path, combined_nc_path, - metsim_inputs_dir - ) - - # change config to only run metsim-routing - config['BASIN']['vic_init_state'] = config['BASIN']['end'] # assuming vic_init_state is available for the end date - config['GLOBAL']['steps'] = [3, 4, 5, 6, 7, 8, 13] # only run metsim-routing and inflow file generation - config['BASIN']['start'] = basedate - config['BASIN']['end'] = forecast_enddate - config['BASIN']['spin_up'] = False - - # run RAT with forecasting parameters - no_errors, _ = rat_basin(config, rat_logger, forecast_mode=True) - - # generate outflow forecast - forecast_outflow( - basedate, lead_time, basin_data_dir, basin_reservoir_shpfile_path, reservoirs_gdf_column_dict, forecast_reservoir_shpfile_column_dict, rule_curve_dir, - scenarios = ['GC', 'GO', 'RC', 'ST'], - st_percSmaxes = [0.5, 1, 2.5] - ) - - # RAT STEP-14 (Forecasting) convert forecast inflow and evaporation - convert_forecast_inflow(inflow_dst_dir, basin_reservoir_shpfile_path, reservoirs_gdf_column_dict, final_inflow_out_dir, basedate) - convert_forecast_evaporation(evap_dir, final_evap_out_dir) - convert_forecast_outflow_states(outflow_forecast_dir, final_outflow_out_dir, final_dels_out_dir, final_sarea_out_dir) - - return no_errors \ No newline at end of file diff --git a/src/rat/rat_basin.py b/src/rat/rat_basin.py index a76fd1f..4ad8ae1 100644 --- a/src/rat/rat_basin.py +++ b/src/rat/rat_basin.py @@ -16,8 +16,9 @@ from rat.utils.clean import Clean from rat.data_processing.newdata import get_newdata +from rat.data_processing.newdata_low_latency import get_newdata_low_latency -from rat.data_processing.metsim_input_processing import CombinedNC,generate_state_and_inputs +from rat.data_processing.metsim_input_processing import CombinedNC,generate_state_and_inputs, generate_metsim_state_and_inputs_with_multiple_nc_files from rat.utils.metsim_param_reader import MSParameterFile from rat.core.run_metsim import MetSimRunner @@ -58,12 +59,15 @@ #module-5 step-13 outflow 
(mass-balance) #RAT using all modules and step-14 to produce final outputs -def rat_basin(config, rat_logger, forecast_mode=False): +def rat_basin(config, rat_logger, forecast_mode=False, gfs_days=0, forecast_basedate=None): """Runs RAT as per configuration defined in `config_fn` for one single basin. parameters: - config_fn (str): Path to the configuration file for one basin and not for multiple basins. To run rat for multiple basins use run_rat() - + config_fn (str): Path to the configuration file for one basin and not for multiple basins. To run rat for multiple basins use run_rat(). + rat_logger (logging object): Logger to use for logging of level 2 file with complete details. + forecast_mode (bool): Run rat_basin in forecast mode if True. + gfs_days (int): Number of most recent days for which GFS data will be used due to low latency. + forecast_basedate (datetime.datetime): Date on which forecasts are being generated. It does not need to be same as RAT start date. returns: no_errors (int): Number of errors/exceptions occurred while running rat latest_altimetry_cycle (int): Latest altimetry jason3 cycle number if rat runs step 11 @@ -74,7 +78,7 @@ def rat_basin(config, rat_logger, forecast_mode=False): no_errors = 0 # Tracking latest altimetry cycle number if step 11 is run, otherwise return None - latest_altimetry_cycle = None + latest_altimetry_cycle = None ######### Step -1 Mandatory Step try: @@ -86,7 +90,7 @@ def rat_basin(config, rat_logger, forecast_mode=False): warn_message = "The parameter 'vic_init_state_date' has been deprecated and will not be supported in future versions. Please use 'vic_init_state' instead." warnings.warn(warn_message, DeprecationWarning, stacklevel=2) rat_logger.warning(warn_message) - config['vic_init_state'] = config['BASIN'].get('vic_init_state_date') + config['BASIN']['vic_init_state'] = config['BASIN'].get('vic_init_state_date') # Reading steps to be executed in RAT otherwise seting default value if config['GLOBAL'].get('steps'): @@ -125,7 +129,6 @@ def rat_basin(config, rat_logger, forecast_mode=False): #config['BASIN']['begin'] = datetime.datetime.combine(config['BASIN']['begin'], datetime.time.min) config['BASIN']['start'] = datetime.datetime.combine(config['BASIN']['start'], datetime.time.min) config['BASIN']['end'] = datetime.datetime.combine(config['BASIN']['end'], datetime.time.min) - rout_init_state_save_date = config['BASIN']['end'] if(not config['BASIN']['spin_up']): if(isinstance(config['BASIN'].get('vic_init_state'), datetime.date)): # If vic_init_state is a datetime.date instance and not a file path @@ -142,14 +145,14 @@ def rat_basin(config, rat_logger, forecast_mode=False): rout_init_state = None # No initial state of Routing is present as running RAT for first time in this basin gee_start_date = user_given_start # Run gee from the date provided by user and not spin-off start date. 
elif(config['BASIN'].get('vic_init_state')): - data_download_start = config['BASIN']['start'] # Downloading data from the same date as we want to run RAT from [will be changed if vic_init_state is ot Date] + data_download_start = config['BASIN']['start'] # Downloading data from the same date as we want to run RAT from [will be changed if vic_init_state is not a Date] vic_init_state = config['BASIN']['vic_init_state'] # Date or File path of the vic_init_state use_state = True # Routing state file will be used ## Assuming that if vic_init_state is file then the user doesn't have previous data to use if(isinstance(config['BASIN'].get('vic_init_state'), datetime.date)): use_previous_data = True # Previous Combined Nc file will be used else: - use_previous_data = False # Previous Combined Nc file won't be used + use_previous_data = False # Previous Combined Nc file won't be used data_download_start = config['BASIN']['start']-datetime.timedelta(days=90) # Downloading 90 days of extra meteorological data for MetSim to prepare its initial state as we won't be using previous data ## Assuming rout_init_state (if not provided) as same date as vic_init_state if it is a date else assigning it the start date if(config['BASIN'].get('rout_init_state')): @@ -166,6 +169,20 @@ def rat_basin(config, rat_logger, forecast_mode=False): use_previous_data = False # Previous Combined Nc file won't be used rout_init_state = None # No initial state of Routing will be used. gee_start_date = config['BASIN']['start'] # Run gee from the date provided by user. + + # Defining dates based on gfs_days. VIC init state saving and data download will happen only for dates with observed data + vic_init_state_save_date = config['BASIN']['end'] - datetime.timedelta(days=gfs_days) + rout_init_state_save_date = config['BASIN']['end'] - datetime.timedelta(days=gfs_days) + + data_download_end = config['BASIN']['end'] - datetime.timedelta(days=gfs_days) + # Defining dates if gfs_days is not zero.
+ if gfs_days: + gfs_data_download_start = config['BASIN']['end'] - datetime.timedelta(days=gfs_days-1) + gfs_data_download_end = config['BASIN']['end'] + + + + # Defining logger log = init_logger( @@ -202,8 +219,14 @@ def rat_basin(config, rat_logger, forecast_mode=False): else: rat_logger.info("Read Configuration settings to run RAT.") ##--------------------- Read and initialised global parameters ----------------------## - - rat_logger.info(f"Running RAT from {config['BASIN']['start'] } to {config['BASIN']['end']} which might include spin-up.") + if (config['BASIN']['spin_up']): + rat_logger.info(f"Running RAT from {config['BASIN']['start'] } to {config['BASIN']['end']} which includes spin-up.") + else: + rat_logger.info(f"Running RAT from {config['BASIN']['start'] } to {config['BASIN']['end']}.") + if gfs_days: + rat_logger.info(f"Note 1: Due to low latency availability, GFS daily forecasted data will be used for {gfs_days} most recent days.") + rat_logger.info(f"Note 2: The GFS data will be removed and replaced in next RAT run by observed data, if available.") + rat_logger.info(f"Note 3: Vic init state will be saved for {vic_init_state_save_date}.") ######### Step-0 Mandatory Step try: @@ -278,7 +301,7 @@ def rat_basin(config, rat_logger, forecast_mode=False): rout_dir.mkdir(parents=True, exist_ok=True) if forecast_mode: routing_output_dir = Path(config['GLOBAL']['data_dir']).joinpath(config['BASIN']['region_name'], 'basins', basin_name, 'ro','forecast_ou') - inflow_dst_dir = Path(config['GLOBAL']['data_dir']).joinpath(config['BASIN']['region_name'], 'basins', basin_name, 'rat_outputs', 'forecast_inflow', f"{config['BASIN']['start']:%Y%m%d}") + inflow_dst_dir = Path(config['GLOBAL']['data_dir']).joinpath(config['BASIN']['region_name'], 'basins', basin_name, 'rat_outputs', 'forecast_inflow', f"{forecast_basedate:%Y%m%d}") else: routing_output_dir = Path(config['GLOBAL']['data_dir']).joinpath(config['BASIN']['region_name'], 'basins', basin_name, 'ro','ou') inflow_dst_dir = Path(config['GLOBAL']['data_dir']).joinpath(config['BASIN']['region_name'], 'basins', basin_name, 'rat_outputs', 'inflow') @@ -323,15 +346,25 @@ def rat_basin(config, rat_logger, forecast_mode=False): aec_dir_path = create_directory(os.path.join(basin_data_dir,'post_processing','post_processing_gee_aec',''), True) ## Paths for storing post-processed data and in webformat data if forecast_mode: - evap_savedir = create_directory(os.path.join(basin_data_dir,'rat_outputs', 'forecast_evaporation', f"{config['BASIN']['start']:%Y%m%d}"), True) + evap_savedir = create_directory(os.path.join(basin_data_dir,'rat_outputs', 'forecast_evaporation', f"{forecast_basedate:%Y%m%d}"), True) else: evap_savedir = create_directory(os.path.join(basin_data_dir,'rat_outputs', 'Evaporation'), True) dels_savedir = create_directory(os.path.join(basin_data_dir,'rat_outputs', "dels"), True) outflow_savedir = create_directory(os.path.join(basin_data_dir,'rat_outputs', "rat_outflow"),True) aec_savedir = Path(create_directory(os.path.join(basin_data_dir,'rat_outputs', "aec"),True)) final_output_path = create_directory(os.path.join(basin_data_dir,'final_outputs',''),True) - ## End of defining paths for storing post-processed data and webformat data #----------- Paths Necessary for running of Post-Processing-----------# + + #----------------------- Paths Necessary for running of Low latency Mode using GFS & GEFS data ------------------# + if gfs_days: + low_latency_data_dir = create_directory(Path(basin_data_dir) / 'low_latency', True) + 
raw_gefs_chirps_dir = create_directory(Path(low_latency_data_dir) / 'gefs-chirps' / 'raw', True) + processed_gefs_chirps_dir = create_directory(Path(low_latency_data_dir) / 'gefs-chirps' / 'processed',True) + gfs_dir = create_directory(Path(low_latency_data_dir) / 'gfs',True) + low_latency_combined_nc_path = Path(basin_data_dir) / 'pre_processing' / 'nc' / 'low_latency_combined.nc' + #----------------------- Paths Necessary for running of Low latency Mode using GFS & GEFS data ------------------# + + ## End of defining paths for storing post-processed data and webformat data except: no_errors = -1 rat_logger.exception("Error in creating required directory structure for RAT") @@ -352,11 +385,20 @@ def rat_basin(config, rat_logger, forecast_mode=False): data_dir= config['GLOBAL']['data_dir'], basin_data_dir= basin_data_dir, startdate= data_download_start, - enddate= config['BASIN']['end'], + enddate= data_download_end, secrets_file= config['CONFIDENTIAL']['secrets'], download= True, process= True ) + if gfs_days: + get_newdata_low_latency( + start=gfs_data_download_start, + end=gfs_data_download_end, + basin_bounds=basin_bounds, + raw_low_latency_dir=raw_gefs_chirps_dir, + processed_low_latency_dir=processed_gefs_chirps_dir, + low_latency_gfs_dir=gfs_dir + ) ##---------- Download Data End -----------# NEW_DATA_STATUS = 1 #Data downloaded successfully except: @@ -381,7 +423,7 @@ def rat_basin(config, rat_logger, forecast_mode=False): #----------- Process Data Begin to combine all var data -----------# CombinedNC( start= data_download_start, - end= config['BASIN']['end'], + end= data_download_end, datadir= processed_datadir, forecast_dir=None, basingridpath= basingridfile_path, @@ -389,16 +431,38 @@ def rat_basin(config, rat_logger, forecast_mode=False): use_previous= use_previous_data, climatological_data=config['METSIM'].get('historical_precipitation') ) + if gfs_days: + CombinedNC( + start= gfs_data_download_start, + end= gfs_data_download_end, + datadir= None, + forecast_dir=None, + basingridpath= basingridfile_path, + outputdir= low_latency_combined_nc_path, + use_previous=False, + low_latency_dir=low_latency_data_dir + ) #----------- Process Data End and combined data created -----------# #------ MetSim Input Data Preparation Begin ------# - # Prepare data to metsim input format - ms_state, ms_input_data = generate_state_and_inputs( - forcings_startdate= config['BASIN']['start'], - forcings_enddate= config['BASIN']['end'], - combined_datapath= combined_datapath, - out_dir= metsim_inputs_dir - ) + # Prepare data to metsim input format in case of non-low latency + if gfs_days==0: + ms_state, ms_input_data = generate_state_and_inputs( + forcings_startdate= config['BASIN']['start'], + forcings_enddate= config['BASIN']['end'], + combined_datapath= combined_datapath, + out_dir= metsim_inputs_dir + ) + + # Prepare data to metsim input format in case of low latency + else: + ms_state, ms_input_data = generate_metsim_state_and_inputs_with_multiple_nc_files( + nc_file_paths=[combined_datapath, low_latency_combined_nc_path], + start_dates=[gfs_data_download_start], + out_dir= metsim_inputs_dir, + forcings_start_date= config['BASIN']['start'], + forcings_end_date= gfs_data_download_end + ) #------- MetSim Input Data Preparation End -------# except: no_errors = no_errors+1 @@ -437,10 +501,10 @@ def rat_basin(config, rat_logger, forecast_mode=False): start= config['BASIN']['start'], end= config['BASIN']['end'], init_param= config['METSIM']['metsim_param_file'], - out_dir= metsim_output_path, - 
forcings=ms_input_data, - state=ms_state, - domain= domain_nc_path + out_dir= str(metsim_output_path), + forcings=str(ms_input_data), + state=str(ms_state), + domain= str(domain_nc_path) ) as m: ms = MetSimRunner( @@ -506,6 +570,7 @@ def rat_basin(config, rat_logger, forecast_mode=False): forcing_prefix = vic_input_forcing_path, init_state = vic_init_state, init_state_dir=init_state_in_dir, + init_state_save_date= vic_init_state_save_date, init_state_out_dir=init_state_out_dir ) as p: vic = VICRunner( diff --git a/src/rat/run_rat.py b/src/rat/run_rat.py index 8b7f5a4..2c801eb 100644 --- a/src/rat/run_rat.py +++ b/src/rat/run_rat.py @@ -12,11 +12,12 @@ from dask.distributed import Client, LocalCluster from rat.utils.logging import init_logger,close_logger,LOG_LEVEL1_NAME +from rat.utils.vic_init_state_finder import get_vic_init_state_date import rat.ee_utils.ee_config as ee_configuration from rat.rat_basin import rat_basin #------------ Define Variables ------------# -def run_rat(config_fn, operational_latency=None): +def run_rat(config_fn, operational_latency=None ): """Runs RAT as per configuration defined in `config_fn`. parameters: @@ -24,6 +25,9 @@ def run_rat(config_fn, operational_latency=None): operational_latency (int): Number of days in the past from today to end RAT operational run . RAT won't run operationally if operational_latency is None. Default is None. """ + # IMERG Latency (in days) that works fine + low_latency_limit = 3 + # Reading config with comments config_fn = Path(config_fn).resolve() ryaml_client = ryaml.YAML() @@ -62,22 +66,56 @@ def run_rat(config_fn, operational_latency=None): except: log.info("Failed to connect to Earth engine. Wrong credentials. If you want to use Surface Area Estimations from RAT, please update the EE credentials.") - ############ ----------- Single basin run ---------------- ################ + ############################ ----------- Single basin run ---------------- ###################################### if(not config['GLOBAL']['multiple_basin_run']): log.info('############## Starting RAT for '+config['BASIN']['basin_name']+' #################') # Checking if Rat is running operationally with some latency. If yes, update start, end and vic_init_state dates. if operational_latency: + operational_latency = int(operational_latency) + ## Calculation of gfs_days + # If running for more than a latency of 3 days, IMERG data will be there. So data for gfs days is 0. + if operational_latency>low_latency_limit: + gfs_days = 0 + # If running for a lower latency of 0-3 days, GFS data will have to be used for 3-0 days. + else: + gfs_days = low_latency_limit-operational_latency + try: - log.info(f'Running RAT operationally at a latency of {operational_latency}. Updating start and end date.') - config['BASIN']['start'] = copy.deepcopy(config['BASIN']['end']) - config['BASIN']['vic_init_state'] = copy.deepcopy(config['BASIN']['end']) + # RAT has to be run for one overlapping day of IMERG(1) + one new day for IMERG(1) + GFS days + log.info(f'Running RAT operationally at a latency of {operational_latency} day(s) from today. 
Updating start and end date.') + # Record the previous end date + previous_end_date = copy.deepcopy(config['BASIN']['end']) + # Find vic_init_state_date from last run + config['BASIN']['vic_init_state'] = get_vic_init_state_date(previous_end_date, low_latency_limit, config['GLOBAL']['data_dir'], + config['BASIN']['region_name'], config['BASIN']['basin_name']) + if not(config['BASIN']['vic_init_state']): + raise Exception('No vic init state file was found from last run.') + # Start date will be same as Vic init date because RAT will start from the same date as date of VIC's initial state + config['BASIN']['start'] = copy.deepcopy(config['BASIN']['vic_init_state']) config['BASIN']['rout_init_state'] = None + # End date will be updated to today - latency config['BASIN']['end'] = datetime.datetime.now().date() - datetime.timedelta(days=int(operational_latency)) except: log.exception('Failed to update start and end date for RAT to run operationally. Please make sure RAT has been run atleast once before.') return None - # Running RAT if start < end date + else: + ## Calculation of gfs_days + #If not running operationally, check end date and start date's difference from today + end_date_diff_from_today = datetime.datetime.now().date() - config['BASIN']['end'] + start_date_diff_from_today = datetime.datetime.now().date() - config['BASIN']['start'] + # If difference is more than 3 days, gfs_days will be 0. + if end_date_diff_from_today > datetime.timedelta(days=int(low_latency_limit)): + gfs_days = 0 + # Else if start date and today has less than 3 days difference, gfs_days will be start-end + elif (start_date_diff_from_today= config['BASIN']['end']): log.error('Sorry, RAT operational run for '+config['BASIN']['basin_name']+' failed.') log.error(f"Start date - {config['BASIN']['start']} is before the end date - {config['BASIN']['end']}") @@ -88,16 +126,16 @@ def run_rat(config_fn, operational_latency=None): # Store deep copy of config as it is mutable config_copy = copy.deepcopy(config) # Run RAT for basin - no_errors, latest_altimetry_cycle = rat_basin(config, log) + no_errors, latest_altimetry_cycle = rat_basin(config, log, gfs_days=gfs_days) # Run RAT forecast for basin if forecast is True if config.get('PLUGINS', {}).get('forecasting'): # Importing the forecast module try: - from rat.plugins.forecasting import forecast + from plugins.forecasting.forecast_basin import forecast except: log.exception("Failed to import Forecast plugin due to missing package(s).") log.info('############## Starting RAT forecast for '+config['BASIN']['basin_name']+' #################') - forecast_no_errors = forecast(config, log) + forecast_no_errors = forecast(config, log, low_latency_limit) if(forecast_no_errors>0): log.info('############## RAT-Forecasting run finished for '+config_copy['BASIN']['basin_name']+ ' with '+str(forecast_no_errors)+' error(s). #################') elif(forecast_no_errors==0): @@ -115,7 +153,7 @@ def run_rat(config_fn, operational_latency=None): else: log.error('############## RAT run failed for '+config['BASIN']['basin_name']+' #################') - ############ ----------- Multiple basin run ---------------- ################ + ######################## ----------- Multiple basin run ---------------- ############################# else: # Reading basins metadata try: @@ -143,41 +181,96 @@ def run_rat(config_fn, operational_latency=None): log.info('############## Starting RAT for '+basin+' #################') # Checking if Rat is running operationally with some latency. 
If yes, update start, end and vic_init_state dates. if operational_latency: + operational_latency = int(operational_latency) + ## Calculation of gfs_days + # If running for more than a latency of 3 days, IMERG data will be there. So data for gfs days is 0. + if operational_latency>low_latency_limit: + gfs_days = 0 + # If running for a lower latency of 0-3 days, GFS data will have to be used for 3-0 days. + else: + gfs_days = 3-operational_latency + try: - log.info(f'Running RAT operationally at a latency of {operational_latency}. Updating start and end date.') + log.info(f'Running RAT operationally at a latency of {operational_latency} day(s) from today. Updating start and end date.') ## If end date is not in basins metadata.columns then it is in config file. if ('BASIN','end') not in basins_metadata.columns: - ## Adding [Basin][start] to metadata.columns if not there with with None value - if ('BASIN','start') not in basins_metadata.columns: - basins_metadata['BASIN','start'] = None - ## Changning [Basin][start] in metadata.columns to previous end value from config file - basins_metadata['BASIN','start'] = copy.deepcopy(config['BASIN']['end']) - config['BASIN']['end'] = datetime.datetime.now().date() - datetime.timedelta(days=int(operational_latency)) - ## Else it is in metadata.columns + # Record the previous end date + previous_end_date = copy.deepcopy(config['BASIN']['end']) + else: + previous_end_date = basins_metadata['BASIN','end'].loc[basins_metadata['BASIN','basin_name']== basin] + # Find vic_init_state_date from last run + if ('GLOBAL','data_dir') not in basins_metadata.columns: + data_dir = config['GLOBAL']['data_dir'] + else: + data_dir = basins_metadata['GLOBAL','data_dir'].loc[basins_metadata['BASIN','basin_name']== basin] + if ('BASIN','region_name') not in basins_metadata.columns: + region_name = config['BASIN']['region_name'] else: - # Updating start - ## Adding [Basin][start] to metadata.columns if not there with None value - if ('BASIN','start') not in basins_metadata.columns: - basins_metadata['BASIN','start'] = None - ## Changning [Basin][start] in metadata.columns to previous end value from metadata - basins_metadata['BASIN','start'].where(basins_metadata['BASIN','basin_name']!= basin, basins_metadata['BASIN','end'], inplace=True) - # Updating end - operational_end_date = datetime.datetime.now().date() - datetime.timedelta(days=int(operational_latency)) - basins_metadata['BASIN','end'].where(basins_metadata['BASIN','basin_name']!= basin, operational_end_date, inplace=True) - ### We can add vic_init_state and rout_init_state to metadata as it will override the values in config anyway. 
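Both the single-basin path above and the per-basin metadata update that follows resolve the restart date through `get_vic_init_state_date` (added in `src/rat/utils/vic_init_state_finder.py` later in this diff). A minimal usage sketch, assuming a hypothetical data directory, region, and basin name:

```python
import datetime
from rat.utils.vic_init_state_finder import get_vic_init_state_date

# Hypothetical RAT data directory and basin identifiers, for illustration only.
data_dir = "/data/rat_project"
region_name = "mekong"
basin_name = "lower_mekong"

# Search up to 3 days (the IMERG low-latency limit) back from the previous
# run's end date for a saved VIC state file named state_.YYYYMMDD_00000.nc.
previous_end_date = datetime.datetime(2024, 6, 10)
vic_init_state_date = get_vic_init_state_date(previous_end_date, 3, data_dir, region_name, basin_name)

if vic_init_state_date is None:
    print("No VIC init state file found in the last 3 days; RAT cannot resume operationally.")
else:
    print(f"Resuming RAT from the VIC state saved on {vic_init_state_date:%Y-%m-%d}")
```

The helper simply walks backwards one day at a time and checks for `state_.<date>_00000.nc` under the basin's `vic/vic_init_states` directory.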
- # Updating vic_init_state - ## Adding [Basin][vic_init_state] to metadata.columns if not there with None value + region_name = basins_metadata['GLOBAL','region_name'].loc[basins_metadata['BASIN','basin_name']== basin] + if ('BASIN','basin_name') not in basins_metadata.columns: + basin_name = config['BASIN']['basin_name'] + else: + basin_name = basins_metadata['GLOBAL','basin_name'].loc[basins_metadata['BASIN','basin_name']== basin] + + ## Adding [Basin][vic_init_state] to metadata.columns if not there with None value if ('BASIN','vic_init_state') not in basins_metadata.columns: - basins_metadata['BASIN','vic_init_state'] = None - basins_metadata['BASIN','vic_init_state'].where(basins_metadata['BASIN','basin_name']!= basin, basins_metadata['BASIN','start'], inplace=True) + basins_metadata['BASIN','vic_init_state'] = None + # Find vic_init_state + vic_init_state_date = get_vic_init_state_date(previous_end_date, low_latency_limit, data_dir,region_name, basin_name) + # Check if vic init state is not none, else raise error + if not(vic_init_state_date): + raise Exception('No vic init state file was found from last run.') + # Replace vic_init_state in basins metadata + basins_metadata['BASIN','vic_init_state'].where(basins_metadata['BASIN','basin_name']!= basin, vic_init_state_date, inplace=True) + + ## Adding [Basin][start] to metadata.columns if not there with None value + if ('BASIN','start') not in basins_metadata.columns: + basins_metadata['BASIN','start'] = None + # Replace start date with vic_init_state_date + basins_metadata['BASIN','start'].where(basins_metadata['BASIN','basin_name']!= basin, vic_init_state_date, inplace=True) + + ## Adding [Basin][end] to metadata.columns if not there with None value + if ('BASIN','end') not in basins_metadata.columns: + basins_metadata['BASIN','end'] = None + # Replace end date with today - operational latency + operational_end_date = datetime.datetime.now().date() - datetime.timedelta(days=int(operational_latency)) + basins_metadata['BASIN','end'].where(basins_metadata['BASIN','basin_name']!= basin, operational_end_date, inplace=True) + # Updating rout_init_state to None for all. - ## Adding [Basin][vic_init_state] to metadata.columns if not there with None value + ## Adding [Basin][rout_init_state] to metadata.columns if not there with None value if ('BASIN','rout_init_state') not in basins_metadata.columns: basins_metadata['BASIN','rout_init_state'] = None basins_metadata['BASIN','rout_init_state'] = None + except: log.exception('Failed to update start and end date for RAT to run operationally. Please make sure RAT has been run atleast once before.') return None + + else: + ## Calculation of gfs_days + # Read start and end date from metadata if there, otherwise from config. + if ('BASIN','start') not in basins_metadata.columns: + start_date_value = config['BASIN']['start'] + else: + start_date_value = basins_metadata['BASIN','start'].loc[basins_metadata['BASIN','basin_name']== basin] + if ('BASIN','end') not in basins_metadata.columns: + end_date_value = config['BASIN']['end'] + else: + end_date_value = basins_metadata['BASIN','end'].loc[basins_metadata['BASIN','basin_name']== basin] + #If not running operationally, check end date and start date's difference from today + end_date_diff_from_today = datetime.datetime.now().date() - end_date_value + start_date_diff_from_today = datetime.datetime.now().date() - start_date_value + # If difference is more than 3 days, gfs_days will be 0.
+ if end_date_diff_from_today > datetime.timedelta(days=int(low_latency_limit)): + gfs_days = 0 + # Else if start date and today has less than 3 days difference, gfs_days will be start-end + elif (start_date_diff_from_today0): log.info('############## RAT-Forecasting run finished for '+config_copy['BASIN']['basin_name']+ ' with '+str(forecast_no_errors)+' error(s). #################') elif(forecast_no_errors==0): diff --git a/src/rat/toolbox/MiscUtil.py b/src/rat/toolbox/MiscUtil.py index 0f56acd..901f48e 100644 --- a/src/rat/toolbox/MiscUtil.py +++ b/src/rat/toolbox/MiscUtil.py @@ -1,4 +1,6 @@ from pathlib import Path #to work with relative paths +from scipy import stats +import pandas as pd ## Function to convert a relative path to absolute path def rel2abs(relative_path: str) -> str: @@ -21,4 +23,49 @@ def rel2abs(relative_path: str) -> str: absolute_path = Path(relative_path).resolve() # Convert Path object to string - return str(absolute_path) \ No newline at end of file + return str(absolute_path) + +def get_quantile_from_area(filepath, area_value=None): + """ + Calculate the quantile of a given area value from a time series stored in a CSV file. + If the area value is not provided, the function returns the quantile of the last area value in the dataset. + + Parameters: + - filepath (str): Path to the CSV file containing the time series data. The file should have at least 'date' and 'area' columns. + - area_value (float, optional): The area value for which to calculate the quantile. + If None, the quantile of the last area value in the DataFrame is returned. + + Returns: + - quantile_value (float): The quantile of the given area value or the last area value in the dataset. + """ + + # Step 1: Read the CSV file + df = pd.read_csv(filepath) + + # Step 2: Convert 'date' column to datetime format if it's not already + if not pd.api.types.is_datetime64_any_dtype(df['date']): + df['date'] = pd.to_datetime(df['date']) + + # Step 3: Filter the data to ensure the time series spans complete years + df.set_index('date', inplace=True) + df = df.asfreq('D') # Ensure daily frequency + + start_year = df.index[0].year + end_year = df.index[-1].year + + # Adjust start and end to ensure full years + start_date = f'{start_year}-01-01' + end_date = f'{end_year}-12-31' + + # Filter the dataframe for complete years + df_filtered= df[start_date:end_date] + + # Step 4: If area_value is None, use the last value in the 'area' column + if area_value is None: + area_value = df['area'].iloc[-1] + + # Step 5: Compute the quantile of the given area value + area_values = df_filtered['area'].dropna() + quantile_value = stats.percentileofscore(area_values, area_value) / 100 + + return quantile_value \ No newline at end of file diff --git a/src/rat/utils/convert_to_final_outputs.py b/src/rat/utils/convert_to_final_outputs.py index fefa412..95f4309 100644 --- a/src/rat/utils/convert_to_final_outputs.py +++ b/src/rat/utils/convert_to_final_outputs.py @@ -35,18 +35,15 @@ def convert_inflow(inflow_dir, reservoir_shpfile, reservoir_shpfile_column_dict, for inflow_path in inflow_paths: res_name = os.path.splitext(os.path.split(inflow_path)[-1])[0] - if res_name in reservoirs['Inflow_filename'].tolist(): - savepath = final_out_inflow_dir / inflow_path.name + savepath = final_out_inflow_dir / inflow_path.name - df = pd.read_csv(inflow_path, parse_dates=['date']) - df['inflow (m3/d)'] = df['streamflow'] * (24*60*60) # indicate units, convert from m3/s to m3/d - df = df[['date', 'inflow (m3/d)']] + df = pd.read_csv(inflow_path, 
parse_dates=['date']) + df['inflow (m3/d)'] = df['streamflow'] * (24*60*60) # indicate units, convert from m3/s to m3/d + df = df[['date', 'inflow (m3/d)']] - print(f"Converting [Inflow]: {res_name}") - df.to_csv(savepath, index=False) - print(df.tail()) - else: - print(f"Currently not displayed in website: {res_name}") + print(f"Converting [Inflow]: {res_name}") + df.to_csv(savepath, index=False) + print(df.tail()) def convert_dels(dels_dir, website_v_dir): # Delta S diff --git a/src/rat/utils/files_creator.py b/src/rat/utils/files_creator.py index feb1644..c4a0d6e 100644 --- a/src/rat/utils/files_creator.py +++ b/src/rat/utils/files_creator.py @@ -158,18 +158,11 @@ def create_basin_reservoir_shpfile(reservoir_shpfile,reservoir_shpfile_column_di reservoirs_gdf_column_dict = reservoir_shpfile_column_dict if routing_station_global_data: - reservoirs_spatialjoin = gpd.sjoin(reservoirs, basin_data_crs_changed, "inner")[[ - reservoirs_gdf_column_dict['id_column'], - reservoirs_gdf_column_dict['dam_name_column'], - reservoirs_gdf_column_dict['area_column'], - 'geometry']] + reservoirs_spatialjoin = gpd.sjoin(reservoirs, basin_data_crs_changed, "inner")[reservoirs.columns] reservoirs_spatialjoin['uniq_id'] = reservoirs_spatialjoin[reservoirs_gdf_column_dict['id_column']].astype(str)+'_'+ \ reservoirs_spatialjoin[reservoirs_gdf_column_dict['dam_name_column']].astype(str).str.replace(' ','_') else: - reservoirs_spatialjoin = gpd.sjoin(reservoirs, basin_data_crs_changed, "inner")[[ - reservoirs_gdf_column_dict['dam_name_column'], - reservoirs_gdf_column_dict['area_column'], - 'geometry']] + reservoirs_spatialjoin = gpd.sjoin(reservoirs, basin_data_crs_changed, "inner")[reservoirs.columns] if(reservoirs_spatialjoin.empty): raise Exception('Reservoir names in reservoir shapefile are not matching with the station names in the station file used for routing.') diff --git a/src/rat/utils/utils.py b/src/rat/utils/utils.py index 315bc9e..d787ce0 100644 --- a/src/rat/utils/utils.py +++ b/src/rat/utils/utils.py @@ -3,6 +3,8 @@ import numpy as np from scipy.signal import savgol_filter from datetime import datetime +import xarray as xr +import pandas as pd def days_between(d1, d2): d1 = datetime.strptime(d1, "%Y-%m-%d") @@ -85,3 +87,53 @@ def weighted_moving_average(data, weights, window_size): smoothed_data[i] = np.sum(weighted_values) / np.sum(weights[start:end]) return smoothed_data + +def check_date_in_netcdf(nc_file_path, check_date): + """ + Check if a given date is present in the 'time' dimension of a NetCDF file. + + Parameters: + - nc_file_path (str): Path to the NetCDF file. + - check_date (datetime.datetime): The date to check for in the NetCDF file. + + Returns: + - bool: True if the date is present in the 'time' dimension, False otherwise. + """ + try: + # Open the NetCDF file + with xr.open_dataset(nc_file_path) as ds: + # Convert the check_date to a pandas Timestamp for comparison + check_date = pd.Timestamp(check_date) + + # Check if the date is in the 'time' dimension + if check_date in ds['time'].values: + return True + else: + return False + except Exception as e: + print(f"An error occurred: {e}") + return False + +def get_first_date_from_netcdf(nc_file_path): + """ + Retrieve the first date from the 'time' dimension of a NetCDF file. + + Parameters: + - nc_file_path (str): Path to the NetCDF file. + + Returns: + - datetime.datetime: The first date in the 'time' dimension of the NetCDF file. 
+ """ + try: + # Open the NetCDF file + with xr.open_dataset(nc_file_path) as ds: + # Extract the first date from the 'time' dimension + first_date = ds['time'].values[0] + + # Convert to a datetime.datetime object if necessary + if isinstance(first_date, np.datetime64): + first_date = pd.to_datetime(first_date).to_pydatetime() + return first_date + except Exception as e: + print(f"An error occurred: {e}") + return None diff --git a/src/rat/utils/vic_init_state_finder.py b/src/rat/utils/vic_init_state_finder.py new file mode 100644 index 0000000..7f0d043 --- /dev/null +++ b/src/rat/utils/vic_init_state_finder.py @@ -0,0 +1,40 @@ +import os +import pandas as pd +import datetime + +def get_vic_init_state_date(initial_date, days, data_dir_path, region_name, basin_name): + """ + Check for the presence of a VIC state file in a directory within a given date range. + + Args: + initial_date (str or datetime.datetime): The starting date from which to begin the search. + days (int): The number of days to search backwards from the initial_date. + data_dir_path (str): The data directory path of RAT. + region_name (str): The name of the region for which vic_init_state_date is required. + basin_name (str): The name of the basin for which vic_init_state_date is required. + + Returns: + datetime.datetime or None: The date as a datetime.datetime object if a file is found, + otherwise None if no file is found within the given days from initial date. + """ + # Convert the initial_date to a datetime object if it isn't already + if isinstance(initial_date, str): + date = pd.to_datetime(initial_date).to_pydatetime() + else: + date = initial_date + + # Check for no of days + the initial_date + for _ in range(days+1): + # Construct the file path for the current date + file_path = os.path.join(data_dir_path, region_name, 'basins', basin_name, 'vic', 'vic_init_states', f'state_.{date:%Y%m%d}_00000.nc') + # print(file_path) + + # Check if the file exists at the constructed file path + if os.path.isfile(file_path): + return date + + # Move to the previous date + date -= datetime.timedelta(days=1) + + # If no file is found in the entire range, return None + return None \ No newline at end of file diff --git a/src/rat/utils/vic_param_reader.py b/src/rat/utils/vic_param_reader.py index 7ba29d4..150f61b 100644 --- a/src/rat/utils/vic_param_reader.py +++ b/src/rat/utils/vic_param_reader.py @@ -13,7 +13,7 @@ class VICParameterFile: def __init__(self, config, basin_name, startdate=None, enddate=None, vic_output_path=None, vic_section='VIC', - forcing_prefix=None, runname=None, init_state=None, save_init_state=True, init_state_dir=None, + forcing_prefix=None, runname=None, init_state=None, init_state_dir=None, save_init_state=True, init_state_save_date= None, init_state_out_dir=None, intermediate_files= False): self.params = { @@ -110,7 +110,11 @@ def __init__(self, config, basin_name, startdate=None, enddate=None, vic_output_ self.straight_from_metsim = False self.save_init_state = save_init_state #VIC State Save Date - self.vic_init_state_save_date = config['BASIN']['end'] + if self.save_init_state: + if not init_state_save_date: + self.vic_init_state_save_date = config['BASIN']['end'] + else: + self.vic_init_state_save_date = init_state_save_date if init_state_dir: self.init_state_dir = init_state_dir else: @@ -336,6 +340,18 @@ def _load_from_config(self): else: del self.params['state_file_params']['INIT_STATE'] + # If using init state and also wants to save init state file, check if they don't have the same date (or 
path); if they match, do not save the state file, as VIC would otherwise raise an error. + if (self.init_state) and (self.save_init_state): + vic_init_state_save_date_str = str(self.vic_init_state_save_date.strftime('%Y'))+str(self.vic_init_state_save_date.strftime('%m'))+\ + str(self.vic_init_state_save_date.strftime('%d')) + init_state_save_file_path = os.path.join(config['GLOBAL']['data_dir'],config['BASIN']['region_name'], + 'basins',self.basin_name,'vic',self.init_state_dir, + 'state_.'+vic_init_state_save_date_str+'_00000.nc') + + if self.params['state_file_params']['INIT_STATE']==init_state_save_file_path: + del self.params['state_file_params']['STATENAME'] + self.save_init_state=False + def _out_format_params(self): # return a VIC compatible string of paramters header = '\n'.join([ f'#------------------------- VIC Parameter File -------------------------#',
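To illustrate the guard added in `vic_param_reader.py` above: VIC is not asked to save a state file when that file would overwrite the very file supplied as `INIT_STATE`. A simplified, stand-alone sketch of that check (the directory layout follows the pattern used in the diff; the concrete paths are hypothetical):

```python
import os
import datetime

def init_state_save_path(data_dir, region_name, basin_name, init_state_dir, save_date):
    """Path of the state file VIC would write, following the state_.YYYYMMDD_00000.nc pattern."""
    return os.path.join(
        data_dir, region_name, "basins", basin_name, "vic", init_state_dir,
        f"state_.{save_date:%Y%m%d}_00000.nc",
    )

init_state = "/data/rat_project/mekong/basins/lower_mekong/vic/vic_init_states/state_.20240607_00000.nc"
save_path = init_state_save_path(
    "/data/rat_project", "mekong", "lower_mekong", "vic_init_states", datetime.date(2024, 6, 7)
)

# If the file to be saved is the same file being read as INIT_STATE, skip saving
# (this mirrors deleting STATENAME and setting save_init_state to False above).
save_init_state = init_state != save_path
print(save_init_state)  # False -> VIC will not be asked to write a state file
```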
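The low-latency handling added in `run_rat.py` reduces to one rule: with `low_latency_limit = 3`, a run whose end date is at least 3 days behind today can rely on IMERG alone, while smaller latencies cover the remaining most recent days with GFS/GEFS data. A stand-alone sketch of that rule (not RAT's actual code path, only the arithmetic it encodes):

```python
LOW_LATENCY_LIMIT = 3  # days of latency at which IMERG data can be assumed available

def gfs_days_for_latency(operational_latency: int, low_latency_limit: int = LOW_LATENCY_LIMIT) -> int:
    """Number of most recent days to fill with GFS data instead of IMERG."""
    if operational_latency > low_latency_limit:
        return 0  # IMERG covers the whole run
    return low_latency_limit - operational_latency  # e.g. 0-day latency -> 3 GFS days

for latency in (0, 1, 2, 3, 5):
    print(f"latency={latency} day(s) -> gfs_days={gfs_days_for_latency(latency)}")
```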
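The two NetCDF helpers added to `rat/utils/utils.py` (`check_date_in_netcdf` and `get_first_date_from_netcdf`) support this bookkeeping around the combined forcing files. A self-contained sketch against a small synthetic dataset (the file name and variable are illustrative only):

```python
import datetime
import numpy as np
import pandas as pd
import xarray as xr
from rat.utils.utils import check_date_in_netcdf, get_first_date_from_netcdf

# Build a tiny synthetic NetCDF file with a daily 'time' dimension.
times = pd.date_range("2024-06-01", "2024-06-10", freq="D")
ds = xr.Dataset({"precip": ("time", np.random.rand(len(times)))}, coords={"time": times})
nc_path = "example_combined.nc"
ds.to_netcdf(nc_path)

print(get_first_date_from_netcdf(nc_path))                           # 2024-06-01 00:00:00
print(check_date_in_netcdf(nc_path, datetime.datetime(2024, 6, 5)))  # True
print(check_date_in_netcdf(nc_path, datetime.datetime(2024, 7, 1)))  # False
```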
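Finally, `rat.toolbox.MiscUtil.get_quantile_from_area` reports where a surface-area value sits within the reservoir's historical area record (restricted to complete calendar years). A usage sketch with a hypothetical CSV containing `date` and `area` columns:

```python
import pandas as pd
from rat.toolbox.MiscUtil import get_quantile_from_area

# Hypothetical daily surface-area series written to CSV for illustration.
dates = pd.date_range("2019-01-01", "2023-12-31", freq="D")
areas = 100.0 + 20.0 * (dates.dayofyear / 366.0)  # simple seasonal pattern
sarea_csv = "example_reservoir_sarea.csv"
pd.DataFrame({"date": dates, "area": areas}).to_csv(sarea_csv, index=False)

# Quantile of the most recent area value within the complete-year record ...
print(get_quantile_from_area(sarea_csv))
# ... and of an arbitrary area value.
print(get_quantile_from_area(sarea_csv, area_value=110.0))
```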