Commit 5d4eb59

fixed bug

zhenghh04 committed Feb 14, 2025
1 parent 9da8f2a commit 5d4eb59
Showing 4 changed files with 16 additions and 23 deletions.
6 changes: 3 additions & 3 deletions dlio_benchmark/main.py
@@ -253,9 +253,9 @@ def _train(self, epoch):
             if overall_step > max_steps or ((self.total_training_steps > 0) and (overall_step > self.total_training_steps)):
                 if self.args.my_rank == 0:
                     logging.info(f"{utcnow()} Maximum number of steps reached")
-                    if (block_step != 1 and self.do_checkpoint) or (not self.do_checkpoint):
-                        self.stats.end_block(epoch, block, block_step - 1)
-                    break
+                if (block_step != 1 and self.do_checkpoint) or (not self.do_checkpoint):
+                    self.stats.end_block(epoch, block, block_step - 1)
+                break
             self.stats.batch_loaded(epoch, overall_step, block)
             # Log a new block, unless it's the first one which we've already logged before the loop
             if block_step == 1 and block != 1:
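Note (not part of the commit): the removed and added lines are textually identical, so this hunk appears to be an indentation-only fix — the block-end bookkeeping and the break previously sat under the rank-0 guard, so only rank 0 would stop once the step limit was exceeded, and the other ranks could keep iterating and hang at the next collective operation. A minimal, hypothetical sketch of the corrected control flow (rank, max_steps, and the loop body are illustrative stand-ins, not DLIO symbols):

# Hedged sketch, not DLIO code: `break` must run on every rank;
# only the log message is rank-0-only.
rank = 0            # stand-in for self.args.my_rank
max_steps = 5       # stand-in for the configured step limit

for overall_step in range(1, 1000):
    if overall_step > max_steps:
        if rank == 0:
            print("Maximum number of steps reached")  # logged once, by rank 0
        break                                         # dedented: executes on every rank
    print(f"processing step {overall_step}")          # stand-in for the training step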
1 change: 0 additions & 1 deletion dlio_benchmark/utils/config.py
@@ -411,7 +411,6 @@ def reconfigure(self, epoch_number):
         global_train_sample_sum = DLIOMPI.get_instance().reduce(local_train_sample_sum)
         global_eval_sample_sum = DLIOMPI.get_instance().reduce(local_eval_sample_sum)
         if self.my_rank == 0:
-            logging.info(f"total sample: train {global_train_sample_sum} eval {global_eval_sample_sum}")
             if self.train_sample_index_sum != global_train_sample_sum:
                 raise Exception(f"Sharding of train samples are missing samples got {global_train_sample_sum} but expected {self.train_sample_index_sum}")

14 changes: 4 additions & 10 deletions dlio_benchmark/utils/statscounter.py
@@ -174,17 +174,11 @@ def end_run(self):
             metric = metric + f"[METRIC] Training Accelerator Utilization [AU] (%): {np.mean(train_au):.4f} ({np.std(train_au):.4f})\n"
             metric = metric + f"[METRIC] Training Throughput (samples/second): {np.mean(train_throughput):.4f} ({np.std(train_throughput):.4f})\n"
             metric = metric + f"[METRIC] Training I/O Throughput (MB/second): {np.mean(train_throughput)*self.record_size/1024/1024:.4f} ({np.std(train_throughput)*self.record_size/1024/1024:.4f})\n"
-            metric = metric + f"[METRIC] **Expected Throughputs if compute-bound\n"
-            metric = metric + f"[METRIC] Training Throughput (expected) (samples/second): {np.mean(train_throughput/train_au)*100:.4f}\n"
-            metric = metric + f"[METRIC] Training I/O Throughput (expected) (MB/second): {np.mean(train_throughput/train_au)*100*self.record_size/1024/1024:.4f}\n"

             if self.args.do_eval:
                 metric = metric + f"[METRIC] Eval Accelerator Utilization [AU] (%): {np.mean(eval_au):.4f} ({np.std(eval_au):.4f})\n"
                 metric = metric + f"[METRIC] Eval Throughput (samples/second): {np.mean(eval_throughput):.6f} ({np.std(eval_throughput):.6f})\n"
                 metric = metric + f"[METRIC] Eval Throughput (MB/second): {np.mean(eval_throughput)*self.record_size/1024/1024:.6f} ({np.std(eval_throughput)*self.record_size/1024/1024:.6f})\n"
-                metric = metric + f"[METRIC] **Expected Throughputs if compute-bound\n"
-                metric = metric + f"[METRIC] Eval Throughput (expected) (samples/second): {np.mean(eval_throughput/eval_au)*100:.4f}\n"
-                metric = metric + f"[METRIC] Eval I/O Throughput (expected) (MB/second): {np.mean(eval_throughput/eval_au) * self.record_size/1024/1024:.4f}\n"
             metric+="[METRIC] ==========================================================\n"
             logging.info(metric)
     def start_train(self, epoch):
@@ -319,7 +313,7 @@ def batch_loaded(self, epoch, step, block):
                 self.output[epoch]['load'][key].append(duration)
             else:
                 self.output[epoch]['load'][key] = [duration]
-        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step}: loaded {self.batch_size} samples in {duration} s")
+        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step}: loaded {self.batch_size} samples in {duration:.4f} s")

     def batch_processed(self, epoch, step, block):
         current_time = time()
@@ -332,7 +326,7 @@ def batch_processed(self, epoch, step, block):
         else:
             self.output[epoch]['proc'] = [duration]
             self.output[epoch]['compute']=[self.computation_time]
-        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size} samples in {duration}s)")
+        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size} samples in {duration:.4f}s)")

     def compute_metrics_train(self, epoch, block):
         key = f"block{block}"
@@ -361,15 +355,15 @@ def compute_metrics_eval(self, epoch):
     def eval_batch_loaded(self, epoch, step):
         duration = time() - self.start_time_loading
         self.output[epoch]['load']['eval'].append(duration)
-        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step} loaded {self.batch_size_eval} samples in {duration} s")
+        logging.debug(f"{utcnow()} Rank {self.my_rank} step {step} loaded {self.batch_size_eval} samples in {duration:.4f} s")

     def eval_batch_processed(self, epoch, step):
         current_time = time()
         duration = current_time - self.start_time_loading
         computation_time = current_time - self.start_time_compute
         self.output[epoch]['proc']['eval'].append(duration)
         self.output[epoch]['compute']['eval'].append(computation_time)
-        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size_eval} samples in {duration} s")
+        logging.info(f"{utcnow()} Rank {self.my_rank} step {step} processed {self.batch_size_eval} samples in {duration:.4f} s")
     def finalize(self):
         self.summary['end'] = utcnow()
     def save_data(self):
18 changes: 9 additions & 9 deletions dlio_benchmark/utils/utility.py
@@ -37,19 +37,19 @@
     from dftracer.logger import dftracer as PerfTrace, dft_fn as Profile, DFTRACER_ENABLE as DFTRACER_ENABLE
 except:
     class Profile(object):
-        def __init__(self, **kwargs):
-            return
-        def log(self, **kwargs):
-            return
-        def log_init(self, **kwargs):
-            return
-        def iter(self, **kwargs):
+        def __init__(self, name=None, cat=None, **kwargs):
+            return
+        def log(self, func, **kwargs):
+            return func
+        def log_init(self, func, **kwargs):
+            return func
+        def iter(self, a, **kwargs):
             return a
         def __enter__(self):
             return
-        def __exit__(self, **kwargs):
+        def __exit__(self, type, value, traceback, **kwargs):
             return
-        def update(self, **kwargs):
+        def update(self, epoch = 0, step =0, size=0, default = None, **kwargs):
             return
         def flush(self):
             return
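A brief, hypothetical usage sketch (not part of the commit) of the fallback class above, which stands in for dftracer's dft_fn when the import fails: DLIO uses Profile objects as decorators, context managers, and iterator wrappers, so log/log_init must hand back the wrapped function and iter must hand back its iterable, otherwise decorated functions silently become None whenever dftracer is absent. The name/cat values below are made up:

# Hedged sketch assuming dftracer is NOT installed, so the fallback Profile class is used.
from dlio_benchmark.utils.utility import Profile

dlp = Profile(name="data_loader", cat="reader")   # hypothetical name/cat values

@dlp.log                                          # must return func, or read_sample would be None
def read_sample(index):
    return index * 2

with dlp:                                         # __exit__ now accepts (type, value, traceback)
    for i in dlp.iter(range(4)):                  # iter returns its argument unchanged
        read_sample(i)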
