Added ability to run at a low latency of less than 3 days (0-3) #112

Merged
merged 28 commits into from
Sep 9, 2024
Changes from 10 commits

Commits
9feb42b
Changed default behaviour of RAT:
SanchitMinocha Aug 24, 2024
595b04c
Updated comments
SanchitMinocha Aug 24, 2024
9447b6e
All IMERG data has been revised to V07B
SanchitMinocha Aug 24, 2024
f0d3767
Bug fix in read_write_chunk & updated CombinedNC
SanchitMinocha Aug 24, 2024
a08f89d
Dwnlod of new low latency data using GFS & GEFS
SanchitMinocha Aug 24, 2024
bb2b4a5
Divide forecasting into 2 to avoid circular import
SanchitMinocha Aug 24, 2024
e4dcb59
Vic_init_state_finder to run operationally
SanchitMinocha Aug 24, 2024
f67e024
Vic save date is no longer end date
SanchitMinocha Aug 24, 2024
e393b5f
divide forecasting into 2 to avoid circular import
SanchitMinocha Aug 24, 2024
c864ebf
Added low latency feature in RAT
SanchitMinocha Aug 24, 2024
79cb63b
updated storage scenario
pritamd47 Aug 25, 2024
c741d36
updated storage based scenario in forecasting
pritamd47 Aug 25, 2024
3394b9a
Removed duplicate of forecasting.py
SanchitMinocha Aug 25, 2024
f113700
Renamed imerg_latency as low_latency_limit
SanchitMinocha Aug 25, 2024
21de4dd
Added additional util functions for usage
SanchitMinocha Aug 25, 2024
d85a724
Added function to estimate quantile of res. area
SanchitMinocha Aug 30, 2024
c1b3077
Updated basin reservoir shpfile create fn
SanchitMinocha Aug 30, 2024
7c2ccd7
conversion of all inflow files to final outputs
SanchitMinocha Aug 30, 2024
7dea06d
forecast outfl. scenario by user & actual storage
SanchitMinocha Aug 30, 2024
d8a4e91
Easy forecasting for multiple dates in the past.
SanchitMinocha Aug 30, 2024
fb16a94
Updated forecasting docs
SanchitMinocha Aug 30, 2024
411eb72
Bug fix: rout init & added basedate parameter
SanchitMinocha Aug 30, 2024
7c11d59
added low_latency as parameter
SanchitMinocha Aug 30, 2024
fd9a9f6
Bug fix: loading of netcdf file to overwrite
SanchitMinocha Aug 30, 2024
17d67b6
added generic fn to create metsim inputs
SanchitMinocha Aug 30, 2024
08b5eba
Added resolution for review comments
SanchitMinocha Aug 30, 2024
8faea6c
Clarification of forecast_vic_init_state in the docs
SanchitMinocha Sep 9, 2024
240269a
Updated docs to describe changes in new version
SanchitMinocha Sep 9, 2024
2 changes: 1 addition & 1 deletion docs/Configuration/rat_config.md
@@ -808,7 +808,7 @@ This section of the configuration file describes the parameters defined by `rout
If `clean_previous_outputs` is `True`, the previous outputs are cleaned before executing any step in `steps`.

!!! tip_note "Tip"
-You should use `clean_previous_outputs` if you want to have fresh outputs of RAT for a river basin. Otherwise, by default RAT will keep appending the new outputs to the same files and will concatenate data by calendar dates.
+You should use `clean_previous_outputs` if you want to have fresh outputs of RAT for a river basin. Otherwise, by default RAT will keep appending the new outputs to the same files and will concatenate data by calendar dates. In case the new outputs and previous outputs have some coinciding dates, RAT will replace the previous outputs with the new outputs for these dates.

### Confidential
* <h6 class="parameter_heading">*`secrets:`* :</h6>
2 changes: 1 addition & 1 deletion src/rat/core/run_postprocessing.py
@@ -129,7 +129,7 @@ def calc_E(res_data, start_date, end_date, forcings_path, vic_res_path, sarea, s
# Concat the two dataframes into a new dataframe holding all the data (memory intensive):
complement = pd.concat([existing_data, new_data], ignore_index=True)
# Remove all duplicates:
-complement.drop_duplicates(subset=['time'],inplace=True, keep='first')
+complement.drop_duplicates(subset=['time'],inplace=True, keep='last')
complement.to_csv(savepath, index=False)
else:
data[['time', 'penman_E']].rename({'penman_E': 'OUT_EVAP'}, axis=1).to_csv(savepath, index=False)
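A minimal pandas sketch (synthetic values, not RAT output) of what this one-keyword change does: with `keep='last'`, the row from `new_data`, concatenated second, wins on coinciding dates, so a re-run overwrites stale values instead of preserving them.

```python
import pandas as pd

# Hypothetical existing and freshly computed evaporation records that share 2024-08-02.
existing_data = pd.DataFrame({'time': ['2024-08-01', '2024-08-02'], 'OUT_EVAP': [1.0, 2.0]})
new_data = pd.DataFrame({'time': ['2024-08-02', '2024-08-03'], 'OUT_EVAP': [2.5, 3.0]})

complement = pd.concat([existing_data, new_data], ignore_index=True)
# keep='last' retains the new_data row for the shared date.
complement.drop_duplicates(subset=['time'], inplace=True, keep='last')
print(complement)  # 2024-08-02 now carries 2.5, the freshly computed value
```

The same change in `run_routing.py` below applies the identical rule to inflow files keyed by `date`.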
2 changes: 1 addition & 1 deletion src/rat/core/run_routing.py
@@ -152,7 +152,7 @@ def generate_inflow(src_dir, dst_dir):
# Concat the two dataframes into a new dataframe holding all the data (memory intensive):
complement = pd.concat([existing_data, new_data], ignore_index=True)
# Remove all duplicates:
-complement.drop_duplicates(subset=['date'], inplace=True, keep='first')
+complement.drop_duplicates(subset=['date'], inplace=True, keep='last')
complement.sort_values(by='date', inplace=True)
complement.to_csv(outpath, index=False)
else:
4 changes: 2 additions & 2 deletions src/rat/core/run_vic.py
@@ -45,7 +45,7 @@ def generate_routing_input_state(self, ndays, rout_input_state_file, save_path,
first_existing_time = new_vic_output.time[0]
new_vic_output.close()

-#Preprocessing function for merging netcdf files
+# Preprocessing function for merging netcdf files: coinciding dates are dropped from the first (state) file so that the values from the latest output are kept
def _remove_coinciding_days(ds, cutoff_time, ndays):
file_name = ds.encoding["source"]
file_stem = Path(file_name).stem
@@ -55,7 +55,7 @@ def _remove_coinciding_days(ds, cutoff_time, ndays):
return ds
remove_coinciding_days_func = partial(_remove_coinciding_days, cutoff_time=first_existing_time, ndays=ndays)

-# Merging previous and new vic outputs
+# Merge previous and new VIC outputs, reading in 365-day chunks; the state file (previous VIC outputs) is trimmed to dates before the first date of the new VIC output, so coinciding data is removed from the state file.
try:
save_vic_output = xr.open_mfdataset([rout_input_state_file,self.vic_result],{'time':365}, preprocess=remove_coinciding_days_func)
save_vic_output.to_netcdf(save_path)
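A hedged sketch of this merge pattern with placeholder file names (the 365-day chunking and the two-file `open_mfdataset` call mirror the diff; the state-file detection here is simplified to a stem comparison):

```python
from functools import partial
from pathlib import Path
import xarray as xr

# Placeholder paths; in RAT these come from the run configuration.
rout_input_state_file = "rout_input_state.nc"  # previous VIC outputs (state)
vic_result = "new_vic_output.nc"               # freshly generated VIC output

# The first timestamp of the new output marks where coinciding dates begin.
first_existing_time = xr.open_dataset(vic_result).time[0]

def _drop_coinciding_days(ds, cutoff_time, state_stem):
    # Trim only the state (previous-output) file; leave the new output intact.
    if Path(ds.encoding["source"]).stem == state_stem:
        ds = ds.isel(time=(ds["time"] < cutoff_time))
    return ds

preprocess = partial(_drop_coinciding_days,
                     cutoff_time=first_existing_time,
                     state_stem=Path(rout_input_state_file).stem)
merged = xr.open_mfdataset([rout_input_state_file, vic_result],
                           chunks={'time': 365}, preprocess=preprocess)
merged.to_netcdf("rout_input_merged.nc")
```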
55 changes: 53 additions & 2 deletions src/rat/data_processing/metsim_input_processing.py
@@ -19,7 +19,7 @@
log.setLevel(LOG_LEVEL)

class CombinedNC:
-def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, forecast_dir=None, forecast_basedate=None, climatological_data=None, z_lim=3):
+def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous, forecast_dir=None, forecast_basedate=None, climatological_data=None, z_lim=3, low_latency_dir=None):
"""
Parameters:
start: Start date in YYYY-MM-DD format
@@ -43,8 +43,13 @@ def __init__(self, start, end, datadir, basingridpath, outputdir, use_previous,
self._latitudes, self._longitudes = self._get_lat_long_meshgrid()

if forecast_dir:
log.debug("Combining forcing data for forecasting mode.")
self.read_forecast(forecast_dir, forecast_basedate)
self._write()
elif low_latency_dir:
log.debug("Combining forcing data for low latency mode.")
self.read_low_latency(low_latency_dir)
self._write()
else:
self._read_and_write_in_chunks()
# self._read()
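A hypothetical usage sketch of the dispatch above: passing `low_latency_dir` (with no `forecast_dir`) routes the constructor through `read_low_latency()` before writing. The paths and dates are placeholders, not values from this PR.

```python
from rat.data_processing.metsim_input_processing import CombinedNC

CombinedNC(
    start="2024-08-01",  # YYYY-MM-DD, per the docstring
    end="2024-08-24",
    datadir="data/basin/raw",
    basingridpath="data/basin/grid.tif",
    outputdir="data/basin/combined_data.nc",
    use_previous=True,
    low_latency_dir="data/basin/low_latency",  # triggers the low-latency branch
)
```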
@@ -173,6 +178,52 @@ def read_forecast(self, forecast_dir, basedate):
self.winds[day, :, :] = wind
# pbar.update(1)

def read_low_latency(self, low_latency_dir):
low_latency_dir = Path(low_latency_dir)
start = self._start
end = self._end

# define data arrays
self.precips = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmaxes = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.tmins = np.zeros((self._total_days+1, self._rast.height, self._rast.width))
self.winds = np.zeros((self._total_days+1, self._rast.height, self._rast.width))

self.dates = pd.date_range(start, end)

for day, date in enumerate(self.dates):
fileDate = date
reqDate = fileDate.strftime("%Y-%m-%d")
log.debug("Combining data: %s", reqDate)
# pbar.set_description(reqDate)

precipfilepath = low_latency_dir / f'gefs-chirps/processed' / f'{date:%Y%m%d}' / f'{date:%Y%m%d}.asc'
precipitation = rio.open(precipfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading Maximum Temperature ASCII file contents
tmaxfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/tmax/{date:%Y%m%d}.asc'
tmax = rio.open(tmaxfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading Minimum Temperature ASCII file contents
tminfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/tmin/{date:%Y%m%d}.asc'
tmin = rio.open(tminfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)#.flatten()[self.gridvalue==0.0]

#Reading U-component of Wind Speed ASCII file contents
uwndfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/uwnd/{date:%Y%m%d}.asc'
uwnd = rio.open(uwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)

#Reading V-component of Wind Speed ASCII file contents
vwndfilepath = low_latency_dir / f'gfs/processed/{date:%Y%m%d}/vwnd/{date:%Y%m%d}.asc'
vwnd = rio.open(vwndfilepath).read(1, masked=True).astype(np.float32).filled(np.nan)
wind = (0.75*np.sqrt(uwnd**2 + vwnd**2))#.flatten()[self.gridvalue==0.0]

# self.dates.append(fileDate)
self.precips[day, :, :] = precipitation
self.tmaxes[day, :, :] = tmax
self.tmins[day, :, :] = tmin
self.winds[day, :, :] = wind
# pbar.update(1)
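A quick numeric check (synthetic 3-4-5 wind components) of the combination used above: the u and v components are merged into a magnitude and scaled by 0.75, which appears to adjust the GFS wind toward the near-surface speed expected downstream.

```python
import numpy as np

uwnd = np.array([[3.0, -4.0]])
vwnd = np.array([[4.0, 3.0]])
wind = 0.75 * np.sqrt(uwnd**2 + vwnd**2)
print(wind)  # [[3.75 3.75]], i.e. 0.75 * 5.0 m/s in both cells
```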

# Imputes missing data by interpolation in the order of dimensions time, lon, lat.
def _impute_basin_missing_data(self, combined_data):
combine_nomiss_data = combined_data
@@ -350,7 +401,7 @@ def _write_chunk(self, ds_list, existing_to_append=None, last_existing_time=None
# Separate the non-time-dependent variable
if 'extent' in existing_to_append.data_vars:
# Drop the extent variable from existing_to_append if it already exists in the file for concat operation with other chunks. It will be added after writing all chunks in _apply_dataset_operations
-ds_chunk = ds_chunk.drop_vars('extent')
+existing_to_append = existing_to_append.drop_vars('extent')
write_ds_chunk = xr.concat([existing_to_append, ds_chunk_sel], dim='time')
write_ds_chunk.to_netcdf(self._outputpath, mode='w', unlimited_dims=['time'])
# Else just write the chunk in file (by creating new if first chunk, otherwise append)
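A minimal synthetic illustration of the corrected pattern: the scalar `extent` variable is dropped from the dataset that already holds it so the time-wise concat sees matching variables; as the comment above notes, `extent` is restored after all chunks are written.

```python
import numpy as np
import pandas as pd
import xarray as xr

existing = xr.Dataset(
    {"precip": ("time", np.ones(2)), "extent": ((), 1.0)},  # scalar, time-independent
    coords={"time": pd.date_range("2024-08-01", periods=2)},
)
chunk = xr.Dataset(
    {"precip": ("time", np.zeros(2))},
    coords={"time": pd.date_range("2024-08-03", periods=2)},
)

# Drop the time-independent variable before concatenating along time.
merged = xr.concat([existing.drop_vars("extent"), chunk], dim="time")
print(merged.precip.sizes)  # Frozen({'time': 4})
```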
23 changes: 3 additions & 20 deletions src/rat/data_processing/newdata.py
@@ -56,27 +56,10 @@ def _determine_precip_link_and_version(date):
if version == "IMERG-FINAL":
link = f"ftp://arthurhou.pps.eosdis.nasa.gov/gpmdata/{date.strftime('%Y')}/{date.strftime('%m')}/{date.strftime('%d')}/gis/3B-DAY-GIS.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S000000-E235959.0000.V06A.tif"
elif version == "IMERG-LATE":
-if date >= datetime(2024, 6, 1): # Version was changed from V06E to V07B
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif"
-elif date >= datetime(2023, 11, 8): # Version was changed from V06D to V06E
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06E.1day.tif"
-elif date >= datetime(2023, 7, 1): # Version was changed from V06C to V06D
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06D.1day.tif"
-elif date >= datetime(2022, 5, 8): # Version was changed from V06B to V06C
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06C.1day.tif"
-else:
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06B.1day.tif"
+# if date >= datetime(2024, 6, 1): # Version was changed from V06E to V07B
+link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif"
else:
-if date >= datetime(2024, 6, 1): # Version was changed from V06E to V07B
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif"
-elif date >= datetime(2023, 11, 8): # Version was changed from V06D to V06E
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06E.1day.tif"
-elif date >= datetime(2023, 7, 1): # Version was changed from V06C to V06D
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06D.1day.tif"
-elif date >= datetime(2022, 5, 8): # Version was changed from V06B to V06C
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06C.1day.tif"
-else:
-    link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V06B.1day.tif"
+link = f"https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/early/{date.strftime('%Y')}/{date.strftime('%m')}/3B-HHR-E.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif"
return (link,version)

def _get_cmd_precip_download(outputpath,link,version,secrets):
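A minimal sketch of the construction after this simplification: since all IMERG Late and Early data has been revised to V07B, a single template replaces the date-dependent version ladder. The date below is illustrative.

```python
from datetime import datetime

date = datetime(2024, 8, 24)
late_link = (
    "https://jsimpsonhttps.pps.eosdis.nasa.gov/imerg/gis/"
    f"{date.strftime('%Y')}/{date.strftime('%m')}/"
    f"3B-HHR-L.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}-S233000-E235959.1410.V07B.1day.tif"
)
```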
63 changes: 63 additions & 0 deletions src/rat/data_processing/newdata_low_latency.py
@@ -0,0 +1,63 @@
import pandas as pd
from logging import getLogger

from plugins.forecasting.forecasting import get_gefs_precip
from plugins.forecasting.forecasting import get_GFS_data
from rat.utils.logging import LOG_NAME, LOG_LEVEL, NOTIFICATION

# Getting the logger and setting the log level
log = getLogger(LOG_NAME)
log.setLevel(LOG_LEVEL)

def get_newdata_low_latency(start, end, basin_bounds, raw_low_latency_dir, processed_low_latency_dir, low_latency_gfs_dir):
"""
Downloads and processes low-latency weather data (precipitation, temperature, wind speed) for a given basin
over a specified date range.

Args:
start (datetime.datetime): The start date for the data retrieval.
end (datetime.datetime): The end date for the data retrieval.
basin_bounds (tuple or dict): The geographical bounds of the basin for which data is to be processed.
This can be a tuple of coordinates or a dictionary with bounding box details.
raw_low_latency_dir (str): Directory path where raw low-latency GEFS data is stored.
processed_low_latency_dir (str): Directory path where processed low-latency GEFS data will be saved.
low_latency_gfs_dir (str): Directory path where GFS data (temperature, wind speed) is stored.

Returns:
None

Description:
This function iterates over a date range defined by `start` and `end`. For each date, it:
1. Downloads and processes GEFS-CHIRPS precipitation data for the given basin bounds.
2. Downloads and processes GFS data (including temperature and wind speed) for the same date.

The processed data is saved to the specified directories.

Logging:
The function logs the status of the data download and processing for each date using `log.info`,
which is assumed to be configured in the broader application context.
"""
# Create array of dates
low_latency_dates = pd.date_range(start, end)
# Getting nowcast for every date, so 0 lead time
lead_time = 0

for date in low_latency_dates:
#Download and process GEFS-CHIRPS data to the basin bounds (precipitation)
try:
log.info(f"Downloading low latency GEFS Precipitation for {date.strftime('%Y-%m-%d')}.")
get_gefs_precip(basin_bounds=basin_bounds, forecast_raw_dir=raw_low_latency_dir, forecast_processed_dir=processed_low_latency_dir, begin=date, lead_time=lead_time, low_latency=True)
except Exception as e:
log.error(f"Downloading of low latency GEFS Precipitation for {date.strftime('%Y-%m-%d')} failed.")
log.error(e)
#Download and process GFS data (tmin, tmax, wind speed)
try:
log.info(f"Downloading low latency GFS data for {date.strftime('%Y-%m-%d')}.")
get_GFS_data(basedate=date,lead_time=lead_time, basin_bounds=basin_bounds, gfs_dir=low_latency_gfs_dir, hour_of_day=12)
except Exception as e:
log.error(f"Downloading of low latency GFS data for {date.strftime('%Y-%m-%d')} failed.")
log.error(e)
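A hypothetical driver sketch for this function; the dates, directories, and bounds tuple are placeholders (the real `basin_bounds` format is whatever `get_gefs_precip` and `get_GFS_data` expect).

```python
from datetime import datetime
from rat.data_processing.newdata_low_latency import get_newdata_low_latency

get_newdata_low_latency(
    start=datetime(2024, 8, 22),
    end=datetime(2024, 8, 24),              # 0-3 day latency window
    basin_bounds=(93.0, 9.0, 109.0, 34.0),  # placeholder (lon_min, lat_min, lon_max, lat_max)
    raw_low_latency_dir="data/low_latency/raw",
    processed_low_latency_dir="data/low_latency/processed",
    low_latency_gfs_dir="data/low_latency/gfs",
)
```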



