Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support c++20 coroutine #2121

Merged
merged 7 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions docs/cn/coroutine.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# C++20 协程支持

bRPC 支持 C++20 协程说明文档。

> 注:该功能是实验性的,请勿在生产环境下使用。

## 使用说明

### 适用场景

C++协程适用于极高并发的场景。由于bthread使用了mmap,存在系统限制,一个进程bthread数量一般最多到万级别,如果采用同步方式用一个bthread来处理一个请求,那么请求的并发度也只能到万级别。如果采用异步方式来写代码,可以达到更高的并发,但又会导致代码难以维护。这时我们就可以使用C++协程,以类似同步的方式来写代码,而达到异步的性能效果。

### 使用前提

1. 需要使用支持c++20的编译器,如gcc 11
2. 需要编译选项中加上 `-std=c++20`

### 简单示例

以下例子显示了如何在bRPC中启动一个C++20协程,在协程中发起 RPC调用,并等待返回结果。

```cpp
#include <brpc/channel.h>
#include <brpc/coroutine.h>

// 协程函数的返回类型,需要是brpc::experimental::Awaitable<T>
// T是函数返回的实际数据类型
brpc::experimental::Awaitable<int> RpcCall(brpc::Channel& channel) {
EchoRequest request;
EchoResponse response;
EchoService_Stub stub(&_channel);
brpc::Controller cntl;
brpc::experimental::AwaitableDone done;
stub.Echo(&cntl, &request, &response, &done);
// 等待RPC返回结果
co_await done.awaitable();
// 返回数据,注意这里用co_return而不是return
// 因为函数返回值类型是brpc::experimental::Awaitable<int>而不是int
co_return cntl.ErrorCode();
}

brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
brpc::Channel channel;
channel.Init(server, NULL);
// co_await会从Awaitable<int>得到int类型的返回值
int code = co_await RpcCall(channel);
printf("Rpc result:%d\n", code);
}

int main() {
// 启动协程
brpc::experimental::Coroutine coro(CoroutineMain("127.0.0.1:8080"));
// 等待协程执行完成
coro.join();
return 0;
}
```

更完整的例子可以查看源码中的`example/coroutine/coroutine_server.cpp`文件。

### 更多用法

1. 在非协程环境下等待一个协程执行完成:

```cpp
brpc::experimental::Coroutine coro(func(args));
coro.join();
```

2. 在非协程环境下等待协程完成并获取返回值:

```cpp
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
int result = coro.join<int>();
```

3. 在协程环境下等待协程执行完成:

```cpp
brpc::experimental::Coroutine coro(func(args));
... // 做一些其它事情
co_await coro.awaitable();
```

4. 在协程环境下等待协程执行完成并获取返回值:

```cpp
brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
... // 做一些其它事情
int ret = co_await coro.awaitable<int>();
```

5. 在协程环境下sleep:
```cpp
co_await brpc::experimental::Coroutine::usleep(1000);
```

### 注意事项

1. 协程不保证一个函数的上下文都在同一个pthread或同一个bthread下执行。在co_await之后,代码所在的pthread或bthread可能发生变化,因此依赖于pthread或bthread的线程局部变量的代码(比如rpcz功能)将无法正确工作。
2. 不应在协程中使用阻塞bthread(如bthread_join、同步RPC)或阻塞pthread的函数,否则可能导致死锁或者长耗时。
3. 不要在不必要的地方使用协程,如下面的代码,虽然也能正常工作,但没有意义:

```cpp
brpc::experimental::Awaitable<int> inplace_func() {
co_return 123;
}
```

### 实现极致性能

如果确保服务的处理代码都运行在协程之中,并且没有任何阻塞bthread或阻塞pthread操作,则可以开启`usercode_in_coroutine`这个flag。开启这个flag之后,bRPC会简化服务端处理逻辑,减少不必要的bthread开销。在这种情况下,实际的工作线程数量将由event_dispatcher_num控制,而不再是由bthread worker数量控制。

## 实现原理

### C++20协程实现原理

为了方便理解,我们把上面的CoroutineMain函数稍微改写一下,把co_await前后的逻辑分成两部分:

```cpp
brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
brpc::Channel channel;
channel.Init(server, NULL);
brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);

int code = co_await awaitable;
printf("Rpc result:%d\n", code);
}
```

上面的代码实际上是怎么执行的呢?当你使用co_await关键字的时候,编译器会把co_await后面的步骤转换成一个callback函数,把这个callback传给实际co_await的那个`Awaitable<T>`对象,比如上面的CoroutineMain函数,经过编译器转换后会变成大概如下的逻辑(简化版,实际要比这个复杂得多):

```cpp

brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
// 根据函数返回类型,找到Awaitable<void>的名为promise_type的子类
// 在函数的入口,创建一个promise_type类型的对象
auto promise = new brpc::experimental::Awaitable<void>::promise_type();
// 从promise对象中创建返回Awaitable对象
Awaitable<void> ret = promise->get_return_object();

// co_await之前的逻辑,保持不变
brpc::Channel channel;
channel.Init(server, NULL);
brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);

// co_await的逻辑,转成一个await_suspend的函数调用,传入一个callback函数
awaitable.await_suspend([promise, &awaitable]() {
// co_await之后的逻辑,转移到callback函数中
int code = awaitable.await_resume();
printf("Rpc result:%d\n", code);
// 在final_suspend里面,会做一些唤醒调用者、资源释放的工作
promise->final_suspend();
delete promise;
})
// 返回Awaitable<void>对象,以便上层函数进行处理
return ret;
}
```

也就是说,co_await就是一个语法转换器,把看似同步的代码转化成异步调用的代码,仅此而已。至于Awaitable类和promise类的具体实现,编译器就不关心了,这是基础库需要做的。比如在brpc中封装了brpc::experimental::Awaitable类和promise子类,实现了await_suspend/await_resume等逻辑,使协程可以正确的工作起来。

### 原子等待操作

上面我们看到的是一个中间函数,它co_await一个子函数返回的Awaitable对象,然后自己也返回一个Awaitable对象。这样层层调用一定有一个尽头,即原子等待操作,它会返回Awaitable对象,但是它内部不再有co_await/co_return这样的语句了。目前实现了3种原子等待操作,未来可以扩展更多。

1. 等待RPC返回结果: `AwaitableDone::awaitable()`
2. 等待sleep: `Coroutine::usleep()`
3. 等待另一个协程完成: `Coroutine::awaitable()`

下面是一个原子等待操作的示例实现,我们需要手动创建一个promise对象,设置set_needs_suspend(),然后发起一个异步调用(如bthread_timer_add),在回调函数里设置好返回值、调用promise->on_done(),最后根据promise返回Awaitable对象即可。

```cpp
inline Awaitable<int> Coroutine::usleep(int sleep_us) {
auto promise = new detail::AwaitablePromise<int>();
promise->set_needs_suspend();
bthread_timer_t timer;
auto abstime = butil::microseconds_from_now(sleep_us);
auto cb = [](void* p) {
auto promise = static_cast<detail::AwaitablePromise<int>*>(p);
promise->set_value(0);
promise->on_done();
};
bthread_timer_add(&timer, abstime, cb, promise);
return Awaitable<int>(promise);
}
```

### 协程与多线程

上面我们可以看到,协程本质上就是一种callback,和线程没有直接关系。它可以是单线程的,也可以是多线程的,这完全取决于它的原子等待操作里是怎么调用callback的。在bRPC的环境里,callback有可能从另一个pthread或bthread发起,所以协程也是需要考虑多线程问题。比如,有可能在调用co_await语句之前,要等待的事情就已经结束了,对于这种情况co_await应该立即返回。

协程和线程可以一起使用,比如我们可以使用bthread将任务scale到多核,然后在任务内部的子任务用协程来实现异步化。
87 changes: 87 additions & 0 deletions example/coroutine/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

LINK_SO=1
NEED_GPERFTOOLS=0
BRPC_PATH=../..
include $(BRPC_PATH)/config.mk
# Notes on the flags:
# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
CXXFLAGS+=$(CPPFLAGS) -std=c++20 -DNDEBUG -O2 -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
#CXXFLAGS+= -fsanitize=address
#STATIC_LINKINGS+= -Wl,-Bstatic -lasan -Wl,-Bdynamic
ifeq ($(NEED_GPERFTOOLS), 1)
CXXFLAGS+=-DBRPC_ENABLE_CPU_PROFILER
endif
HDRS+=$(BRPC_PATH)/src
#HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib

HDRPATHS=$(addprefix -I, $(HDRS))
LIBPATHS=$(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS))

SERVER_SOURCES = coroutine_server.cpp
PROTOS = $(wildcard *.proto)

PROTO_OBJS = $(PROTOS:.proto=.pb.o)
PROTO_GENS = $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))

ifeq ($(SYSTEM),Darwin)
ifneq ("$(LINK_SO)", "")
STATIC_LINKINGS += -lbrpc
else
# *.a must be explicitly specified in clang
STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a
endif
LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
else ifeq ($(SYSTEM),Linux)
STATIC_LINKINGS += -lbrpc
LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS)
endif

.PHONY:all
all: coroutine_server

.PHONY:clean
clean:
@echo "> Cleaning"
rm -rf coroutine_server $(PROTO_GENS) $(PROTO_OBJS) $(SERVER_OBJS)

coroutine_server:$(PROTO_OBJS) $(SERVER_OBJS)
@echo "> Linking $@"
ifneq ("$(LINK_SO)", "")
$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
else
$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
endif

%.pb.cc %.pb.h:%.proto
@echo "> Generating $@"
$(PROTOC) --cpp_out=. --proto_path=. $(PROTOC_EXTRA_ARGS) $<

%.o:%.cpp
@echo "> Compiling $@"
$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@

%.o:%.cc
@echo "> Compiling $@"
$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
Loading
Loading