Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] asyncio support readv/writev #7

Merged
merged 2 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions csrc/aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void AIOAsyncIO::wait()
for (int i = 0; i < num_events; i++) /* 开始获取每一个event并且做相应处理 */
{
struct io_event event = events[i];
auto *data = (IOData *)event.data;
std::unique_ptr<IOData> data(static_cast<IOData *>(event.data));
if (data->type == WRITE)
this->n_write_events--;
else if (data->type == READ)
Expand All @@ -52,9 +52,6 @@ void AIOAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long

io_prep_pwrite(&iocb, fd, buffer, n_bytes, (long long)offset); // 初始化这个异步I/O需求 counter为偏移量

data->type = WRITE;
data->iov.iov_base = buffer;
data->iov.iov_len = n_bytes;
iocb.data = data;
io_submit(this->io_ctx, 1, &iocbs); // 提交这个I/O不会堵塞

Expand All @@ -69,11 +66,8 @@ void AIOAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long o
struct iocb *iocbs = &iocb;
auto *data = new IOData(READ, callback);

io_prep_pwrite(&iocb, fd, buffer, n_bytes, (long long)offset);
io_prep_pread(&iocb, fd, buffer, n_bytes, (long long)offset);

data->type = READ;
data->iov.iov_base = buffer;
data->iov.iov_len = n_bytes;
iocb.data = data;
io_submit(this->io_ctx, 1, &iocbs); /* 提交这个I/O不会堵塞 */

Expand All @@ -97,3 +91,35 @@ void AIOAsyncIO::synchronize()
while (this->n_write_events > 0 || this->n_read_events > 0)
wait();
}

void AIOAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback)
{
struct iocb iocb
{
}; //建立一个异步I/O需求
struct iocb *iocbs = &iocb;
auto *data = new IOData(WRITE, callback, iov);

io_prep_pwritev(&iocb, fd, iov, iovcnt, (long long)offset); // 初始化这个异步I/O需求 counter为偏移量

iocb.data = data;
io_submit(this->io_ctx, 1, &iocbs); // 提交这个I/O不会堵塞

this->n_write_events++;
}

void AIOAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback)
{
struct iocb iocb
{
}; //建立一个异步I/O需求
struct iocb *iocbs = &iocb;
auto *data = new IOData(READ, callback, iov);

io_prep_preadv(&iocb, fd, iov, iovcnt, (long long)offset);

iocb.data = data;
io_submit(this->io_ctx, 1, &iocbs); /* 提交这个I/O不会堵塞 */

this->n_read_events++;
}
2 changes: 2 additions & 0 deletions csrc/include/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class AIOAsyncIO : public AsyncIO

void write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback);
void read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback);
void writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback);
void readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback);

void sync_write_events();
void sync_read_events();
Expand Down
14 changes: 10 additions & 4 deletions csrc/include/asyncio.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ enum IOType

struct IOData
{
iovec iov;
IOType type;
callback_t callback;

IOData(IOType type) : type(type), callback(nullptr) {}
IOData(IOType type, callback_t callback) : type(type), callback(callback) {}
const iovec *iov;

IOData(IOType type, callback_t callback = nullptr, const iovec *iov = nullptr) : type(type), callback(callback), iov(iov) {}
~IOData()
{
if (iov)
delete iov;
}
};

class AsyncIO
Expand All @@ -28,6 +32,8 @@ class AsyncIO

virtual void write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) = 0;
virtual void read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) = 0;
virtual void writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) = 0;
virtual void readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) = 0;

virtual void sync_write_events() = 0;
virtual void sync_read_events() = 0;
Expand Down
2 changes: 2 additions & 0 deletions csrc/include/uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class UringAsyncIO : public AsyncIO

void write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback);
void read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback);
void writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback);
void readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback);

void sync_write_events();
void sync_read_events();
Expand Down
28 changes: 22 additions & 6 deletions csrc/uring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ void UringAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long lon
{
io_uring_sqe *sqe = io_uring_get_sqe(&this->ring);
IOData *data = new IOData(WRITE, callback);
data->iov.iov_base = buffer;
data->iov.iov_len = n_bytes;
io_uring_prep_writev(sqe, fd, &data->iov, 1, offset);
io_uring_prep_write(sqe, fd, buffer, n_bytes, offset);
io_uring_sqe_set_data(sqe, data);
io_uring_submit(&this->ring);
this->n_write_events++;
Expand All @@ -50,9 +48,7 @@ void UringAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long
{
io_uring_sqe *sqe = io_uring_get_sqe(&this->ring);
IOData *data = new IOData(READ, callback);
data->iov.iov_base = buffer;
data->iov.iov_len = n_bytes;
io_uring_prep_readv(sqe, fd, &data->iov, 1, offset);
io_uring_prep_read(sqe, fd, buffer, n_bytes, offset);
io_uring_sqe_set_data(sqe, data);
io_uring_submit(&this->ring);
this->n_read_events++;
Expand All @@ -75,3 +71,23 @@ void UringAsyncIO::synchronize()
while (this->n_write_events > 0 || this->n_read_events > 0)
wait();
}

void UringAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback)
{
io_uring_sqe *sqe = io_uring_get_sqe(&this->ring);
IOData *data = new IOData(WRITE, callback, iov);
io_uring_prep_writev(sqe, fd, iov, iovcnt, offset);
io_uring_sqe_set_data(sqe, data);
io_uring_submit(&this->ring);
this->n_write_events++;
}

void UringAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback)
{
io_uring_sqe *sqe = io_uring_get_sqe(&this->ring);
IOData *data = new IOData(READ, callback, iov);
io_uring_prep_readv(sqe, fd, iov, iovcnt, offset);
io_uring_sqe_set_data(sqe, data);
io_uring_submit(&this->ring);
this->n_read_events++;
}