Merged
26 commits
1413d69
fixed functionality of cpu locked tensor
jomayeri Sep 30, 2024
87a7c69
Merge branch 'master' into jomayeri/aio-locked-tensor
jomayeri Sep 30, 2024
c13bb10
enabling cpu locked in unittests, and fixing compilation errors
jomayeri Oct 1, 2024
248cec4
Merge branch 'jomayeri/aio-locked-tensor' of github.com:microsoft/Dee…
jomayeri Oct 1, 2024
b909702
passing gds tests
jomayeri Oct 2, 2024
b1ee711
renaming all instances of num_threads
jomayeri Oct 2, 2024
ada1b83
updating function names to match
jomayeri Oct 2, 2024
1cb88ce
fix formatting
jomayeri Oct 2, 2024
f5528da
variable name change to fix compilation
jomayeri Oct 2, 2024
f576d29
formatting
jomayeri Oct 2, 2024
5a47bf3
update references in tutorial
jomayeri Oct 3, 2024
fe93fdc
Merge branch 'master' into jomayeri/aio-locked-tensor
jomayeri Oct 7, 2024
b2866cb
Merge branch 'master' into jomayeri/aio-locked-tensor
tjruwase Oct 7, 2024
9c93d2c
Merge branch 'master' into jomayeri/aio-locked-tensor
tjruwase Oct 8, 2024
884c0fd
async_io operator for CPU accelerator
tjruwase Oct 9, 2024
a5ba643
Merge branch 'jomayeri/aio-locked-tensor' of github.com:microsoft/Dee…
tjruwase Oct 9, 2024
ea0e45b
Merge branch 'master' into jomayeri/aio-locked-tensor
tjruwase Oct 9, 2024
98988cd
Formatting; Use int64_t
tjruwase Oct 9, 2024
b30cda5
Merge branch 'jomayeri/aio-locked-tensor' of github.com:microsoft/Dee…
tjruwase Oct 9, 2024
90e25da
Skip fp16 tests on CPU
tjruwase Oct 9, 2024
60ae3e0
Merge branch 'master' into jomayeri/aio-locked-tensor
tjruwase Oct 9, 2024
a008d4c
Merge branch 'master' into jomayeri/aio-locked-tensor
tjruwase Oct 9, 2024
59d8dfa
Merge branch 'master' into jomayeri/aio-locked-tensor
tjruwase Oct 9, 2024
8a52388
Add Cuda 12.6
tjruwase Oct 9, 2024
d1afe4c
Merge branch 'jomayeri/aio-locked-tensor' of github.com:microsoft/Dee…
tjruwase Oct 9, 2024
0182208
Merge branch 'master' into jomayeri/aio-locked-tensor
tjruwase Oct 9, 2024
6 changes: 4 additions & 2 deletions accelerator/cpu_accelerator.py
@@ -301,9 +301,9 @@ def get_op_builder(self, class_name):
# is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed
# if successful this also means we're doing a local install and not JIT compile path
from op_builder import __deepspeed__ # noqa: F401 # type: ignore
from op_builder.cpu import CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
from op_builder.cpu import AsyncIOBuilder, CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
except ImportError:
from deepspeed.ops.op_builder.cpu import CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
from deepspeed.ops.op_builder.cpu import AsyncIOBuilder, CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder

if class_name == "CCLCommBuilder":
return CCLCommBuilder
@@ -313,6 +313,8 @@ def get_op_builder(self, class_name):
return FusedAdamBuilder
elif class_name == "CPUAdamBuilder":
return CPUAdamBuilder
elif class_name == "AsyncIOBuilder":
return AsyncIOBuilder
else:
# return a NotImplementedBuilder to avoid get NoneType[Name] in unit tests
return NotImplementedBuilder
43 changes: 21 additions & 22 deletions csrc/aio/common/deepspeed_aio_common.cpp
@@ -68,8 +68,8 @@ static void _get_aio_latencies(std::vector<std::chrono::duration<double>>& raw_l
std::accumulate(lat_usec.begin(), lat_usec.end(), 0) / lat_usec.size();
}

static void _do_io_submit_singles(const long long int n_iocbs,
const long long int iocb_index,
static void _do_io_submit_singles(const int64_t n_iocbs,
const int64_t iocb_index,
std::unique_ptr<aio_context>& aio_ctxt,
std::vector<std::chrono::duration<double>>& submit_times)
{
@@ -89,8 +89,8 @@ static void _do_io_submit_singles(const long long int n_iocbs,
}
}

static void _do_io_submit_block(const long long int n_iocbs,
const long long int iocb_index,
static void _do_io_submit_block(const int64_t n_iocbs,
const int64_t iocb_index,
std::unique_ptr<aio_context>& aio_ctxt,
std::vector<std::chrono::duration<double>>& submit_times)
{
@@ -109,18 +109,18 @@ static void _do_io_submit_block(const long long int n_iocbs,
assert(submit_ret > 0);
}

static int _do_io_complete(const long long int min_completes,
const long long int max_completes,
static int _do_io_complete(const int64_t min_completes,
const int64_t max_completes,
std::unique_ptr<aio_context>& aio_ctxt,
std::vector<std::chrono::duration<double>>& reap_times)
{
const auto start_time = std::chrono::high_resolution_clock::now();
long long int n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
min_completes,
max_completes,
aio_ctxt->_io_events.data(),
nullptr,
nullptr);
int64_t n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
min_completes,
max_completes,
aio_ctxt->_io_events.data(),
nullptr,
nullptr);
reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time);
assert(n_completes >= min_completes);
return n_completes;
@@ -134,7 +134,7 @@ void do_aio_operation_sequential(const bool read_op,
{
struct io_prep_context prep_ctxt(read_op, xfer_ctxt, aio_ctxt->_block_size, &aio_ctxt->_iocbs);

const auto num_io_blocks = static_cast<long long int>(
const auto num_io_blocks = static_cast<int64_t>(
ceil(static_cast<double>(xfer_ctxt->_num_bytes) / aio_ctxt->_block_size));
#if DEBUG_DS_AIO_PERF
const auto io_op_name = std::string(read_op ? "read" : "write");
@@ -145,15 +145,14 @@ void do_aio_operation_sequential(const bool read_op,
std::vector<std::chrono::duration<double>> submit_times;
std::vector<std::chrono::duration<double>> reap_times;
const auto max_queue_bytes =
static_cast<long long int>(aio_ctxt->_queue_depth * aio_ctxt->_block_size);
static_cast<int64_t>(aio_ctxt->_queue_depth * aio_ctxt->_block_size);

auto start = std::chrono::high_resolution_clock::now();
for (long long iocb_index = 0; iocb_index < num_io_blocks;
iocb_index += aio_ctxt->_queue_depth) {
for (int64_t iocb_index = 0; iocb_index < num_io_blocks; iocb_index += aio_ctxt->_queue_depth) {
const auto start_offset = iocb_index * aio_ctxt->_block_size;
const auto start_buffer = (char*)xfer_ctxt->_mem_buffer + start_offset;
const auto n_iocbs =
min(static_cast<long long>(aio_ctxt->_queue_depth), (num_io_blocks - iocb_index));
min(static_cast<int64_t>(aio_ctxt->_queue_depth), (num_io_blocks - iocb_index));
const auto num_bytes = min(max_queue_bytes, (xfer_ctxt->_num_bytes - start_offset));
prep_ctxt.prep_iocbs(n_iocbs, num_bytes, start_buffer, start_offset);

@@ -285,13 +284,13 @@ int open_file(const char* filename, const bool read_op)

int regular_read(const char* filename, std::vector<char>& buffer)
{
long long int num_bytes;
int64_t num_bytes;
const auto f_size = get_file_size(filename, num_bytes);
assert(f_size != -1);
buffer.resize(num_bytes);
const auto fd = open(filename, O_RDONLY, 0600);
assert(fd != -1);
long long int read_bytes = 0;
int64_t read_bytes = 0;
auto r = 0;
do {
const auto buffer_ptr = buffer.data() + read_bytes;
@@ -309,23 +308,23 @@ int regular_read(const char* filename, std::vector<char>& buffer)
return 0;
}

static bool _validate_buffer(const char* filename, void* aio_buffer, const long long int num_bytes)
static bool _validate_buffer(const char* filename, void* aio_buffer, const int64_t num_bytes)
{
std::vector<char> regular_buffer;
const auto reg_ret = regular_read(filename, regular_buffer);
assert(0 == reg_ret);
std::cout << "regular read of " << filename << " returned " << regular_buffer.size() << " bytes"
<< std::endl;

if (static_cast<long long int>(regular_buffer.size()) != num_bytes) { return false; }
if (static_cast<int64_t>(regular_buffer.size()) != num_bytes) { return false; }

return (0 == memcmp(aio_buffer, regular_buffer.data(), regular_buffer.size()));
}

bool validate_aio_operation(const bool read_op,
const char* filename,
void* aio_buffer,
const long long int num_bytes)
const int64_t num_bytes)
{
const auto msg_suffix = std::string("deepspeed_aio_") +
std::string(read_op ? "read()" : "write()") +
2 changes: 1 addition & 1 deletion csrc/aio/common/deepspeed_aio_common.h
@@ -35,4 +35,4 @@ int regular_read(const char* filename, std::vector<char>& buffer);
bool validate_aio_operation(const bool read_op,
const char* filename,
void* aio_buffer,
const long long int num_bytes);
const int64_t num_bytes);
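
The hunks above move byte counts in the common AIO helpers from `long long int` to `int64_t`. Below is a minimal sketch, not part of this PR, of how a caller might exercise the `regular_read` and `validate_aio_operation` declarations with the new types; the test filename and the standalone `main` are illustrative assumptions.

```cpp
// Illustrative sketch only (not DeepSpeed code): read a file back with the
// int64_t-based helpers and cross-check a buffer against it.
#include <cstdint>
#include <iostream>
#include <vector>

#include "deepspeed_aio_common.h"

int main()
{
    const char* filename = "/tmp/ds_aio_example.bin";  // hypothetical test file

    // regular_read() resizes the vector to the file size and fills it via read().
    std::vector<char> buffer;
    if (regular_read(filename, buffer) != 0) { return 1; }

    // Byte counts are int64_t end to end, so sizes beyond 2 GiB are represented
    // consistently regardless of the platform definition of long long int.
    const int64_t num_bytes = static_cast<int64_t>(buffer.size());

    // validate_aio_operation() re-reads the file and memcmp's it against the
    // supplied buffer, returning true on an exact match.
    const bool ok = validate_aio_operation(/*read_op=*/true, filename, buffer.data(), num_bytes);
    std::cout << "validation " << (ok ? "passed" : "failed") << std::endl;
    return ok ? 0 : 1;
}
```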
18 changes: 9 additions & 9 deletions csrc/aio/common/deepspeed_aio_utils.cpp
@@ -18,8 +18,8 @@ const int c_block_size = 128 * 1024;
const int c_io_queue_depth = 8;

io_xfer_ctxt::io_xfer_ctxt(const int fd,
const long long int file_offset,
const long long int num_bytes,
const int64_t file_offset,
const int64_t num_bytes,
const void* buffer)
: _fd(fd), _base_offset(file_offset), _mem_buffer(buffer), _num_bytes(num_bytes)
{
@@ -36,7 +36,7 @@ io_prep_context::io_prep_context(const bool read_op,
void io_prep_context::prep_iocbs(const int n_iocbs,
const size_t num_bytes,
const void* start_buffer,
const long long int start_offset)
const int64_t start_offset)
{
assert(static_cast<size_t>(n_iocbs) <= _iocbs->size());
for (auto i = 0; i < n_iocbs; ++i) {
@@ -64,24 +64,24 @@ io_prep_generator::io_prep_generator(const bool read_op,
_next_iocb_index(0)
{
_num_io_blocks =
static_cast<long long int>(ceil(static_cast<double>(xfer_ctxt->_num_bytes) / block_size));
static_cast<int64_t>(ceil(static_cast<double>(xfer_ctxt->_num_bytes) / block_size));
_remaining_io_blocks = _num_io_blocks;
}

int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>* iocbs)
{
if ((_remaining_bytes) == 0 || (_remaining_io_blocks == 0)) {
assert(static_cast<long long int>(_remaining_bytes) == _remaining_io_blocks);
assert(static_cast<int64_t>(_remaining_bytes) == _remaining_io_blocks);
return 0;
}

assert(static_cast<size_t>(n_iocbs) <= iocbs->size());

auto actual_n_iocbs = min(static_cast<long long int>(n_iocbs), _remaining_io_blocks);
auto actual_n_iocbs = min(static_cast<int64_t>(n_iocbs), _remaining_io_blocks);
for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) {
const auto xfer_offset = _xfer_ctxt->_base_offset + (_next_iocb_index * _block_size);
const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + xfer_offset;
const auto num_bytes = min(static_cast<long long int>(_block_size), _remaining_bytes);
const auto num_bytes = min(static_cast<int64_t>(_block_size), _remaining_bytes);

if (_read_op) {
io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset);
@@ -95,15 +95,15 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>*
return actual_n_iocbs;
}

int get_file_size(const char* filename, long long int& size)
int get_file_size(const char* filename, int64_t& size)
{
struct stat st;
if (stat(filename, &st) == -1) { return -1; }
size = st.st_size;
return 0;
}

void* ds_page_aligned_alloc(const size_t size, const bool lock)
void* ds_page_aligned_alloc(const int64_t size, const bool lock)
{
void* ptr;
int retval;
22 changes: 11 additions & 11 deletions csrc/aio/common/deepspeed_aio_utils.h
@@ -30,13 +30,13 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.

struct io_xfer_ctxt {
const int _fd;
const long long int _base_offset;
const int64_t _base_offset;
const void* _mem_buffer;
const long long int _num_bytes;
const int64_t _num_bytes;

io_xfer_ctxt(const int fd,
const long long int file_offset,
const long long int num_bytes,
const int64_t file_offset,
const int64_t num_bytes,
const void* buffer);
};

@@ -54,18 +54,18 @@ struct io_prep_context {
void prep_iocbs(const int n_iocbs,
const size_t num_bytes,
const void* start_buffer,
const long long int start_offset);
const int64_t start_offset);
};

struct io_prep_generator {
const bool _read_op;
const std::unique_ptr<io_xfer_ctxt>& _xfer_ctxt;
const size_t _block_size;

long long int _remaining_bytes;
long long int _num_io_blocks;
long long int _remaining_io_blocks;
long long int _next_iocb_index;
int64_t _remaining_bytes;
int64_t _num_io_blocks;
int64_t _remaining_io_blocks;
int64_t _next_iocb_index;

io_prep_generator(const bool read_op,
const std::unique_ptr<io_xfer_ctxt>& xfer_ctxt,
@@ -74,6 +74,6 @@ struct io_prep_generator {
int prep_iocbs(const int n_iocbs, std::vector<struct iocb*>* iocbs);
};

void* ds_page_aligned_alloc(const size_t size, const bool lock = false);
void* ds_page_aligned_alloc(const int64_t size, const bool lock = false);

int get_file_size(const char* filename, long long int& size);
int get_file_size(const char* filename, int64_t& size);
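
The same `int64_t` migration covers the transfer descriptors and allocation helpers declared in this header. A sketch, under stated assumptions, of how they might be wired together: query the file size, allocate a page-aligned (optionally locked) buffer, and describe the transfer with `io_xfer_ctxt`. The filename, the standalone `main`, the placement of the `open_file` declaration in `deepspeed_aio_common.h`, and releasing the buffer with `free()` are assumptions, not taken from the PR.

```cpp
// Illustrative sketch only: int64_t file size -> page-aligned buffer -> io_xfer_ctxt.
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <unistd.h>

#include "deepspeed_aio_common.h"  // open_file() (assumed to be declared here)
#include "deepspeed_aio_utils.h"   // get_file_size(), ds_page_aligned_alloc(), io_xfer_ctxt

int main()
{
    const char* filename = "/tmp/ds_aio_example.bin";  // hypothetical test file

    int64_t num_bytes = 0;
    if (get_file_size(filename, num_bytes) != 0) { return 1; }

    // lock=true requests page-locked memory, in line with the pinned-buffer work
    // in this PR; the parameter defaults to false.
    void* buffer = ds_page_aligned_alloc(num_bytes, /*lock=*/true);
    if (buffer == nullptr) { return 1; }

    const int fd = open_file(filename, /*read_op=*/true);
    if (fd < 0) {
        free(buffer);
        return 1;
    }

    // Offsets and byte counts are int64_t throughout the transfer descriptor.
    auto xfer_ctxt = std::make_unique<io_xfer_ctxt>(fd, /*file_offset=*/0, num_bytes, buffer);

    // ... hand xfer_ctxt to io_prep_generator / do_aio_operation_sequential here ...

    close(fd);
    free(buffer);  // assumes the allocation can be released with free()
    return 0;
}
```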
8 changes: 4 additions & 4 deletions csrc/aio/py_lib/deepspeed_aio_op_desc.cpp
@@ -11,16 +11,16 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int num_threads,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate)
: _read_op(read_op),
_buffer(buffer),
_fd(fd),
_filename(filename),
_file_num_bytes(file_num_bytes),
_num_threads(num_threads),
_num_bytes_per_thread(file_num_bytes / num_threads),
_intra_op_parallelism(intra_op_parallelism),
_num_bytes_per_thread(file_num_bytes / intra_op_parallelism),
_validate(validate)
{
}
10 changes: 5 additions & 5 deletions csrc/aio/py_lib/deepspeed_aio_op_desc.h
@@ -14,18 +14,18 @@ struct io_op_desc_t {
torch::Tensor _buffer;
int _fd;
const std::string _filename;
const long long int _file_num_bytes;
const int _num_threads;
const long long int _num_bytes_per_thread;
const int64_t _file_num_bytes;
const int _intra_op_parallelism;
const int64_t _num_bytes_per_thread;
torch::Tensor _contiguous_buffer;
const bool _validate;

io_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int num_threads,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate);

virtual void run(const int tid,
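
The rename from `num_threads` to `intra_op_parallelism` carries through to the per-worker byte split that `io_op_desc_t` precomputes as `_num_bytes_per_thread = file_num_bytes / intra_op_parallelism`. The sketch below only illustrates that arithmetic; the even division and the assumption that worker `tid` owns one contiguous slice are mine, not code from the PR.

```cpp
// Illustrative sketch: how intra_op_parallelism (formerly num_threads) splits a
// file among workers, mirroring _num_bytes_per_thread in io_op_desc_t.
#include <cstdint>
#include <iostream>

int main()
{
    const int64_t file_num_bytes = int64_t(1) << 30;  // hypothetical 1 GiB file
    const int intra_op_parallelism = 8;               // was "num_threads" before this PR

    // Same computation as the io_op_desc_t initializer list:
    // _num_bytes_per_thread(file_num_bytes / intra_op_parallelism)
    const int64_t num_bytes_per_thread = file_num_bytes / intra_op_parallelism;

    // Assumed slice ownership: worker tid handles one contiguous range.
    for (int tid = 0; tid < intra_op_parallelism; ++tid) {
        const int64_t offset = tid * num_bytes_per_thread;
        std::cout << "worker " << tid << ": bytes [" << offset << ", "
                  << (offset + num_bytes_per_thread) << ")" << std::endl;
    }
    return 0;
}
```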