Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
Add example with coroutines
Browse files Browse the repository at this point in the history
How it works:

When co_await async_memcpy is called, the compiler calls
async_memcpy::awaitable::await_suspend() which creates a
chined future. This chained future first executes memcpy and
then calls a function to resume the coroutine (async_memcpy_print).

async_memcpy_print returns a (lazy) future object which can be used
to start execution by calling wait().
  • Loading branch information
igchor committed Dec 8, 2021
1 parent f153ac6 commit bd2445a
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 1 deletion.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

cmake_minimum_required(VERSION 3.3)

project(miniasync C)
project(miniasync C CXX)

set(VERSION_MAJOR 0)
set(VERSION_MINOR 1)
Expand Down Expand Up @@ -34,6 +34,8 @@ option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON)
set(TEST_DIR ${CMAKE_CURRENT_BINARY_DIR}/test
CACHE STRING "working directory for tests")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")

include(FindPerl)
include(FindThreads)
include(CMakePackageConfigHelpers)
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ endfunction()

# add all the examples with a use of the add_example function defined above
add_example(basic basic/basic.c)
add_example(basic_cpp basic_cpp/basic.cpp)
109 changes: 109 additions & 0 deletions examples/basic_cpp/basic.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021, Intel Corporation */

/*
* basic.cpp -- example showing miniasync integration with coroutines
*/

#include "libminiasync.h"

#include <iostream>
#include <queue>

#include "coroutine_helpers.hpp"

/* Queue from which executor will take future */
// XXX - could we put this into task? (to avoid global variable)
std::queue<std::pair<std::vector<struct vdm_memcpy_future>, std::coroutine_handle<>>> futures;

struct memcpy_task
{
memcpy_task(char *dst, char *src, size_t n) {
pthread_mover = std::shared_ptr<vdm>(vdm_new(vdm_descriptor_pthreads()), &vdm_delete);
fut = vdm_memcpy(pthread_mover.get(), (void*)dst, (void*)src, n);
}

bool await_ready()
{
return false;
}

void await_suspend(std::coroutine_handle<> h)
{
futures.emplace(std::vector<struct vdm_memcpy_future>{fut}, h);
}

void await_resume() {}

// XXX: change to unique_ptr after fixing when_all_awaitable
std::shared_ptr<vdm> pthread_mover;
struct vdm_memcpy_future fut;
};

/* Executor loop */
void wait(struct runtime *r)
{
while (futures.size()) {
auto &p = futures.front();

std::vector<future*> futs;
for (auto &f : p.first)
futs.emplace_back(FUTURE_AS_RUNNABLE(&f));

runtime_wait_multiple(r, futs.data(), futs.size());
p.second(); // resume coroutine
futures.pop();
}
}

task async_mempcy(char *dst, char *src, size_t n)
{
std::cout << "Before memcpy" << std::endl;
co_await memcpy_task{dst, src, n/2};
std::cout << "After memcpy " << ((char*) dst) << std::endl;
co_await memcpy_task{dst + n/2, src + n/2, n - n/2};
std::cout << "After second memcpy " << ((char*) dst) << std::endl;

auto a1 = memcpy_task{dst, src, 1};
auto a2 = memcpy_task{dst + 1, src, 1};
auto a3 = memcpy_task{dst + 2, src, 1};

co_await when_all(a1, a2, a3);
std::cout << "After 3 concurrent memcopies " << ((char*) dst) << std::endl;
}

task async_memcpy_print(char *dst, char *src, size_t n, std::string to_print)
{
auto a1 = async_mempcy(dst, src, n/2);
auto a2 = async_mempcy(dst + n/2, src + n/2, n - n/2);

co_await when_all(a1, a2);

std::cout << to_print << std::endl;
}

int
main(int argc, char *argv[])
{
auto r = runtime_new();

static constexpr auto buffer_size = 10;
static constexpr auto to_copy = "something";
static constexpr auto to_print = "async print!";

char buffer[buffer_size] = {0};
{
auto future = async_memcpy_print(buffer, std::string(to_copy).data(), buffer_size, to_print);

std::cout << "inside main" << std::endl;

future.h.resume();
wait(r);

std::cout << buffer << std::endl;
}

runtime_delete(r);

return 0;
}
144 changes: 144 additions & 0 deletions examples/basic_cpp/coroutine_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021, Intel Corporation */
// SPDX-License-Identifier: MIT
/* Copyright (c) Lewis Baker */

#include <coroutine>
#include <utility>
#include <string>
#include <atomic>
#include <functional>
#include <memory>

#ifndef MINIASYNC_COROUTINE_HELPERS
#define MINIASYNC_COROUTINE_HELPERS

/* Helper structures for coroutines, they are heavily inspired by
* https://github.com/lewissbaker/cppcoro
*/

/* This is a generic task which supports continuation. */
struct task {
struct promise_type {
struct final_awaitable
{
bool await_ready() const noexcept { return false; }
void await_resume() noexcept {}

std::coroutine_handle<> await_suspend(std::coroutine_handle<task::promise_type> h) noexcept {
auto &cont = h.promise().cont;
return cont ? cont : std::noop_coroutine();
}
};


task get_return_object() { return task{std::coroutine_handle<task::promise_type>::from_promise(*this)}; }
std::suspend_always initial_suspend() { return {}; }
auto final_suspend() noexcept { return final_awaitable{}; }
void return_void() {}
void unhandled_exception() {}

std::coroutine_handle<> cont;
};

void wait() {
h.resume();
}

bool await_ready() { return !h || h.done();}
std::coroutine_handle<> await_suspend(std::coroutine_handle<> aw) {
h.promise().cont = aw;
return h;
}
void await_resume() {}

std::coroutine_handle<task::promise_type> h;
};

namespace detail {
struct when_all_task {
struct promise_type {
std::atomic<int> *counter;
std::coroutine_handle<> continuation;

void start(std::atomic<int>& counter, std::coroutine_handle<> continuation)
{
this->counter = &counter;
this->continuation = continuation;

std::coroutine_handle<when_all_task::promise_type>::from_promise(*this).resume();
}

struct final_awaitable
{
bool await_ready() const noexcept { return false; }
void await_resume() noexcept {}

void await_suspend(std::coroutine_handle<when_all_task::promise_type> h) noexcept {
auto cnt = h.promise().counter->fetch_sub(1);
if (cnt - 1 == 0) {
h.promise().continuation.resume();
}
}
};

when_all_task get_return_object() { return when_all_task{std::coroutine_handle<when_all_task::promise_type>::from_promise(*this)}; }
std::suspend_always initial_suspend() { return {}; }
auto final_suspend() noexcept { return final_awaitable{}; }
void return_void() {}
void unhandled_exception() {}
};

void start(std::atomic<int>& counter, std::coroutine_handle<> continuation)
{
h.promise().start(counter, continuation);
}

std::coroutine_handle<when_all_task::promise_type> h;
};

template <typename Awaitable>
when_all_task make_when_all_task(Awaitable awaitable)
{
co_await awaitable;
}

template <typename Task>
struct when_all_ready_awaitable
{
when_all_ready_awaitable(std::vector<Task>&& tasks): counter(tasks.size()), tasks(std::move(tasks))
{
}

bool await_ready()
{
return false;
}

void await_suspend(std::coroutine_handle<> h)
{
for (auto&& task : tasks)
{
task.start(counter, h);
}
}

void await_resume() {}

std::atomic<int> counter = 0;
std::vector<Task> tasks;
};
}

template <typename... Awaitables>
auto when_all(Awaitables&&... awaitables)
{
std::vector<detail::when_all_task> tasks;

for (auto &&a : {awaitables...})
tasks.emplace_back(detail::make_when_all_task(std::move(a)));

return detail::when_all_ready_awaitable(std::move(tasks));
}

#endif MINIASYNC_COROUTINE_HELPERS

0 comments on commit bd2445a

Please sign in to comment.