Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

如何拓展lazy_io的范围 #93

Open
wangzhankun opened this issue Nov 11, 2023 · 4 comments
Open

如何拓展lazy_io的范围 #93

wangzhankun opened this issue Nov 11, 2023 · 4 comments

Comments

@wangzhankun
Copy link

wangzhankun commented Nov 11, 2023

我今天在学习io_uring时,发现其设计与我在用户态实现的异步IO几乎完全一致。进程间的异步IO是为了实现异步远程函数调用:

  1. 在两个进程之间通过共享内存的方式传输数据
  2. 共享内存被分为两个区域,A进程可读可写但B进程只读(类似io_uring的SQ);A进程可读但B进程可读可写(类似io_uring的CQ)
  3. 这两个队列是单生产者、单消费者模型,提供无锁接口,内部使用内存屏障做同步
  4. 当A进程发起RPC时,需要将参数以及要调用的函数ID写入到共享内存中,B进程在通过某种方法监测到有调用发生时,就会从共享内存中提取数据然后调用函数,之后将函数返回值写入到CQ
  5. A进程与B进程可以通过用户态中断(可以理解为signal机制,但是不需要Linux 内核参与,而是硬件实现的跨核中断)相互进行通知。(这样实现出来的就类似于io_uring的中断驱动机制)
  6. A进程和B进程也可以通过轮询的方法监测共享内存,类似于io_uring的轮询模式
  7. A进程异步,B进程轮询监测内存,类似于io_uring的内核轮询模式

请问我该如何适配co_context,时其支持我的设计方案呢?如果您方便的话,我想参考一下您的论文,如果不方便公开的话,可以发送到我的邮箱:bitwangzhankun@gmail.com

@Codesire-Deng
Copy link
Owner

Codesire-Deng commented Nov 11, 2023

你的设计方案包含一个完整的调度器,这是整个框架运行的核心逻辑。在 co_context 里,lazy_io 只负责告诉调度器“我想要等待什么”,调度器 io_context 的责任是监测结果和恢复协程。所以结论是:

  1. 你需要修改 io_context 的核心逻辑。它要能处理中断,或许还需要发起中断。(核心逻辑位于io_context::run() https://github.com/Codesire-Deng/co_context/blob/main/lib/co_context/io_context.cpp#L126-L151)
  2. 你需要实现类似 lazy_io 的 C++20 的 awaiter。以 RPC 中的 caller 为例,awaiter 负责暂停协程,写共享内存,令 io_context 发出请求并监测结果,等待被 io_context 恢复;协程恢复时,读取共享内存以获得 RPC 的返回值。

awaiter 的一个 demo 是 lazy::yield https://github.com/Codesire-Deng/co_context/blob/main/include/co_context/detail/lazy_io_awaiter.hpp#L868C14-L879 。它暂停了协程,但又立即令自己加入就绪队列。

@wangzhankun
Copy link
Author

我有没有可能在函数内部自定义调度呢?我看co_await的定义是会返回到caller:

task<> top1(){
    co_await top2();
}
task<> top2(){
    .... // 调度策略,如果接收到了用户态中断,选择被挂起的top3函数
    co_await top3();
}
task<> top3(){
}

我在top2中定义一下调度策略呢?或者说把调度策略作为函数添加到io_context中,然后在top2中调用。这样尽可能减少代码修改量。

@wangzhankun
Copy link
Author

想到了侵入比较小的方法:

  1. rpc caller运行在单独的io_context线程中
  2. 实例化io_context时在构造函数传入一个bool判定是否是rpc的线程,如果是则run使用rpc的调度策略

@Codesire-Deng
Copy link
Owner

Codesire-Deng commented Nov 11, 2023

不侵入的办法是有的,可以在 io_context 下再挂一个你的 scheduler。但这样会遇到一个问题:当你管理的所有协程都陷入暂停,io_context 发现就绪队列里面没有任何东西,io_uring 里也没有任何 IO,就会结束自己。但事实上稍后到来的中断会使一些协程加入到就绪队列,但 io_context 对此一无所知。

针对这个问题,一个办法是再启动一个 io_context,使它们不敢主动退出。

你可以试试运行这个 demo,留意 ready_queue 并非线程安全:

#include <co_context/all.hpp>
#include <queue>

using co_context::task;
using co_context::counting_semaphore;

struct scheduler {
    counting_semaphore ready_count{0};
    std::queue<std::coroutine_handle<>> ready_queue; // NOT thread-safe
    bool poison{false};                              // NOT thread-safe

    void stop() noexcept {
        poison = true;
        ready_count.release();
    }

    void post(std::coroutine_handle<> job_handle) {
        ready_queue.push(job_handle);
        ready_count.release();
    }

    void post(task<> &&job) {
        std::coroutine_handle<> job_handle = job.get_handle();
        job.detach(); // prevent ~task() from killing the coroutine
        post(job_handle);
    }

    task<> run() {
        while (!poison) {
            printf("scheduler::run(): acquire...\n");
            co_await ready_count.acquire();
            printf("scheduler::run(): task found\n");
            if (poison) { // double check the poison
                break;
            }
            std::coroutine_handle<> handle = ready_queue.front();
            ready_queue.pop();
            handle.resume();
        }
    }
};

struct my_awaiter {
    constexpr bool await_ready() noexcept { return false; }

    void await_suspend(std::coroutine_handle<> current) noexcept {
        printf("my_awaiter: suspend\n");
        // Do your job. When finished, re-post it anytime, anywhere.
        my_context->post(current);
    }

    void await_resume() const noexcept { printf("my_awaiter: resume\n"); }

    scheduler *my_context;
};

task<> my_job(scheduler &my_context) {
    printf("my_job: begin\n");
    co_await my_awaiter{&my_context};
    printf("my_job: end\n");
}

int main() {
    co_context::io_context ctx;
    co_context::io_context background;

    scheduler sch;
    sch.post(my_job(sch)); // post the first job

    ctx.co_spawn(sch.run()); // launch the scheduler
    ctx.start();
    background.start();
    ctx.join();
    return 0;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants