Skip to content

Commit

Permalink
Rename aio_thread_count to intra_op_parallelism (#7056)
Browse files (browse the repository at this point in the history)
Propagate API change.

Signed-off-by: Olatunji Ruwase <olruwase@microsoft.com>
  • Loading branch information
tjruwase authored Feb 19, 2025
1 parent 33dd2e2 commit c9da489
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 29 deletions.
20 changes: 13 additions & 7 deletions deepspeed/runtime/swap_tensor/aio_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
AIO_DEFAULT_DICT = {
AIO_BLOCK_SIZE: AIO_BLOCK_SIZE_DEFAULT,
AIO_QUEUE_DEPTH: AIO_QUEUE_DEPTH_DEFAULT,
AIO_THREAD_COUNT: AIO_THREAD_COUNT_DEFAULT,
AIO_INTRA_OP_PARALLELISM: AIO_INTRA_OP_PARALLELISM_DEFAULT,
AIO_SINGLE_SUBMIT: AIO_SINGLE_SUBMIT_DEFAULT,
AIO_OVERLAP_EVENTS: AIO_OVERLAP_EVENTS_DEFAULT,
AIO_USE_GDS: AIO_USE_GDS_DEFAULT
Expand All @@ -21,12 +21,18 @@ def get_aio_config(param_dict):
if AIO in param_dict.keys() and param_dict[AIO] is not None:
aio_dict = param_dict[AIO]
aio_config = {
AIO_BLOCK_SIZE: get_scalar_param(aio_dict, AIO_BLOCK_SIZE, AIO_BLOCK_SIZE_DEFAULT),
AIO_QUEUE_DEPTH: get_scalar_param(aio_dict, AIO_QUEUE_DEPTH, AIO_QUEUE_DEPTH_DEFAULT),
AIO_THREAD_COUNT: get_scalar_param(aio_dict, AIO_THREAD_COUNT, AIO_THREAD_COUNT_DEFAULT),
AIO_SINGLE_SUBMIT: get_scalar_param(aio_dict, AIO_SINGLE_SUBMIT, AIO_SINGLE_SUBMIT_DEFAULT),
AIO_OVERLAP_EVENTS: get_scalar_param(aio_dict, AIO_OVERLAP_EVENTS, AIO_OVERLAP_EVENTS_DEFAULT),
AIO_USE_GDS: get_scalar_param(aio_dict, AIO_USE_GDS, AIO_USE_GDS_DEFAULT)
AIO_BLOCK_SIZE:
get_scalar_param(aio_dict, AIO_BLOCK_SIZE, AIO_BLOCK_SIZE_DEFAULT),
AIO_QUEUE_DEPTH:
get_scalar_param(aio_dict, AIO_QUEUE_DEPTH, AIO_QUEUE_DEPTH_DEFAULT),
AIO_INTRA_OP_PARALLELISM:
get_scalar_param(aio_dict, AIO_INTRA_OP_PARALLELISM, AIO_INTRA_OP_PARALLELISM_DEFAULT),
AIO_SINGLE_SUBMIT:
get_scalar_param(aio_dict, AIO_SINGLE_SUBMIT, AIO_SINGLE_SUBMIT_DEFAULT),
AIO_OVERLAP_EVENTS:
get_scalar_param(aio_dict, AIO_OVERLAP_EVENTS, AIO_OVERLAP_EVENTS_DEFAULT),
AIO_USE_GDS:
get_scalar_param(aio_dict, AIO_USE_GDS, AIO_USE_GDS_DEFAULT)
}

if aio_config[AIO_USE_GDS]:
Expand Down
6 changes: 3 additions & 3 deletions deepspeed/runtime/swap_tensor/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"aio": {
"block_size": 1048576,
"queue_depth": 8,
"thread_count": 1,
"intra_op_parallelism": 1,
"single_submit": false,
"overlap_events": true,
"use_gds": false
Expand All @@ -20,8 +20,8 @@
AIO_BLOCK_SIZE_DEFAULT = 1048576
AIO_QUEUE_DEPTH = "queue_depth"
AIO_QUEUE_DEPTH_DEFAULT = 8
AIO_THREAD_COUNT = "thread_count"
AIO_THREAD_COUNT_DEFAULT = 1
AIO_INTRA_OP_PARALLELISM = "intra_op_parallelism"
AIO_INTRA_OP_PARALLELISM_DEFAULT = 1
AIO_SINGLE_SUBMIT = "single_submit"
AIO_SINGLE_SUBMIT_DEFAULT = False
AIO_OVERLAP_EVENTS = "overlap_events"
Expand Down
2 changes: 1 addition & 1 deletion deepspeed/runtime/swap_tensor/optimizer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume

# Read/Write alignment for each thread during Intra-request parallelism
self.min_aio_bytes = max(MIN_AIO_BYTES, aio_config[AIO_BLOCK_SIZE])
self.aligned_bytes = AIO_ALIGNED_BYTES * aio_config[AIO_THREAD_COUNT]
self.aligned_bytes = AIO_ALIGNED_BYTES * aio_config[AIO_INTRA_OP_PARALLELISM]
self.numel_alignment = self.aligned_bytes // self.swap_element_size

# Swap buffer management
Expand Down
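8 changes: 5 additions & 3 deletions deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py
Original file line number Diff line number Diff line change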
Expand Up @@ -33,9 +33,11 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume
largest_numel, device, dtype, timers)

aio_op = AsyncIOBuilder().load()
self.aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH],
aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS],
aio_config[AIO_THREAD_COUNT])
self.aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE],
queue_depth=aio_config[AIO_QUEUE_DEPTH],
single_submit=aio_config[AIO_SINGLE_SUBMIT],
overlap_events=aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM])

# Overlap swapping out
self.gradient_swapper = AsyncTensorSwapper(aio_handle=self.aio_handle,
Expand Down
20 changes: 12 additions & 8 deletions deepspeed/runtime/swap_tensor/partitioned_param_swapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _configure_aio(self, ds_config):

# Read/Write alignment for each thread during Intra-request parallelism
self.min_aio_bytes = max(MIN_AIO_BYTES, self.aio_config[AIO_BLOCK_SIZE])
self.aligned_bytes = AIO_ALIGNED_BYTES * self.aio_config[AIO_THREAD_COUNT]
self.aligned_bytes = AIO_ALIGNED_BYTES * self.aio_config[AIO_INTRA_OP_PARALLELISM]
self.numel_alignment = self.aligned_bytes // self.swap_element_size

self.elements_per_buffer = self.swap_config.buffer_size
Expand All @@ -108,13 +108,17 @@ def _configure_aio(self, ds_config):
self.available_buffer_ids = [i for i in range(self.param_buffer_count)]
self.reserved_buffer_ids = []

self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH],
self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS],
self.aio_config[AIO_THREAD_COUNT])

self.aio_write_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH],
self.aio_config[AIO_SINGLE_SUBMIT],
self.aio_config[AIO_OVERLAP_EVENTS], self.aio_config[AIO_THREAD_COUNT])
self.aio_read_handle = self.aio_handle(block_size=self.aio_config[AIO_BLOCK_SIZE],
queue_depth=self.aio_config[AIO_QUEUE_DEPTH],
single_submit=self.aio_config[AIO_SINGLE_SUBMIT],
overlap_events=self.aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=self.aio_config[AIO_INTRA_OP_PARALLELISM])

self.aio_write_handle = self.aio_handle(block_size=self.aio_config[AIO_BLOCK_SIZE],
queue_depth=self.aio_config[AIO_QUEUE_DEPTH],
single_submit=self.aio_config[AIO_SINGLE_SUBMIT],
overlap_events=self.aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=self.aio_config[AIO_INTRA_OP_PARALLELISM])

if self.use_gds:
self.buffers = torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count),
Expand Down
18 changes: 11 additions & 7 deletions deepspeed/runtime/swap_tensor/pipelined_optimizer_swapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume
device, dtype, timers)

aio_op = AsyncIOBuilder().load()
self.write_aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH],
aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS],
aio_config[AIO_THREAD_COUNT])

self.read_aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH],
aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS],
aio_config[AIO_THREAD_COUNT])
self.write_aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE],
queue_depth=aio_config[AIO_QUEUE_DEPTH],
single_submit=aio_config[AIO_SINGLE_SUBMIT],
overlap_events=aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM])

self.read_aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE],
queue_depth=aio_config[AIO_QUEUE_DEPTH],
single_submit=aio_config[AIO_SINGLE_SUBMIT],
overlap_events=aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM])

# Overlap gradient swap out
self.gradient_swapper = AsyncTensorSwapper(aio_handle=self.write_aio_handle,
Expand Down

0 comments on commit c9da489

Please sign in to comment.