diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 5cfcc865b..90c5d83f2 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -94,24 +94,30 @@ jobs: - job: 'cppLinux' pool: - vmImage: ubuntu-18.04 + vmImage: ubuntu-20.04 displayName: 'C++ (Linux)' steps: - script: | - sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test sudo apt update - sudo apt install -y g++-7 libaio-dev uuid-dev libtbb-dev + sudo apt install -y g++ libaio-dev uuid-dev libtbb-dev displayName: 'Install depdendencies' - script: | - export CXX='g++-7' + git clone https://git.kernel.dk/liburing + cd liburing + git checkout liburing-0.7 + ./configure + sudo make install + displayName: Install Liburing + + - script: | cd cc mkdir -p build/Debug build/Release cd build/Debug - cmake -DCMAKE_BUILD_TYPE=Debug ../.. + cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_URING=ON ../.. make -j cd ../../build/Release - cmake -DCMAKE_BUILD_TYPE=Release ../.. + cmake -DCMAKE_BUILD_TYPE=Release -DUSE_URING=ON ../.. make -j displayName: 'Compile' - script: | diff --git a/cc/CMakeLists.txt b/cc/CMakeLists.txt index 6952b91b8..4f92318b6 100644 --- a/cc/CMakeLists.txt +++ b/cc/CMakeLists.txt @@ -11,6 +11,7 @@ project(FASTER) # a flag `USE_BLOBS` that will link in azure's blob store library so that FASTER # can be used with a blob device for the hybrid log. OPTION(USE_BLOBS "Extend FASTER's hybrid log to blob store" OFF) +OPTION(USE_URING "Enable io_uring based IO handler" OFF) if (MSVC) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /Zi /nologo /Gm- /W3 /WX /EHsc /GS /fp:precise /permissive- /Zc:wchar_t /Zc:forScope /Zc:inline /Gd /TP") @@ -26,6 +27,11 @@ else() set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g -D_DEBUG") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g") + + if (USE_URING) + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DFASTER_URING") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DFASTER_URING") + endif() endif() #Always set _DEBUG compiler directive when compiling bits regardless of target OS @@ -37,7 +43,7 @@ set_directory_properties(PROPERTIES COMPILE_DEFINITIONS_DEBUG "_DEBUG") configure_file(CMakeLists.txt.in googletest-download/CMakeLists.txt) execute_process(COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" . RESULT_VARIABLE result - WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/googletest-download ) + WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/googletest-download) if(result) message(FATAL_ERROR "CMake step for googletest failed: ${result}") endif() @@ -77,6 +83,9 @@ if(WIN32) set(FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} rpcrt4) else() set (FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} stdc++fs uuid tbb gcc aio m stdc++ pthread) + if(USE_URING) + set (FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} uring) + endif() # Using blob storage. Link in appropriate libraries. if(USE_BLOBS) set (FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} azurestorage cpprest boost_system crypto ssl) @@ -89,6 +98,9 @@ if(WIN32) set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_LINK_LIBS} rpcrt4 wsock32 Ws2_32) else() set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_BENCHMARK_LINK_LIBS} stdc++fs uuid tbb gcc aio m stdc++ pthread) + if(USE_URING) + set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_BENCHMARK_LINK_LIBS} uring) + endif() endif() #Function to automate building test binaries diff --git a/cc/src/environment/file_linux.cc b/cc/src/environment/file_linux.cc index 9eb21b9be..461a28d21 100644 --- a/cc/src/environment/file_linux.cc +++ b/cc/src/environment/file_linux.cc @@ -196,6 +196,121 @@ Status QueueFile::ScheduleOperation(FileOperationType operationType, uint8_t* bu return Status::Ok; } +#ifdef FASTER_URING + +bool UringIoHandler::TryComplete() { + struct io_uring_cqe* cqe = nullptr; + cq_lock_.Acquire(); + int res = io_uring_peek_cqe(ring_, &cqe); + if(res == 0 && cqe) { + int io_res = cqe->res; + auto *context = reinterpret_cast(io_uring_cqe_get_data(cqe)); + io_uring_cqe_seen(ring_, cqe); + cq_lock_.Release(); + Status return_status; + size_t byte_transferred; + if (io_res < 0) { + // Retry if it is failed..... + sq_lock_.Acquire(); + struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); + assert(sqe != 0); + if (context->is_read_) { + io_uring_prep_readv(sqe, context->fd_, &context->vec_, 1, context->offset_); + } else { + io_uring_prep_writev(sqe, context->fd_, &context->vec_, 1, context->offset_); + } + io_uring_sqe_set_data(sqe, context); + int retry_res = io_uring_submit(ring_); + assert(retry_res == 1); + sq_lock_.Release(); + return false; + } else { + return_status = Status::Ok; + byte_transferred = io_res; + } + context->callback(context->caller_context, return_status, byte_transferred); + lss_allocator.Free(context); + return true; + } else { + cq_lock_.Release(); + return false; + } +} + +Status UringFile::Open(FileCreateDisposition create_disposition, const FileOptions& options, + UringIoHandler* handler, bool* exists) { + int flags = 0; + if(options.unbuffered) { + flags |= O_DIRECT; + } + RETURN_NOT_OK(File::Open(flags, create_disposition, exists)); + if(exists && !*exists) { + return Status::Ok; + } + + ring_ = handler->io_uring(); + sq_lock_ = handler->sq_lock(); + return Status::Ok; +} + +Status UringFile::Read(size_t offset, uint32_t length, uint8_t* buffer, + IAsyncContext& context, AsyncIOCallback callback) const { + DCHECK_ALIGNMENT(offset, length, buffer); +#ifdef IO_STATISTICS + ++read_count_; + bytes_read_ += length; +#endif + return const_cast(this)->ScheduleOperation(FileOperationType::Read, buffer, + offset, length, context, callback); +} + +Status UringFile::Write(size_t offset, uint32_t length, const uint8_t* buffer, + IAsyncContext& context, AsyncIOCallback callback) { + DCHECK_ALIGNMENT(offset, length, buffer); +#ifdef IO_STATISTICS + bytes_written_ += length; +#endif + return ScheduleOperation(FileOperationType::Write, const_cast(buffer), offset, length, + context, callback); +} + +Status UringFile::ScheduleOperation(FileOperationType operationType, uint8_t* buffer, + size_t offset, uint32_t length, IAsyncContext& context, + AsyncIOCallback callback) { + auto io_context = alloc_context(sizeof(UringIoHandler::IoCallbackContext)); + if (!io_context.get()) return Status::OutOfMemory; + + IAsyncContext* caller_context_copy; + RETURN_NOT_OK(context.DeepCopy(caller_context_copy)); + + bool is_read = operationType == FileOperationType::Read; + new(io_context.get()) UringIoHandler::IoCallbackContext(is_read, fd_, buffer, length, offset, caller_context_copy, callback); + + sq_lock_->Acquire(); + struct io_uring_sqe *sqe = io_uring_get_sqe(ring_); + assert(sqe != 0); + + if (is_read) { + io_uring_prep_readv(sqe, fd_, &io_context->vec_, 1, offset); + //io_uring_prep_read(sqe, fd_, buffer, length, offset); + } else { + io_uring_prep_writev(sqe, fd_, &io_context->vec_, 1, offset); + //io_uring_prep_write(sqe, fd_, buffer, length, offset); + } + io_uring_sqe_set_data(sqe, io_context.get()); + + int res = io_uring_submit(ring_); + sq_lock_->Release(); + if (res != 1) { + return Status::IOError; + } + + io_context.release(); + return Status::Ok; +} + +#endif + #undef DCHECK_ALIGNMENT } diff --git a/cc/src/environment/file_linux.h b/cc/src/environment/file_linux.h index 1b98d9ce7..5dd8d146c 100644 --- a/cc/src/environment/file_linux.h +++ b/cc/src/environment/file_linux.h @@ -11,6 +11,10 @@ #include #include +#ifdef FASTER_URING +#include +#endif + #include "../core/async.h" #include "../core/status.h" #include "file_common.h" @@ -250,5 +254,154 @@ class QueueFile : public File { io_context_t io_object_; }; +#ifdef FASTER_URING + +class alignas(64) SpinLock { +public: + SpinLock(): locked_(false) {} + + void Acquire() noexcept { + for (;;) { + if (!locked_.exchange(true, std::memory_order_acquire)) { + return; + } + + while (locked_.load(std::memory_order_relaxed)) { + __builtin_ia32_pause(); + } + } + } + + void Release() noexcept { + locked_.store(false, std::memory_order_release); + } +private: + std::atomic_bool locked_; +}; + +class UringFile; + +/// The QueueIoHandler class encapsulates completions for async file I/O, where the completions +/// are put on the AIO completion queue. +class UringIoHandler { + public: + typedef UringFile async_file_t; + + private: + constexpr static int kMaxEvents = 128; + + public: + UringIoHandler() { + ring_ = new struct io_uring(); + int ret = io_uring_queue_init(kMaxEvents, ring_, 0); + assert(ret == 0); + } + + UringIoHandler(size_t max_threads) { + ring_ = new struct io_uring(); + int ret = io_uring_queue_init(kMaxEvents, ring_, 0); + assert(ret == 0); + } + + /// Move constructor + UringIoHandler(UringIoHandler&& other) { + ring_ = other.ring_; + other.ring_ = 0; + } + + ~UringIoHandler() { + if (ring_ != 0) { + io_uring_queue_exit(ring_); + delete ring_; + } + } + + /* + /// Invoked whenever a Linux AIO completes. + static void IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res, long res2); + */ + struct IoCallbackContext { + IoCallbackContext(bool is_read, int fd, uint8_t* buffer, size_t length, size_t offset, core::IAsyncContext* context_, core::AsyncIOCallback callback_) + : is_read_(is_read) + , fd_(fd) + , vec_{buffer, length} + , offset_(offset) + , caller_context{ context_ } + , callback{ callback_ } {} + + bool is_read_; + + int fd_; + struct iovec vec_; + size_t offset_; + + /// Caller callback context. + core::IAsyncContext* caller_context; + + /// The caller's asynchronous callback function + core::AsyncIOCallback callback; + }; + + inline struct io_uring* io_uring() const { + return ring_; + } + + inline SpinLock* sq_lock() { + return &sq_lock_; + } + + /// Try to execute the next IO completion on the queue, if any. + bool TryComplete(); + + private: + /// The io_uring for all the I/Os + struct io_uring* ring_; + SpinLock sq_lock_, cq_lock_; +}; + +/// The UringFile class encapsulates asynchronous reads and writes, using the specified +/// io_uring +class UringFile : public File { + public: + UringFile() + : File() + , ring_{ nullptr } { + } + UringFile(const std::string& filename) + : File(filename) + , ring_{ nullptr } { + } + /// Move constructor + UringFile(UringFile&& other) + : File(std::move(other)) + , ring_{ other.ring_ } + , sq_lock_{ other.sq_lock_ } { + } + /// Move assignment operator. + UringFile& operator=(UringFile&& other) { + File::operator=(std::move(other)); + ring_ = other.ring_; + sq_lock_ = other.sq_lock_; + return *this; + } + + core::Status Open(FileCreateDisposition create_disposition, const FileOptions& options, + UringIoHandler* handler, bool* exists = nullptr); + + core::Status Read(size_t offset, uint32_t length, uint8_t* buffer, + core::IAsyncContext& context, core::AsyncIOCallback callback) const; + core::Status Write(size_t offset, uint32_t length, const uint8_t* buffer, + core::IAsyncContext& context, core::AsyncIOCallback callback); + + private: + core::Status ScheduleOperation(FileOperationType operationType, uint8_t* buffer, size_t offset, + uint32_t length, core::IAsyncContext& context, core::AsyncIOCallback callback); + + struct io_uring* ring_; + SpinLock* sq_lock_; +}; + +#endif + } } // namespace FASTER::environment diff --git a/cc/test/CMakeLists.txt b/cc/test/CMakeLists.txt index 67df2443d..ac31da059 100644 --- a/cc/test/CMakeLists.txt +++ b/cc/test/CMakeLists.txt @@ -1,10 +1,16 @@ ADD_FASTER_TEST(in_memory_test "") ADD_FASTER_TEST(malloc_fixed_page_size_test "") ADD_FASTER_TEST(paging_queue_test "paging_test.h") +if((NOT MSVC) AND USE_URING) +ADD_FASTER_TEST(paging_uring_test "paging_test.h") +endif() if(MSVC) ADD_FASTER_TEST(paging_threadpool_test "paging_test.h") endif() ADD_FASTER_TEST(recovery_queue_test "recovery_test.h") +if((NOT MSVC) AND USE_URING) +ADD_FASTER_TEST(recovery_uring_test "recovery_test.h") +endif() if(MSVC) ADD_FASTER_TEST(recovery_threadpool_test "recovery_test.h") endif() diff --git a/cc/test/paging_uring_test.cc b/cc/test/paging_uring_test.cc new file mode 100644 index 000000000..c66d2cd6b --- /dev/null +++ b/cc/test/paging_uring_test.cc @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#include +#include +#include +#include +#include +#include +#include "gtest/gtest.h" +#include "core/faster.h" +#include "device/file_system_disk.h" + +using namespace FASTER::core; + +typedef FASTER::environment::UringIoHandler handler_t; + +#define CLASS PagingTest_Uring + +#include "paging_test.h" + +#undef CLASS + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/cc/test/recovery_uring_test.cc b/cc/test/recovery_uring_test.cc new file mode 100644 index 000000000..301a52f43 --- /dev/null +++ b/cc/test/recovery_uring_test.cc @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#include +#include +#include +#include +#include +#include +#include +#include +#include "gtest/gtest.h" +#include "core/faster.h" +#include "core/light_epoch.h" +#include "core/thread.h" +#include "device/file_system_disk.h" + +using namespace FASTER::core; + +typedef FASTER::environment::UringIoHandler handler_t; + +#define CLASS RecoveryTest_Uring + +#include "recovery_test.h" + +#undef CLASS + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}