From bd2445a4612b4a26644bda6154ad65b58483867f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Chor=C4=85=C5=BCewicz?= Date: Mon, 6 Dec 2021 18:28:08 +0100 Subject: [PATCH] Add example with coroutines How it works: When co_await async_memcpy is called, the compiler calls async_memcpy::awaitable::await_suspend() which creates a chined future. This chained future first executes memcpy and then calls a function to resume the coroutine (async_memcpy_print). async_memcpy_print returns a (lazy) future object which can be used to start execution by calling wait(). --- CMakeLists.txt | 4 +- examples/CMakeLists.txt | 1 + examples/basic_cpp/basic.cpp | 109 +++++++++++++++++ examples/basic_cpp/coroutine_helpers.hpp | 144 +++++++++++++++++++++++ 4 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 examples/basic_cpp/basic.cpp create mode 100644 examples/basic_cpp/coroutine_helpers.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1faefd0..2eee4d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.3) -project(miniasync C) +project(miniasync C CXX) set(VERSION_MAJOR 0) set(VERSION_MINOR 1) @@ -34,6 +34,8 @@ option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON) set(TEST_DIR ${CMAKE_CURRENT_BINARY_DIR}/test CACHE STRING "working directory for tests") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines") + include(FindPerl) include(FindThreads) include(CMakePackageConfigHelpers) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 55c8aa5..3367189 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -43,3 +43,4 @@ endfunction() # add all the examples with a use of the add_example function defined above add_example(basic basic/basic.c) +add_example(basic_cpp basic_cpp/basic.cpp) diff --git a/examples/basic_cpp/basic.cpp b/examples/basic_cpp/basic.cpp new file mode 100644 index 0000000..56211a0 --- /dev/null +++ b/examples/basic_cpp/basic.cpp @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2021, Intel Corporation */ + +/* + * basic.cpp -- example showing miniasync integration with coroutines + */ + +#include "libminiasync.h" + +#include +#include + +#include "coroutine_helpers.hpp" + +/* Queue from which executor will take future */ +// XXX - could we put this into task? (to avoid global variable) +std::queue, std::coroutine_handle<>>> futures; + +struct memcpy_task +{ + memcpy_task(char *dst, char *src, size_t n) { + pthread_mover = std::shared_ptr(vdm_new(vdm_descriptor_pthreads()), &vdm_delete); + fut = vdm_memcpy(pthread_mover.get(), (void*)dst, (void*)src, n); + } + + bool await_ready() + { + return false; + } + + void await_suspend(std::coroutine_handle<> h) + { + futures.emplace(std::vector{fut}, h); + } + + void await_resume() {} + + // XXX: change to unique_ptr after fixing when_all_awaitable + std::shared_ptr pthread_mover; + struct vdm_memcpy_future fut; +}; + +/* Executor loop */ +void wait(struct runtime *r) +{ + while (futures.size()) { + auto &p = futures.front(); + + std::vector futs; + for (auto &f : p.first) + futs.emplace_back(FUTURE_AS_RUNNABLE(&f)); + + runtime_wait_multiple(r, futs.data(), futs.size()); + p.second(); // resume coroutine + futures.pop(); + } +} + +task async_mempcy(char *dst, char *src, size_t n) +{ + std::cout << "Before memcpy" << std::endl; + co_await memcpy_task{dst, src, n/2}; + std::cout << "After memcpy " << ((char*) dst) << std::endl; + co_await memcpy_task{dst + n/2, src + n/2, n - n/2}; + std::cout << "After second memcpy " << ((char*) dst) << std::endl; + + auto a1 = memcpy_task{dst, src, 1}; + auto a2 = memcpy_task{dst + 1, src, 1}; + auto a3 = memcpy_task{dst + 2, src, 1}; + + co_await when_all(a1, a2, a3); + std::cout << "After 3 concurrent memcopies " << ((char*) dst) << std::endl; +} + +task async_memcpy_print(char *dst, char *src, size_t n, std::string to_print) +{ + auto a1 = async_mempcy(dst, src, n/2); + auto a2 = async_mempcy(dst + n/2, src + n/2, n - n/2); + + co_await when_all(a1, a2); + + std::cout << to_print << std::endl; +} + +int +main(int argc, char *argv[]) +{ + auto r = runtime_new(); + + static constexpr auto buffer_size = 10; + static constexpr auto to_copy = "something"; + static constexpr auto to_print = "async print!"; + + char buffer[buffer_size] = {0}; + { + auto future = async_memcpy_print(buffer, std::string(to_copy).data(), buffer_size, to_print); + + std::cout << "inside main" << std::endl; + + future.h.resume(); + wait(r); + + std::cout << buffer << std::endl; + } + + runtime_delete(r); + + return 0; +} diff --git a/examples/basic_cpp/coroutine_helpers.hpp b/examples/basic_cpp/coroutine_helpers.hpp new file mode 100644 index 0000000..19ea285 --- /dev/null +++ b/examples/basic_cpp/coroutine_helpers.hpp @@ -0,0 +1,144 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2021, Intel Corporation */ +// SPDX-License-Identifier: MIT +/* Copyright (c) Lewis Baker */ + +#include +#include +#include +#include +#include +#include + +#ifndef MINIASYNC_COROUTINE_HELPERS +#define MINIASYNC_COROUTINE_HELPERS + +/* Helper structures for coroutines, they are heavily inspired by + * https://github.com/lewissbaker/cppcoro + */ + +/* This is a generic task which supports continuation. */ +struct task { + struct promise_type { + struct final_awaitable + { + bool await_ready() const noexcept { return false; } + void await_resume() noexcept {} + + std::coroutine_handle<> await_suspend(std::coroutine_handle h) noexcept { + auto &cont = h.promise().cont; + return cont ? cont : std::noop_coroutine(); + } + }; + + + task get_return_object() { return task{std::coroutine_handle::from_promise(*this)}; } + std::suspend_always initial_suspend() { return {}; } + auto final_suspend() noexcept { return final_awaitable{}; } + void return_void() {} + void unhandled_exception() {} + + std::coroutine_handle<> cont; + }; + + void wait() { + h.resume(); + } + + bool await_ready() { return !h || h.done();} + std::coroutine_handle<> await_suspend(std::coroutine_handle<> aw) { + h.promise().cont = aw; + return h; + } + void await_resume() {} + + std::coroutine_handle h; +}; + +namespace detail { +struct when_all_task { + struct promise_type { + std::atomic *counter; + std::coroutine_handle<> continuation; + + void start(std::atomic& counter, std::coroutine_handle<> continuation) + { + this->counter = &counter; + this->continuation = continuation; + + std::coroutine_handle::from_promise(*this).resume(); + } + + struct final_awaitable + { + bool await_ready() const noexcept { return false; } + void await_resume() noexcept {} + + void await_suspend(std::coroutine_handle h) noexcept { + auto cnt = h.promise().counter->fetch_sub(1); + if (cnt - 1 == 0) { + h.promise().continuation.resume(); + } + } + }; + + when_all_task get_return_object() { return when_all_task{std::coroutine_handle::from_promise(*this)}; } + std::suspend_always initial_suspend() { return {}; } + auto final_suspend() noexcept { return final_awaitable{}; } + void return_void() {} + void unhandled_exception() {} + }; + + void start(std::atomic& counter, std::coroutine_handle<> continuation) + { + h.promise().start(counter, continuation); + } + + std::coroutine_handle h; +}; + +template +when_all_task make_when_all_task(Awaitable awaitable) +{ + co_await awaitable; +} + +template +struct when_all_ready_awaitable +{ + when_all_ready_awaitable(std::vector&& tasks): counter(tasks.size()), tasks(std::move(tasks)) + { + } + + bool await_ready() + { + return false; + } + + void await_suspend(std::coroutine_handle<> h) + { + for (auto&& task : tasks) + { + task.start(counter, h); + } + } + + void await_resume() {} + + std::atomic counter = 0; + std::vector tasks; +}; +} + +template +auto when_all(Awaitables&&... awaitables) +{ + std::vector tasks; + + for (auto &&a : {awaitables...}) + tasks.emplace_back(detail::make_when_all_task(std::move(a))); + + return detail::when_all_ready_awaitable(std::move(tasks)); +} + +#endif MINIASYNC_COROUTINE_HELPERS