From 5d4eb59f408ce48c87379708c12ab253e470c0c1 Mon Sep 17 00:00:00 2001
From: Huihuo Zheng
Date: Fri, 14 Feb 2025 08:55:50 -0600
Subject: [PATCH] fixed bug

---
 dlio_benchmark/main.py               |  6 +++---
 dlio_benchmark/utils/config.py       |  1 -
 dlio_benchmark/utils/statscounter.py | 14 ++++----------
 dlio_benchmark/utils/utility.py      | 18 +++++++++---------
 4 files changed, 16 insertions(+), 23 deletions(-)

diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py
index abfccf0f..ff9b389b 100644
--- a/dlio_benchmark/main.py
+++ b/dlio_benchmark/main.py
@@ -253,9 +253,9 @@ def _train(self, epoch):
             if overall_step > max_steps or ((self.total_training_steps > 0) and (overall_step > self.total_training_steps)):
                 if self.args.my_rank == 0:
                     logging.info(f"{utcnow()} Maximum number of steps reached")
-                    if (block_step != 1 and self.do_checkpoint) or (not self.do_checkpoint):
-                        self.stats.end_block(epoch, block, block_step - 1)
-                    break
+                if (block_step != 1 and self.do_checkpoint) or (not self.do_checkpoint):
+                    self.stats.end_block(epoch, block, block_step - 1)
+                break
             self.stats.batch_loaded(epoch, overall_step, block)
             # Log a new block, unless it's the first one which we've already logged before the loop
             if block_step == 1 and block != 1:
diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py
index c2fa6abf..93d6ea50 100644
--- a/dlio_benchmark/utils/config.py
+++ b/dlio_benchmark/utils/config.py
@@ -411,7 +411,6 @@ def reconfigure(self, epoch_number):
         global_train_sample_sum = DLIOMPI.get_instance().reduce(local_train_sample_sum)
         global_eval_sample_sum = DLIOMPI.get_instance().reduce(local_eval_sample_sum)
         if self.my_rank == 0:
-            logging.info(f"total sample: train {global_train_sample_sum} eval {global_eval_sample_sum}")
             if self.train_sample_index_sum != global_train_sample_sum:
                 raise Exception(f"Sharding of train samples are missing samples got {global_train_sample_sum} but expected {self.train_sample_index_sum}")
 
diff --git a/dlio_benchmark/utils/statscounter.py b/dlio_benchmark/utils/statscounter.py
index 77f42ef9..7ce7ab72 100644
--- a/dlio_benchmark/utils/statscounter.py
+++ b/dlio_benchmark/utils/statscounter.py
@@ -174,17 +174,11 @@ def end_run(self):
             metric = metric + f"[METRIC] Training Accelerator Utilization [AU] (%): {np.mean(train_au):.4f} ({np.std(train_au):.4f})\n"
             metric = metric + f"[METRIC] Training Throughput (samples/second): {np.mean(train_throughput):.4f} ({np.std(train_throughput):.4f})\n"
             metric = metric + f"[METRIC] Training I/O Throughput (MB/second): {np.mean(train_throughput)*self.record_size/1024/1024:.4f} ({np.std(train_throughput)*self.record_size/1024/1024:.4f})\n"
-            metric = metric + f"[METRIC] **Expected Throughputs if compute-bound\n"
-            metric = metric + f"[METRIC] Training Throughput (expected) (samples/second): {np.mean(train_throughput/train_au)*100:.4f}\n"
-            metric = metric + f"[METRIC] Training I/O Throughput (expected) (MB/second): {np.mean(train_throughput/train_au)*100*self.record_size/1024/1024:.4f}\n"
 
             if self.args.do_eval:
                 metric = metric + f"[METRIC] Eval Accelerator Utilization [AU] (%): {np.mean(eval_au):.4f} ({np.std(eval_au):.4f})\n"
                 metric = metric + f"[METRIC] Eval Throughput (samples/second): {np.mean(eval_throughput):.6f} ({np.std(eval_throughput):.6f})\n"
                 metric = metric + f"[METRIC] Eval Throughput (MB/second): {np.mean(eval_throughput)*self.record_size/1024/1024:.6f} ({np.std(eval_throughput)*self.record_size/1024/1024:.6f})\n"
-                metric = metric + f"[METRIC] **Expected Throughputs if compute-bound\n"
-                metric = metric + f"[METRIC] Eval Throughput (expected) (samples/second): {np.mean(eval_throughput/eval_au)*100:.4f}\n"
-                metric = metric + f"[METRIC] Eval I/O Throughput (expected) (MB/second): {np.mean(eval_throughput/eval_au) * self.record_size/1024/1024:.4f}\n"
             metric+="[METRIC] ==========================================================\n"
             logging.info(metric)
     def start_train(self, epoch):
@@ -319,7 +313,7 @@ def batch_loaded(self, epoch, step, block):
             self.output[epoch]['load'][key].append(duration)
         else:
             self.output[epoch]['load'][key] = [duration]
-        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step}: loaded {self.batch_size} samples in {duration} s")
+        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step}: loaded {self.batch_size} samples in {duration:.4f} s")
 
     def batch_processed(self, epoch, step, block):
         current_time = time()
@@ -332,7 +326,7 @@ def batch_processed(self, epoch, step, block):
         else:
             self.output[epoch]['proc'] = [duration]
             self.output[epoch]['compute']=[self.computation_time]
-        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size} samples in {duration}s)")
+        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size} samples in {duration:.4f}s)")
 
     def compute_metrics_train(self, epoch, block):
         key = f"block{block}"
@@ -361,7 +355,7 @@ def compute_metrics_eval(self, epoch):
     def eval_batch_loaded(self, epoch, step):
         duration = time() - self.start_time_loading
         self.output[epoch]['load']['eval'].append(duration)
-        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step} loaded {self.batch_size_eval} samples in {duration} s")
+        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step} loaded {self.batch_size_eval} samples in {duration:.4f} s")
 
     def eval_batch_processed(self, epoch, step):
         current_time = time()
@@ -369,7 +363,7 @@ def eval_batch_processed(self, epoch, step):
         computation_time = current_time - self.start_time_compute
         self.output[epoch]['proc']['eval'].append(duration)
         self.output[epoch]['compute']['eval'].append(computation_time)
-        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size_eval} samples in {duration} s")
+        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size_eval} samples in {duration:.4f} s")
     def finalize(self):
         self.summary['end'] = utcnow()
     def save_data(self):
diff --git a/dlio_benchmark/utils/utility.py b/dlio_benchmark/utils/utility.py
index 52a3eb00..f96228b8 100644
--- a/dlio_benchmark/utils/utility.py
+++ b/dlio_benchmark/utils/utility.py
@@ -37,19 +37,19 @@
     from dftracer.logger import dftracer as PerfTrace, dft_fn as Profile, DFTRACER_ENABLE as DFTRACER_ENABLE
 except:
     class Profile(object):
-        def __init__(self, **kwargs):
-            return
-        def log(self, **kwargs):
-            return
-        def log_init(self, **kwargs):
-            return
-        def iter(self, **kwargs):
+        def __init__(self, name=None, cat=None, **kwargs):
             return
+        def log(self, func, **kwargs):
+            return func
+        def log_init(self, func, **kwargs):
+            return func
+        def iter(self, a, **kwargs):
+            return a
         def __enter__(self):
             return
-        def __exit__(self, **kwargs):
+        def __exit__(self, type, value, traceback, **kwargs):
             return
-        def update(self, **kwargs):
+        def update(self, epoch = 0, step =0, size=0, default = None, **kwargs):
             return
         def flush(self):
             return