Skip to content

Commit

Permalink
[C++] Linux liburing based I/O handler. (#387)
Browse files Browse the repository at this point in the history
* Add uring IO handler.

* minor update.

* Make uring an option.

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
dongx-psu and badrishc authored Oct 4, 2021
1 parent 9fef961 commit e0e148f
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 7 deletions.
18 changes: 12 additions & 6 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,30 @@ jobs:

- job: 'cppLinux'
pool:
vmImage: ubuntu-18.04
vmImage: ubuntu-20.04
displayName: 'C++ (Linux)'

steps:
- script: |
sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
sudo apt update
sudo apt install -y g++-7 libaio-dev uuid-dev libtbb-dev
sudo apt install -y g++ libaio-dev uuid-dev libtbb-dev
displayName: 'Install dependencies'
- script: |
export CXX='g++-7'
git clone https://git.kernel.dk/liburing
cd liburing
git checkout liburing-0.7
./configure
sudo make install
displayName: Install Liburing
- script: |
cd cc
mkdir -p build/Debug build/Release
cd build/Debug
cmake -DCMAKE_BUILD_TYPE=Debug ../..
cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_URING=ON ../..
make -j
cd ../../build/Release
cmake -DCMAKE_BUILD_TYPE=Release ../..
cmake -DCMAKE_BUILD_TYPE=Release -DUSE_URING=ON ../..
make -j
displayName: 'Compile'
- script: |
Expand Down
14 changes: 13 additions & 1 deletion cc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ project(FASTER)
# a flag `USE_BLOBS` that will link in azure's blob store library so that FASTER
# can be used with a blob device for the hybrid log.
OPTION(USE_BLOBS "Extend FASTER's hybrid log to blob store" OFF)
OPTION(USE_URING "Enable io_uring based IO handler" OFF)

if (MSVC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /Zi /nologo /Gm- /W3 /WX /EHsc /GS /fp:precise /permissive- /Zc:wchar_t /Zc:forScope /Zc:inline /Gd /TP")
Expand All @@ -26,6 +27,11 @@ else()

set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g -D_DEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")

# Define FASTER_URING for every build type. Appending only to the _DEBUG and
# _RELEASE flag variables would leave other configurations (RelWithDebInfo,
# MinSizeRel, or an unset CMAKE_BUILD_TYPE) without the define.
if (USE_URING)
  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DFASTER_URING")
endif()
endif()

#Always set _DEBUG compiler directive when compiling bits regardless of target OS
Expand All @@ -37,7 +43,7 @@ set_directory_properties(PROPERTIES COMPILE_DEFINITIONS_DEBUG "_DEBUG")
configure_file(CMakeLists.txt.in googletest-download/CMakeLists.txt)
execute_process(COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" .
RESULT_VARIABLE result
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/googletest-download )
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/googletest-download)
if(result)
message(FATAL_ERROR "CMake step for googletest failed: ${result}")
endif()
Expand Down Expand Up @@ -77,6 +83,9 @@ if(WIN32)
set(FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} rpcrt4)
else()
set (FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} stdc++fs uuid tbb gcc aio m stdc++ pthread)
# The io_uring I/O handler needs liburing at link time.
if(USE_URING)
  list(APPEND FASTER_TEST_LINK_LIBS uring)
endif()
# Using blob storage. Link in appropriate libraries.
if(USE_BLOBS)
set (FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} azurestorage cpprest boost_system crypto ssl)
Expand All @@ -89,6 +98,9 @@ if(WIN32)
set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_LINK_LIBS} rpcrt4 wsock32 Ws2_32)
else()
set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_BENCHMARK_LINK_LIBS} stdc++fs uuid tbb gcc aio m stdc++ pthread)
# The io_uring I/O handler needs liburing at link time.
if(USE_URING)
  list(APPEND FASTER_BENCHMARK_LINK_LIBS uring)
endif()
endif()

#Function to automate building test binaries
Expand Down
115 changes: 115 additions & 0 deletions cc/src/environment/file_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,121 @@ Status QueueFile::ScheduleOperation(FileOperationType operationType, uint8_t* bu
return Status::Ok;
}

#ifdef FASTER_URING

/// Takes at most one entry off the io_uring completion queue and processes it.
///
/// A successful I/O (non-negative result) is reported to the caller's callback with
/// Status::Ok and the number of bytes transferred, after which the I/O context is freed.
/// A failed I/O (negative result) is resubmitted to the ring unchanged; its callback is not
/// invoked until a later attempt completes. If no submission slot is available for the
/// retry, the failure is reported to the callback as Status::IOError with 0 bytes.
///
/// @return true if the caller's callback was invoked; false if the completion queue was
///         empty or the failed I/O was resubmitted.
bool UringIoHandler::TryComplete() {
  struct io_uring_cqe* cqe = nullptr;
  cq_lock_.Acquire();
  int res = io_uring_peek_cqe(ring_, &cqe);
  if(res != 0 || cqe == nullptr) {
    // Nothing has completed yet.
    cq_lock_.Release();
    return false;
  }

  // Read what we need from the entry before marking it as seen.
  int io_res = cqe->res;
  auto* context = reinterpret_cast<UringIoHandler::IoCallbackContext*>(io_uring_cqe_get_data(cqe));
  io_uring_cqe_seen(ring_, cqe);
  cq_lock_.Release();

  Status return_status = Status::Ok;
  size_t byte_transferred = 0;
  if(io_res < 0) {
    // The I/O failed: retry it by queueing the identical request again.
    sq_lock_.Acquire();
    struct io_uring_sqe* sqe = io_uring_get_sqe(ring_);
    if(sqe != nullptr) {
      if(context->is_read_) {
        io_uring_prep_readv(sqe, context->fd_, &context->vec_, 1, context->offset_);
      } else {
        io_uring_prep_writev(sqe, context->fd_, &context->vec_, 1, context->offset_);
      }
      io_uring_sqe_set_data(sqe, context);
      int retry_res = io_uring_submit(ring_);
      assert(retry_res == 1);
      (void)retry_res; // only inspected by the assert
      sq_lock_.Release();
      return false;
    }
    sq_lock_.Release();
    // No submission slot was available, so the request cannot be retried. Surface the
    // failure to the caller instead of passing a null entry to io_uring_prep_*.
    return_status = Status::IOError;
  } else {
    byte_transferred = io_res;
  }

  context->callback(context->caller_context, return_status, byte_transferred);
  lss_allocator.Free(context);
  return true;
}

/// Opens the file and binds it to the io_uring owned by the given handler.
///
/// O_DIRECT is requested when options.unbuffered is set. If the caller passed `exists` and
/// the underlying open reports that the file does not exist, the call still returns
/// Status::Ok but the file is not bound to the handler.
Status UringFile::Open(FileCreateDisposition create_disposition, const FileOptions& options,
                       UringIoHandler* handler, bool* exists) {
  int open_flags = options.unbuffered ? O_DIRECT : 0;
  RETURN_NOT_OK(File::Open(open_flags, create_disposition, exists));

  const bool reported_missing = (exists != nullptr) && !*exists;
  if(!reported_missing) {
    // Remember the handler's ring and submission lock for later reads/writes.
    ring_ = handler->io_uring();
    sq_lock_ = handler->sq_lock();
  }
  return Status::Ok;
}

/// Schedules an asynchronous read of `length` bytes at `offset` into `buffer`.
/// `callback` is invoked with a copy of `context` once the read completes.
Status UringFile::Read(size_t offset, uint32_t length, uint8_t* buffer,
                       IAsyncContext& context, AsyncIOCallback callback) const {
  DCHECK_ALIGNMENT(offset, length, buffer);
#ifdef IO_STATISTICS
  ++read_count_;
  bytes_read_ += length;
#endif
  // ScheduleOperation is non-const, so constness is cast away here.
  UringFile* mutable_self = const_cast<UringFile*>(this);
  return mutable_self->ScheduleOperation(FileOperationType::Read, buffer, offset, length,
                                         context, callback);
}

/// Schedules an asynchronous write of `length` bytes from `buffer` at `offset`.
/// `callback` is invoked with a copy of `context` once the write completes.
Status UringFile::Write(size_t offset, uint32_t length, const uint8_t* buffer,
                        IAsyncContext& context, AsyncIOCallback callback) {
  DCHECK_ALIGNMENT(offset, length, buffer);
#ifdef IO_STATISTICS
  bytes_written_ += length;
#endif
  // ScheduleOperation takes a non-const buffer pointer for both reads and writes.
  uint8_t* source_buffer = const_cast<uint8_t*>(buffer);
  return ScheduleOperation(FileOperationType::Write, source_buffer, offset, length, context,
                           callback);
}

/// Builds an I/O context for one read or write and submits it to the io_uring.
///
/// @param operationType  FileOperationType::Read for a read; anything else is treated as a
///                       write.
/// @param buffer         Data buffer for the transfer.
/// @param offset         File offset of the transfer.
/// @param length         Number of bytes to transfer.
/// @param context        Caller context; a deep copy is handed to the callback.
/// @param callback       Invoked when the I/O completes.
/// @return Status::Ok once the request has been submitted; Status::OutOfMemory if the I/O
///         context could not be allocated; the DeepCopy error if copying the caller context
///         fails; Status::IOError if no submission slot is available or the submit call does
///         not report exactly one submitted entry.
Status UringFile::ScheduleOperation(FileOperationType operationType, uint8_t* buffer,
                                    size_t offset, uint32_t length, IAsyncContext& context,
                                    AsyncIOCallback callback) {
  auto io_context = alloc_context<UringIoHandler::IoCallbackContext>(sizeof(UringIoHandler::IoCallbackContext));
  if(!io_context.get()) return Status::OutOfMemory;

  IAsyncContext* caller_context_copy;
  RETURN_NOT_OK(context.DeepCopy(caller_context_copy));

  bool is_read = operationType == FileOperationType::Read;
  new(io_context.get()) UringIoHandler::IoCallbackContext(is_read, fd_, buffer, length, offset, caller_context_copy, callback);

  sq_lock_->Acquire();
  struct io_uring_sqe* sqe = io_uring_get_sqe(ring_);
  if(sqe == nullptr) {
    // No free submission slot. Fail the request rather than hand a null entry to
    // io_uring_prep_* (an assert alone would not catch this in NDEBUG builds).
    // NOTE(review): caller_context_copy is not released on the error returns of this
    // function - confirm the caller is responsible for it.
    sq_lock_->Release();
    return Status::IOError;
  }

  // The iovec lives inside the I/O context, so it stays valid until the I/O completes.
  if(is_read) {
    io_uring_prep_readv(sqe, fd_, &io_context->vec_, 1, offset);
  } else {
    io_uring_prep_writev(sqe, fd_, &io_context->vec_, 1, offset);
  }
  io_uring_sqe_set_data(sqe, io_context.get());

  int res = io_uring_submit(ring_);
  sq_lock_->Release();
  if(res != 1) {
    // NOTE(review): io_context is freed on this return while the prepared entry still
    // references it - verify whether the entry can still be submitted by a later call.
    return Status::IOError;
  }

  // The completion path now owns the I/O context.
  io_context.release();
  return Status::Ok;
}

#endif

#undef DCHECK_ALIGNMENT

}
Expand Down
153 changes: 153 additions & 0 deletions cc/src/environment/file_linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include <sys/stat.h>
#include <unistd.h>

#ifdef FASTER_URING
#include <liburing.h>
#endif

#include "../core/async.h"
#include "../core/status.h"
#include "file_common.h"
Expand Down Expand Up @@ -250,5 +254,154 @@ class QueueFile : public File {
io_context_t io_object_;
};

#ifdef FASTER_URING

/// A small test-and-test-and-set spin lock built on a single atomic flag.
///
/// Acquire() busy-waits until the flag can be claimed; Release() clears it. The type is
/// aligned to 64 bytes (presumably so each lock occupies its own cache line).
class alignas(64) SpinLock {
 public:
  SpinLock(): locked_(false) {}

  /// Blocks (spinning) until the lock has been acquired by the calling thread.
  void Acquire() noexcept {
    for(;;) {
      // Try to claim the flag; success means it was previously clear.
      if(!locked_.exchange(true, std::memory_order_acquire)) {
        return;
      }
      // Wait on plain loads until the flag looks free before trying the exchange again.
      while(locked_.load(std::memory_order_relaxed)) {
        CpuRelax();
      }
    }
  }

  /// Releases the lock.
  void Release() noexcept {
    locked_.store(false, std::memory_order_release);
  }

 private:
  /// Spin-wait hint. The x86 pause builtin only exists on x86 targets, so other
  /// architectures use their own hint or simply keep spinning.
  static void CpuRelax() noexcept {
#if defined(__x86_64__) || defined(__i386__)
    __builtin_ia32_pause();
#elif defined(__aarch64__)
    __asm__ __volatile__("yield" ::: "memory");
#endif
  }

  std::atomic_bool locked_;
};

class UringFile;

/// The QueueIoHandler class encapsulates completions for async file I/O, where the completions
/// are put on the AIO completion queue.
class UringIoHandler {
public:
typedef UringFile async_file_t;

private:
constexpr static int kMaxEvents = 128;

public:
UringIoHandler() {
ring_ = new struct io_uring();
int ret = io_uring_queue_init(kMaxEvents, ring_, 0);
assert(ret == 0);
}

UringIoHandler(size_t max_threads) {
ring_ = new struct io_uring();
int ret = io_uring_queue_init(kMaxEvents, ring_, 0);
assert(ret == 0);
}

/// Move constructor
UringIoHandler(UringIoHandler&& other) {
ring_ = other.ring_;
other.ring_ = 0;
}

~UringIoHandler() {
if (ring_ != 0) {
io_uring_queue_exit(ring_);
delete ring_;
}
}

/*
/// Invoked whenever a Linux AIO completes.
static void IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res, long res2);
*/
struct IoCallbackContext {
IoCallbackContext(bool is_read, int fd, uint8_t* buffer, size_t length, size_t offset, core::IAsyncContext* context_, core::AsyncIOCallback callback_)
: is_read_(is_read)
, fd_(fd)
, vec_{buffer, length}
, offset_(offset)
, caller_context{ context_ }
, callback{ callback_ } {}

bool is_read_;

int fd_;
struct iovec vec_;
size_t offset_;

/// Caller callback context.
core::IAsyncContext* caller_context;

/// The caller's asynchronous callback function
core::AsyncIOCallback callback;
};

inline struct io_uring* io_uring() const {
return ring_;
}

inline SpinLock* sq_lock() {
return &sq_lock_;
}

/// Try to execute the next IO completion on the queue, if any.
bool TryComplete();

private:
/// The io_uring for all the I/Os
struct io_uring* ring_;
SpinLock sq_lock_, cq_lock_;
};

/// The UringFile class encapsulates asynchronous reads and writes, using the specified
/// io_uring. A file is bound to a ring (and to the lock guarding submissions to it) by
/// Open(); until then both pointers are null.
class UringFile : public File {
 public:
  UringFile()
    : File()
    , ring_{ nullptr }
    , sq_lock_{ nullptr } {
  }
  UringFile(const std::string& filename)
    : File(filename)
    , ring_{ nullptr }
    , sq_lock_{ nullptr } {
  }
  /// Move constructor. The ring and lock are not owned by the file, so the pointers are
  /// simply copied across.
  UringFile(UringFile&& other)
    : File(std::move(other))
    , ring_{ other.ring_ }
    , sq_lock_{ other.sq_lock_ } {
  }
  /// Move assignment operator.
  UringFile& operator=(UringFile&& other) {
    File::operator=(std::move(other));
    ring_ = other.ring_;
    sq_lock_ = other.sq_lock_;
    return *this;
  }

  /// Opens the file and binds it to the ring owned by `handler`.
  core::Status Open(FileCreateDisposition create_disposition, const FileOptions& options,
                    UringIoHandler* handler, bool* exists = nullptr);

  /// Schedule an asynchronous read / write; `callback` is invoked on completion.
  core::Status Read(size_t offset, uint32_t length, uint8_t* buffer,
                    core::IAsyncContext& context, core::AsyncIOCallback callback) const;
  core::Status Write(size_t offset, uint32_t length, const uint8_t* buffer,
                     core::IAsyncContext& context, core::AsyncIOCallback callback);

 private:
  /// Common submission path shared by Read() and Write().
  core::Status ScheduleOperation(FileOperationType operationType, uint8_t* buffer, size_t offset,
                                 uint32_t length, core::IAsyncContext& context, core::AsyncIOCallback callback);

  /// Ring the file submits to (not owned).
  struct io_uring* ring_;
  /// Lock guarding submissions to ring_ (not owned).
  SpinLock* sq_lock_;
};

#endif

}
} // namespace FASTER::environment
6 changes: 6 additions & 0 deletions cc/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
# Tests registered on every platform.
ADD_FASTER_TEST(in_memory_test "")
ADD_FASTER_TEST(malloc_fixed_page_size_test "")

# Paging tests. The io_uring variant is only registered for non-MSVC builds
# configured with USE_URING; the threadpool variant only for MSVC.
ADD_FASTER_TEST(paging_queue_test "paging_test.h")
if(USE_URING AND NOT MSVC)
  ADD_FASTER_TEST(paging_uring_test "paging_test.h")
endif()
if(MSVC)
  ADD_FASTER_TEST(paging_threadpool_test "paging_test.h")
endif()

# Recovery tests, with the same per-platform variants as the paging tests.
ADD_FASTER_TEST(recovery_queue_test "recovery_test.h")
if(USE_URING AND NOT MSVC)
  ADD_FASTER_TEST(recovery_uring_test "recovery_test.h")
endif()
if(MSVC)
  ADD_FASTER_TEST(recovery_threadpool_test "recovery_test.h")
endif()
Expand Down
Loading

0 comments on commit e0e148f

Please sign in to comment.