
Commit

Cleaning up redundant method calls, adding logging details
jsschreck committed Dec 27, 2024
1 parent 6b6093a commit cb3d266
Showing 2 changed files with 26 additions and 19 deletions.
18 changes: 14 additions & 4 deletions credit/datasets/era5_multistep_batcher.py
@@ -281,6 +281,17 @@ def __init__(
self.time_steps = None # Tracks time steps for each batch index
self.forecast_step_counts = None # Track forecast step counts for each batch item

# initialize the batch indices by faking the epoch number here, then resetting it to None
# this is mainly a feature for working with smaller datasets / testing purposes
self.set_epoch(0)
self.current_epoch = None
if len(self.batch_indices) < batch_size:
    logger.warning(
        "Note that the number of data indices is smaller than the batch size."
        " If this is not what you wanted, check batch_size in your config."
    )
self.batch_size = min(batch_size, len(self.batch_indices))

def initialize_batch(self):
"""
Initializes batch indices using DistributedSampler's indices.
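The new guard above keeps the effective batch size from exceeding the number of batch indices actually assigned to this rank. A minimal standalone sketch of the same idea, using a hypothetical clamp_batch_size helper and the standard logging module:

import logging

logger = logging.getLogger(__name__)

def clamp_batch_size(batch_indices, batch_size):
    # Warn when fewer data indices are available than requested.
    if len(batch_indices) < batch_size:
        logger.warning(
            "The number of data indices is smaller than the batch size."
            " If this is not what you wanted, check batch_size in your config."
        )
    # Never use a batch size larger than the data can supply.
    return min(batch_size, len(batch_indices))

# clamp_batch_size(list(range(3)), batch_size=8) -> 3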
@@ -343,7 +354,7 @@ def set_epoch(self, epoch):
self.initialize_batch()

def batches_per_epoch(self):
return math.ceil(len(list(self.sampler)) / self.batch_size)
return math.ceil(len(list(self.batch_indices)) / self.batch_size)

def __getitem__(self, _):
"""
@@ -590,9 +601,8 @@ def worker_process(self, k, index_pair, result_dict):
logger.info("Initiating shutdown sequence.")
self.shutdown()
return
except:
raise RuntimeError(f"Error in worker process for index {k}: {e}") from e

except: # This is here to catch the workers that may not have died
raise RuntimeError(f"Error in worker process for index {k}")

def _fetch_batch(self):
"""
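A bare except clause never binds an exception variable, so a RuntimeError message cannot interpolate e there; the cleaner form carries only the worker index. A slightly more explicit variant of the same pattern, sketched with a hypothetical do_work callable, chains the original exception for easier debugging:

def run_worker(k, do_work):
    try:
        return do_work(k)
    except KeyboardInterrupt:
        # let interrupted workers propagate so shutdown can proceed
        raise
    except Exception as exc:  # catch workers that may not have died cleanly
        raise RuntimeError(f"Error in worker process for index {k}") from exc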
27 changes: 12 additions & 15 deletions credit/datasets/load_dataset_and_dataloader.py
@@ -5,13 +5,11 @@
MultiprocessingBatcher,
MultiprocessingBatcherPrefetch
)
from credit.data import ERA5_and_Forcing_Dataset
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from credit.transforms import load_transforms
from credit.parser import credit_main_parser, training_data_check
from credit.datasets import setup_data_loading, set_globals
import logging
import sys
import re


@@ -50,13 +48,14 @@ def load_dataset(conf, rank=0, world_size=1, is_train=True):
Returns:
Dataset: The loaded dataset.
"""
try:
data_config = setup_data_loading(conf)
except KeyError:
logging.warning(
"You must run credit.parser.credit_main_parser(conf) before loading data. Exiting."
)
sys.exit()
seed = conf["seed"]
conf = credit_main_parser(
conf, parse_training=True, parse_predict=False, print_summary=False
)
training_data_check(conf, print_summary=False) # this is redundant once load_dataset is called from train_*.py
data_config = setup_data_loading(conf)

training_type = "train" if is_train else "valid"
dataset_type = conf["data"].get("dataset_type", )
batch_size = conf["trainer"][f"{training_type}_batch_size"]
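Assuming the updated load_dataset now runs credit_main_parser and training_data_check itself, a caller only needs the raw config dictionary. A minimal usage sketch (the config.yml path is hypothetical, and the file must already define seed plus the usual data and trainer sections):

import yaml

with open("config.yml") as f:  # hypothetical config path
    conf = yaml.safe_load(f)

dataset = load_dataset(conf, rank=0, world_size=1, is_train=True)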
@@ -302,16 +301,17 @@ def load_dataloader(conf, dataset, rank=0, world_size=1, is_train=True):
if __name__ == "__main__":

import sys
import time
import yaml
from credit.parser import credit_main_parser, training_data_check
from credit.datasets import setup_data_loading, set_globals

if len(sys.argv) != 2:
print("Usage: python script.py [dataset_type]")
sys.exit(1)

dataset_id = int(sys.argv[1])

import time
import yaml

# Set up the logger
logging.basicConfig(
level=logging.INFO,
@@ -328,7 +328,6 @@ def load_dataloader(conf, dataset, rank=0, world_size=1, is_train=True):
conf, parse_training=True, parse_predict=False, print_summary=False
)
training_data_check(conf, print_summary=False)
data_config = setup_data_loading(conf)

# options
dataset_type = [
@@ -352,8 +351,6 @@ def load_dataloader(conf, dataset, rank=0, world_size=1, is_train=True):
conf["data"]["valid_forecast_len"] = 0
conf["data"]["dataset_type"] = dataset_type

set_globals(data_config, namespace=globals())

try:
# Load the dataset using the provided dataset_type
dataset = load_dataset(conf, rank=rank, world_size=world_size)
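End to end, the smoke test in __main__ reduces to building the dataset and then its matching dataloader. A condensed sketch with single-process defaults (illustrative only; the returned dataloader is assumed to be iterable like a standard DataLoader):

dataset = load_dataset(conf, rank=0, world_size=1, is_train=True)
dataloader = load_dataloader(conf, dataset, rank=0, world_size=1, is_train=True)

batch = next(iter(dataloader))  # pull one batch as a quick sanity check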
