diff --git a/benchmarks/communication/all_gather.py b/benchmarks/communication/all_gather.py
index 7a34c0d1c..8aa33581d 100644
--- a/benchmarks/communication/all_gather.py
+++ b/benchmarks/communication/all_gather.py
@@ -16,7 +16,7 @@
 
 
 # Run all_gather and print metrics
-def timed_all_gather(input, output, args):
+def timed_all_gather(input, output, start_event, end_event, args):
     if args.dist == 'torch':
         import torch.distributed as dist
 
@@ -33,11 +33,12 @@ def timed_all_gather(input, output, args):
     sync_all()
 
     # time the actual comm op trials times and average it
-    pre = time.perf_counter()
+    start_event.record()
     for i in range(args.trials):
         all_gather_func(output, input, group=None, async_op=args.async_op)
+    end_event.record()
     sync_all()
-    duration = time.perf_counter() - pre
+    duration = start_event.elapsed_time(end_event) / 1000
 
     # maintain and clean performance data
     avg_duration = duration / args.trials
@@ -63,6 +64,9 @@ def run_all_gather(local_rank, args):
     global_rank = dist.get_rank()
     world_size = dist.get_world_size()
 
+    start_event = torch.cuda.Event(enable_timing=True)
+    end_event = torch.cuda.Event(enable_timing=True)
+
     if args.scan:
         # Create list of message sizes
         M_LIST = []
@@ -92,7 +96,7 @@ def run_all_gather(local_rank, args):
                 else:
                     raise e
             sync_all()
-            timed_all_gather(input, output, args)
+            timed_all_gather(input, output, start_event, end_event, args)
     else:
         # all_gather_into_tensor saves memory
         if ((args.dist == 'torch' or args.dist == 'deepspeed') and dist.has_all_gather_into_tensor()):
@@ -126,7 +130,7 @@ def run_all_gather(local_rank, args):
                     raise e
 
         sync_all()
-        timed_all_gather(input, output, args)
+        timed_all_gather(input, output, start_event, end_event, args)
 
 
 if __name__ == "__main__":
diff --git a/benchmarks/communication/all_reduce.py b/benchmarks/communication/all_reduce.py
index a474a704f..b9decfd98 100644
--- a/benchmarks/communication/all_reduce.py
+++ b/benchmarks/communication/all_reduce.py
@@ -14,7 +14,7 @@
 from deepspeed.accelerator import get_accelerator
 
 
-def timed_all_reduce(input, args):
+def timed_all_reduce(input, start_event, end_event, args):
     if args.dist == 'torch':
         import torch.distributed as dist
     elif args.dist == 'deepspeed':
@@ -27,11 +27,12 @@ def timed_all_reduce(input, args):
     sync_all()
 
     # time the actual comm op trials times and average it
-    pre = time.perf_counter()
+    start_event.record()
     for i in range(args.trials):
         dist.all_reduce(input, async_op=args.async_op)
+    end_event.record()
     sync_all()
-    duration = time.perf_counter() - pre
+    duration = start_event.elapsed_time(end_event) / 1000
 
     # maintain and clean performance data
     avg_duration = duration / args.trials
@@ -59,6 +60,9 @@ def run_all_reduce(local_rank, args):
     world_size = dist.get_world_size()
     global_rank = dist.get_rank()
 
+    start_event = torch.cuda.Event(enable_timing=True)
+    end_event = torch.cuda.Event(enable_timing=True)
+
     if args.scan:
         M_LIST = []
         for x in (2**p for p in range(1, args.maxsize)):
@@ -82,7 +86,7 @@ def run_all_reduce(local_rank, args):
                 else:
                     raise e
             sync_all()
-            timed_all_reduce(input, args)
+            timed_all_reduce(input, start_event, end_event, args)
     else:
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         # Don't need output tensor, so we double mem_factor
@@ -104,7 +108,7 @@ def run_all_reduce(local_rank, args):
             else:
                 raise e
         sync_all()
-        timed_all_reduce(input, args)
+        timed_all_reduce(input, start_event, end_event, args)
 
 
 if __name__ == "__main__":
diff --git a/benchmarks/communication/all_to_all.py b/benchmarks/communication/all_to_all.py
index 8735b1b4a..7eccfa824 100644
--- a/benchmarks/communication/all_to_all.py
+++ b/benchmarks/communication/all_to_all.py
@@ -14,7 +14,7 @@
 from deepspeed.accelerator import get_accelerator
 
 
-def timed_all_to_all(input, output, args):
+def timed_all_to_all(input, output, start_event, end_event, args):
     if args.dist == 'torch':
         import torch.distributed as dist
     elif args.dist == 'deepspeed':
@@ -27,11 +27,12 @@ def timed_all_to_all(input, output, args):
     sync_all()
 
     # time the actual comm op trials times and average it
-    pre = time.perf_counter()
+    start_event.record()
     for i in range(args.trials):
         dist.all_to_all_single(output, input, async_op=args.async_op)
+    end_event.record()
     sync_all()
-    duration = time.perf_counter() - pre
+    duration = start_event.elapsed_time(end_event) / 1000
 
     # maintain and clean performance data
     avg_duration = duration / args.trials
@@ -58,6 +59,9 @@ def run_all_to_all(local_rank, args):
     # Prepare benchmark header
     print_header(args, 'all_to_all')
 
+    start_event = torch.cuda.Event(enable_timing=True)
+    end_event = torch.cuda.Event(enable_timing=True)
+
     if args.scan:
         M_LIST = []
         for x in (2**p for p in range(1, args.maxsize)):
@@ -83,7 +87,7 @@ def run_all_to_all(local_rank, args):
                 else:
                     raise e
             sync_all()
-            timed_all_to_all(input, output, args)
+            timed_all_to_all(input, output, start_event, end_event, args)
     else:
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         elements_per_gpu = max_numel(comm_op='all_to_all',
@@ -118,7 +122,7 @@ def run_all_to_all(local_rank, args):
                     print(f"Before AllToAll Input List at rank {global_rank}: {input}")
                 dist.barrier()
 
-    timed_all_to_all(input, output, args)
+    timed_all_to_all(input, output, start_event, end_event, args)
 
     if args.debug:
         for i in range(world_size):
diff --git a/benchmarks/communication/broadcast.py b/benchmarks/communication/broadcast.py
index 551c71f94..860c9555b 100644
--- a/benchmarks/communication/broadcast.py
+++ b/benchmarks/communication/broadcast.py
@@ -14,7 +14,7 @@
 from deepspeed.accelerator import get_accelerator
 
 
-def timed_broadcast(input, args):
+def timed_broadcast(input, start_event, end_event, args):
     if args.dist == 'torch':
         import torch.distributed as dist
     elif args.dist == 'deepspeed':
@@ -27,11 +27,12 @@ def timed_broadcast(input, args):
     sync_all()
 
    # time the actual comm op trials times and average it
-    pre = time.perf_counter()
+    start_event.record()
     for i in range(args.trials):
         dist.broadcast(input, 0, async_op=args.async_op)
+    end_event.record()
     sync_all()
-    duration = time.perf_counter() - pre
+    duration = start_event.elapsed_time(end_event) / 1000
 
     # maintain and clean performance data
     avg_duration = duration / args.trials
@@ -59,6 +60,9 @@ def run_broadcast(local_rank, args):
     world_size = dist.get_world_size()
     global_rank = dist.get_rank()
 
+    start_event = torch.cuda.Event(enable_timing=True)
+    end_event = torch.cuda.Event(enable_timing=True)
+
     if args.scan:
         M_LIST = []
         for x in (2**p for p in range(1, args.maxsize)):
@@ -82,7 +86,7 @@ def run_broadcast(local_rank, args):
                 else:
                     raise e
             sync_all()
-            timed_broadcast(input, args)
+            timed_broadcast(input, start_event, end_event, args)
     else:
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         # Don't need output tensor, so we double mem_factor
@@ -102,7 +106,7 @@ def run_broadcast(local_rank, args):
                 sync_all()
                 return
         sync_all()
-        timed_broadcast(input, args)
+        timed_broadcast(input, start_event, end_event, args)
 
 
 if __name__ == "__main__":
diff --git a/benchmarks/communication/pt2pt.py b/benchmarks/communication/pt2pt.py
index 31028f99e..57eab9a66 100644
--- a/benchmarks/communication/pt2pt.py
+++ b/benchmarks/communication/pt2pt.py
@@ -14,7 +14,7 @@
 from deepspeed.accelerator import get_accelerator
 
 
-def timed_pt2pt(input, args):
+def timed_pt2pt(input, start_event, end_event, args):
     if args.dist == 'torch':
         import torch.distributed as dist
     elif args.dist == 'deepspeed':
@@ -36,7 +36,7 @@ def timed_pt2pt(input, args):
     sync_all()
 
     # time the actual comm op trials times and average it
-    pre = time.perf_counter()
+    start_event.record()
     for i in range(args.trials):
         if dist.get_rank() == 0:
             if args.async_op:
@@ -49,8 +49,9 @@ def timed_pt2pt(input, args):
             else:
                 dist.recv(input, src=0)
 
+    end_event.record()
     sync_all()
-    duration = time.perf_counter() - pre
+    duration = start_event.elapsed_time(end_event) / 1000
 
     # maintain and clean performance data
     avg_duration = duration / args.trials
@@ -77,6 +78,9 @@ def run_pt2pt(local_rank, args):
     global_rank = dist.get_rank()
     world_size = dist.get_world_size()
 
+    start_event = torch.cuda.Event(enable_timing=True)
+    end_event = torch.cuda.Event(enable_timing=True)
+
     if args.scan:
         # Create list of message sizes
         M_LIST = []
@@ -101,7 +105,7 @@ def run_pt2pt(local_rank, args):
                 else:
                     raise e
             sync_all()
-            timed_pt2pt(input, args)
+            timed_pt2pt(input, start_event, end_event, args)
     else:
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         # Don't need output tensor, so double mem_factor
@@ -121,7 +125,7 @@ def run_pt2pt(local_rank, args):
                 sync_all()
                 return
         sync_all()
-        timed_pt2pt(input, args)
+        timed_pt2pt(input, start_event, end_event, args)
 
 
 if __name__ == "__main__":
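Note: the patch swaps host-side wall-clock timing (`time.perf_counter`) for CUDA events. Events are recorded on the device stream around the trial loop, so the measured interval is the GPU execution time of the collectives themselves, rather than host time that also folds in launch and synchronization overhead; `elapsed_time()` reports milliseconds, hence the `/ 1000` to keep `duration` in seconds for the existing bandwidth math. As a reference for reviewers, a minimal self-contained sketch of the same idiom is below. It times a local matmul as a stand-in for the collective (an assumption for single-device runnability; the benchmarks themselves time `dist.all_reduce` and friends across ranks, with `sync_all()` playing the role of `torch.cuda.synchronize()`):

```python
import torch


def timed_op(trials: int = 100) -> float:
    """Average seconds per op, measured with CUDA events (sketch only)."""
    assert torch.cuda.is_available(), "CUDA events require a CUDA device"
    x = torch.randn(1024, 1024, device="cuda")

    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)

    # Warm up so one-time launch/allocation costs are not timed.
    for _ in range(10):
        x @ x
    torch.cuda.synchronize()

    start_event.record()
    for _ in range(trials):
        x @ x
    end_event.record()
    torch.cuda.synchronize()  # wait for all queued kernels before reading the events

    # elapsed_time() returns milliseconds; convert to seconds as the benchmarks do.
    duration = start_event.elapsed_time(end_event) / 1000
    return duration / trials


if __name__ == "__main__":
    print(f"avg time per op: {timed_op():.6f} s")
```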