Skip to content

Commit a1f98bd

Browse files
jomayeri and tjruwase authored
AIO CPU Locked Tensor (#6592)
Restore the functionality of the CPU locked tensor in the AIO library. Make the async_io operator available for the CPU accelerator, i.e., in CPU-only environments.

---------

Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
1 parent 7d751ee commit a1f98bd

29 files changed

+362
-196
lines changed

accelerator/cpu_accelerator.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,9 @@ def get_op_builder(self, class_name):
301301
# is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed
302302
# if successful this also means we're doing a local install and not JIT compile path
303303
from op_builder import __deepspeed__ # noqa: F401 # type: ignore
304-
from op_builder.cpu import CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
304+
from op_builder.cpu import AsyncIOBuilder, CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
305305
except ImportError:
306-
from deepspeed.ops.op_builder.cpu import CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
306+
from deepspeed.ops.op_builder.cpu import AsyncIOBuilder, CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
307307

308308
if class_name == "CCLCommBuilder":
309309
return CCLCommBuilder
@@ -313,6 +313,8 @@ def get_op_builder(self, class_name):
313313
return FusedAdamBuilder
314314
elif class_name == "CPUAdamBuilder":
315315
return CPUAdamBuilder
316+
elif class_name == "AsyncIOBuilder":
317+
return AsyncIOBuilder
316318
else:
317319
# return a NotImplementedBuilder to avoid get NoneType[Name] in unit tests
318320
return NotImplementedBuilder

csrc/aio/common/deepspeed_aio_common.cpp

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ static void _get_aio_latencies(std::vector<std::chrono::duration<double>>& raw_l
6868
std::accumulate(lat_usec.begin(), lat_usec.end(), 0) / lat_usec.size();
6969
}
7070

71-
static void _do_io_submit_singles(const long long int n_iocbs,
72-
const long long int iocb_index,
71+
static void _do_io_submit_singles(const int64_t n_iocbs,
72+
const int64_t iocb_index,
7373
std::unique_ptr<aio_context>& aio_ctxt,
7474
std::vector<std::chrono::duration<double>>& submit_times)
7575
{
@@ -89,8 +89,8 @@ static void _do_io_submit_singles(const long long int n_iocbs,
8989
}
9090
}
9191

92-
static void _do_io_submit_block(const long long int n_iocbs,
93-
const long long int iocb_index,
92+
static void _do_io_submit_block(const int64_t n_iocbs,
93+
const int64_t iocb_index,
9494
std::unique_ptr<aio_context>& aio_ctxt,
9595
std::vector<std::chrono::duration<double>>& submit_times)
9696
{
@@ -109,18 +109,18 @@ static void _do_io_submit_block(const long long int n_iocbs,
109109
assert(submit_ret > 0);
110110
}
111111

112-
static int _do_io_complete(const long long int min_completes,
113-
const long long int max_completes,
112+
static int _do_io_complete(const int64_t min_completes,
113+
const int64_t max_completes,
114114
std::unique_ptr<aio_context>& aio_ctxt,
115115
std::vector<std::chrono::duration<double>>& reap_times)
116116
{
117117
const auto start_time = std::chrono::high_resolution_clock::now();
118-
long long int n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
119-
min_completes,
120-
max_completes,
121-
aio_ctxt->_io_events.data(),
122-
nullptr,
123-
nullptr);
118+
int64_t n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
119+
min_completes,
120+
max_completes,
121+
aio_ctxt->_io_events.data(),
122+
nullptr,
123+
nullptr);
124124
reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time);
125125
assert(n_completes >= min_completes);
126126
return n_completes;
@@ -134,7 +134,7 @@ void do_aio_operation_sequential(const bool read_op,
134134
{
135135
struct io_prep_context prep_ctxt(read_op, xfer_ctxt, aio_ctxt->_block_size, &aio_ctxt->_iocbs);
136136

137-
const auto num_io_blocks = static_cast<long long int>(
137+
const auto num_io_blocks = static_cast<int64_t>(
138138
ceil(static_cast<double>(xfer_ctxt->_num_bytes) / aio_ctxt->_block_size));
139139
#if DEBUG_DS_AIO_PERF
140140
const auto io_op_name = std::string(read_op ? "read" : "write");
@@ -145,15 +145,14 @@ void do_aio_operation_sequential(const bool read_op,
145145
std::vector<std::chrono::duration<double>> submit_times;
146146
std::vector<std::chrono::duration<double>> reap_times;
147147
const auto max_queue_bytes =
148-
static_cast<long long int>(aio_ctxt->_queue_depth * aio_ctxt->_block_size);
148+
static_cast<int64_t>(aio_ctxt->_queue_depth * aio_ctxt->_block_size);
149149

150150
auto start = std::chrono::high_resolution_clock::now();
151-
for (long long iocb_index = 0; iocb_index < num_io_blocks;
152-
iocb_index += aio_ctxt->_queue_depth) {
151+
for (int64_t iocb_index = 0; iocb_index < num_io_blocks; iocb_index += aio_ctxt->_queue_depth) {
153152
const auto start_offset = iocb_index * aio_ctxt->_block_size;
154153
const auto start_buffer = (char*)xfer_ctxt->_mem_buffer + start_offset;
155154
const auto n_iocbs =
156-
min(static_cast<long long>(aio_ctxt->_queue_depth), (num_io_blocks - iocb_index));
155+
min(static_cast<int64_t>(aio_ctxt->_queue_depth), (num_io_blocks - iocb_index));
157156
const auto num_bytes = min(max_queue_bytes, (xfer_ctxt->_num_bytes - start_offset));
158157
prep_ctxt.prep_iocbs(n_iocbs, num_bytes, start_buffer, start_offset);
159158

@@ -285,13 +284,13 @@ int open_file(const char* filename, const bool read_op)
285284

286285
int regular_read(const char* filename, std::vector<char>& buffer)
287286
{
288-
long long int num_bytes;
287+
int64_t num_bytes;
289288
const auto f_size = get_file_size(filename, num_bytes);
290289
assert(f_size != -1);
291290
buffer.resize(num_bytes);
292291
const auto fd = open(filename, O_RDONLY, 0600);
293292
assert(fd != -1);
294-
long long int read_bytes = 0;
293+
int64_t read_bytes = 0;
295294
auto r = 0;
296295
do {
297296
const auto buffer_ptr = buffer.data() + read_bytes;
@@ -309,23 +308,23 @@ int regular_read(const char* filename, std::vector<char>& buffer)
309308
return 0;
310309
}
311310

312-
static bool _validate_buffer(const char* filename, void* aio_buffer, const long long int num_bytes)
311+
static bool _validate_buffer(const char* filename, void* aio_buffer, const int64_t num_bytes)
313312
{
314313
std::vector<char> regular_buffer;
315314
const auto reg_ret = regular_read(filename, regular_buffer);
316315
assert(0 == reg_ret);
317316
std::cout << "regular read of " << filename << " returned " << regular_buffer.size() << " bytes"
318317
<< std::endl;
319318

320-
if (static_cast<long long int>(regular_buffer.size()) != num_bytes) { return false; }
319+
if (static_cast<int64_t>(regular_buffer.size()) != num_bytes) { return false; }
321320

322321
return (0 == memcmp(aio_buffer, regular_buffer.data(), regular_buffer.size()));
323322
}
324323

325324
bool validate_aio_operation(const bool read_op,
326325
const char* filename,
327326
void* aio_buffer,
328-
const long long int num_bytes)
327+
const int64_t num_bytes)
329328
{
330329
const auto msg_suffix = std::string("deepspeed_aio_") +
331330
std::string(read_op ? "read()" : "write()") +

csrc/aio/common/deepspeed_aio_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,4 @@ int regular_read(const char* filename, std::vector<char>& buffer);
3535
bool validate_aio_operation(const bool read_op,
3636
const char* filename,
3737
void* aio_buffer,
38-
const long long int num_bytes);
38+
const int64_t num_bytes);

csrc/aio/common/deepspeed_aio_utils.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ const int c_block_size = 128 * 1024;
1818
const int c_io_queue_depth = 8;
1919

2020
io_xfer_ctxt::io_xfer_ctxt(const int fd,
21-
const long long int file_offset,
22-
const long long int num_bytes,
21+
const int64_t file_offset,
22+
const int64_t num_bytes,
2323
const void* buffer)
2424
: _fd(fd), _base_offset(file_offset), _mem_buffer(buffer), _num_bytes(num_bytes)
2525
{
@@ -36,7 +36,7 @@ io_prep_context::io_prep_context(const bool read_op,
3636
void io_prep_context::prep_iocbs(const int n_iocbs,
3737
const size_t num_bytes,
3838
const void* start_buffer,
39-
const long long int start_offset)
39+
const int64_t start_offset)
4040
{
4141
assert(static_cast<size_t>(n_iocbs) <= _iocbs->size());
4242
for (auto i = 0; i < n_iocbs; ++i) {
@@ -64,24 +64,24 @@ io_prep_generator::io_prep_generator(const bool read_op,
6464
_next_iocb_index(0)
6565
{
6666
_num_io_blocks =
67-
static_cast<long long int>(ceil(static_cast<double>(xfer_ctxt->_num_bytes) / block_size));
67+
static_cast<int64_t>(ceil(static_cast<double>(xfer_ctxt->_num_bytes) / block_size));
6868
_remaining_io_blocks = _num_io_blocks;
6969
}
7070

7171
int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>* iocbs)
7272
{
7373
if ((_remaining_bytes) == 0 || (_remaining_io_blocks == 0)) {
74-
assert(static_cast<long long int>(_remaining_bytes) == _remaining_io_blocks);
74+
assert(static_cast<int64_t>(_remaining_bytes) == _remaining_io_blocks);
7575
return 0;
7676
}
7777

7878
assert(static_cast<size_t>(n_iocbs) <= iocbs->size());
7979

80-
auto actual_n_iocbs = min(static_cast<long long int>(n_iocbs), _remaining_io_blocks);
80+
auto actual_n_iocbs = min(static_cast<int64_t>(n_iocbs), _remaining_io_blocks);
8181
for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) {
8282
const auto xfer_offset = _xfer_ctxt->_base_offset + (_next_iocb_index * _block_size);
8383
const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + xfer_offset;
84-
const auto num_bytes = min(static_cast<long long int>(_block_size), _remaining_bytes);
84+
const auto num_bytes = min(static_cast<int64_t>(_block_size), _remaining_bytes);
8585

8686
if (_read_op) {
8787
io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset);
@@ -95,15 +95,15 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>*
9595
return actual_n_iocbs;
9696
}
9797

98-
int get_file_size(const char* filename, long long int& size)
98+
int get_file_size(const char* filename, int64_t& size)
9999
{
100100
struct stat st;
101101
if (stat(filename, &st) == -1) { return -1; }
102102
size = st.st_size;
103103
return 0;
104104
}
105105

106-
void* ds_page_aligned_alloc(const size_t size, const bool lock)
106+
void* ds_page_aligned_alloc(const int64_t size, const bool lock)
107107
{
108108
void* ptr;
109109
int retval;

csrc/aio/common/deepspeed_aio_utils.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
3030

3131
struct io_xfer_ctxt {
3232
const int _fd;
33-
const long long int _base_offset;
33+
const int64_t _base_offset;
3434
const void* _mem_buffer;
35-
const long long int _num_bytes;
35+
const int64_t _num_bytes;
3636

3737
io_xfer_ctxt(const int fd,
38-
const long long int file_offset,
39-
const long long int num_bytes,
38+
const int64_t file_offset,
39+
const int64_t num_bytes,
4040
const void* buffer);
4141
};
4242

@@ -54,18 +54,18 @@ struct io_prep_context {
5454
void prep_iocbs(const int n_iocbs,
5555
const size_t num_bytes,
5656
const void* start_buffer,
57-
const long long int start_offset);
57+
const int64_t start_offset);
5858
};
5959

6060
struct io_prep_generator {
6161
const bool _read_op;
6262
const std::unique_ptr<io_xfer_ctxt>& _xfer_ctxt;
6363
const size_t _block_size;
6464

65-
long long int _remaining_bytes;
66-
long long int _num_io_blocks;
67-
long long int _remaining_io_blocks;
68-
long long int _next_iocb_index;
65+
int64_t _remaining_bytes;
66+
int64_t _num_io_blocks;
67+
int64_t _remaining_io_blocks;
68+
int64_t _next_iocb_index;
6969

7070
io_prep_generator(const bool read_op,
7171
const std::unique_ptr<io_xfer_ctxt>& xfer_ctxt,
@@ -74,6 +74,6 @@ struct io_prep_generator {
7474
int prep_iocbs(const int n_iocbs, std::vector<struct iocb*>* iocbs);
7575
};
7676

77-
void* ds_page_aligned_alloc(const size_t size, const bool lock = false);
77+
void* ds_page_aligned_alloc(const int64_t size, const bool lock = false);
7878

79-
int get_file_size(const char* filename, long long int& size);
79+
int get_file_size(const char* filename, int64_t& size);

csrc/aio/py_lib/deepspeed_aio_op_desc.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
1111
const torch::Tensor& buffer,
1212
const int fd,
1313
const char* filename,
14-
const long long int file_num_bytes,
15-
const int num_threads,
14+
const int64_t file_num_bytes,
15+
const int intra_op_parallelism,
1616
const bool validate)
1717
: _read_op(read_op),
1818
_buffer(buffer),
1919
_fd(fd),
2020
_filename(filename),
2121
_file_num_bytes(file_num_bytes),
22-
_num_threads(num_threads),
23-
_num_bytes_per_thread(file_num_bytes / num_threads),
22+
_intra_op_parallelism(intra_op_parallelism),
23+
_num_bytes_per_thread(file_num_bytes / intra_op_parallelism),
2424
_validate(validate)
2525
{
2626
}

csrc/aio/py_lib/deepspeed_aio_op_desc.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@ struct io_op_desc_t {
1414
torch::Tensor _buffer;
1515
int _fd;
1616
const std::string _filename;
17-
const long long int _file_num_bytes;
18-
const int _num_threads;
19-
const long long int _num_bytes_per_thread;
17+
const int64_t _file_num_bytes;
18+
const int _intra_op_parallelism;
19+
const int64_t _num_bytes_per_thread;
2020
torch::Tensor _contiguous_buffer;
2121
const bool _validate;
2222

2323
io_op_desc_t(const bool read_op,
2424
const torch::Tensor& buffer,
2525
const int fd,
2626
const char* filename,
27-
const long long int file_num_bytes,
28-
const int num_threads,
27+
const int64_t file_num_bytes,
28+
const int intra_op_parallelism,
2929
const bool validate);
3030

3131
virtual void run(const int tid,

0 commit comments

Comments
 (0)