diff --git a/docs/cn/coroutine.md b/docs/cn/coroutine.md new file mode 100644 index 0000000000..cdce5b9e0a --- /dev/null +++ b/docs/cn/coroutine.md @@ -0,0 +1,193 @@ +# C++20 协程支持 + +bRPC 支持 C++20 协程说明文档。 + +> 注:该功能是实验性的,请勿在生产环境下使用。 + +## 使用说明 + +### 适用场景 + +C++协程适用于极高并发的场景。由于bthread使用了mmap,存在系统限制,一个进程bthread数量一般最多到万级别,如果采用同步方式用一个bthread来处理一个请求,那么请求的并发度也只能到万级别。如果采用异步方式来写代码,可以达到更高的并发,但又会导致代码难以维护。这时我们就可以使用C++协程,以类似同步的方式来写代码,而达到异步的性能效果。 + +### 使用前提 + +1. 需要使用支持c++20的编译器,如gcc 11 +2. 需要编译选项中加上 `-std=c++20` + +### 简单示例 + +以下例子显示了如何在bRPC中启动一个C++20协程,在协程中发起 RPC调用,并等待返回结果。 + +```cpp +#include +#include + +// 协程函数的返回类型,需要是brpc::experimental::Awaitable +// T是函数返回的实际数据类型 +brpc::experimental::Awaitable RpcCall(brpc::Channel& channel) { + EchoRequest request; + EchoResponse response; + EchoService_Stub stub(&_channel); + brpc::Controller cntl; + brpc::experimental::AwaitableDone done; + stub.Echo(&cntl, &request, &response, &done); + // 等待RPC返回结果 + co_await done.awaitable(); + // 返回数据,注意这里用co_return而不是return + // 因为函数返回值类型是brpc::experimental::Awaitable而不是int + co_return cntl.ErrorCode(); +} + +brpc::experimental::Awaitable CoroutineMain(const char* server) { + brpc::Channel channel; + channel.Init(server, NULL); + // co_await会从Awaitable得到int类型的返回值 + int code = co_await RpcCall(channel); + printf("Rpc result:%d\n", code); +} + +int main() { + // 启动协程 + brpc::experimental::Coroutine coro(CoroutineMain("127.0.0.1:8080")); + // 等待协程执行完成 + coro.join(); + return 0; +} +``` + +更完整的例子可以查看源码中的`example/coroutine/coroutine_server.cpp`文件。 + +### 更多用法 + +1. 在非协程环境下等待一个协程执行完成: + +```cpp +brpc::experimental::Coroutine coro(func(args)); +coro.join(); +``` + +2. 在非协程环境下等待协程完成并获取返回值: + +```cpp +brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable +int result = coro.join(); +``` + +3. 在协程环境下等待协程执行完成: + +```cpp +brpc::experimental::Coroutine coro(func(args)); +... // 做一些其它事情 +co_await coro.awaitable(); +``` + +4. 在协程环境下等待协程执行完成并获取返回值: + +```cpp +brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable +... // 做一些其它事情 +int ret = co_await coro.awaitable(); +``` + +5. 在协程环境下sleep: +```cpp +co_await brpc::experimental::Coroutine::usleep(1000); +``` + +### 注意事项 + +1. 协程不保证一个函数的上下文都在同一个pthread或同一个bthread下执行。在co_await之后,代码所在的pthread或bthread可能发生变化,因此依赖于pthread或bthread的线程局部变量的代码(比如rpcz功能)将无法正确工作。 +2. 不应在协程中使用阻塞bthread(如bthread_join、同步RPC)或阻塞pthread的函数,否则可能导致死锁或者长耗时。 +3. 不要在不必要的地方使用协程,如下面的代码,虽然也能正常工作,但没有意义: + +```cpp +brpc::experimental::Awaitable inplace_func() { + co_return 123; +} +``` + +### 实现极致性能 + +如果确保服务的处理代码都运行在协程之中,并且没有任何阻塞bthread或阻塞pthread操作,则可以开启`usercode_in_coroutine`这个flag。开启这个flag之后,bRPC会简化服务端处理逻辑,减少不必要的bthread开销。在这种情况下,实际的工作线程数量将由event_dispatcher_num控制,而不再是由bthread worker数量控制。 + +## 实现原理 + +### C++20协程实现原理 + +为了方便理解,我们把上面的CoroutineMain函数稍微改写一下,把co_await前后的逻辑分成两部分: + +```cpp +brpc::experimental::Awaitable CoroutineMain(const char* server) { + brpc::Channel channel; + channel.Init(server, NULL); + brpc::experimental::Awaitable awaitable = RpcCall(channel); + + int code = co_await awaitable; + printf("Rpc result:%d\n", code); +} +``` + +上面的代码实际上是怎么执行的呢?当你使用co_await关键字的时候,编译器会把co_await后面的步骤转换成一个callback函数,把这个callback传给实际co_await的那个`Awaitable`对象,比如上面的CoroutineMain函数,经过编译器转换后会变成大概如下的逻辑(简化版,实际要比这个复杂得多): + +```cpp + +brpc::experimental::Awaitable CoroutineMain(const char* server) { + // 根据函数返回类型,找到Awaitable的名为promise_type的子类 + // 在函数的入口,创建一个promise_type类型的对象 + auto promise = new brpc::experimental::Awaitable::promise_type(); + // 从promise对象中创建返回Awaitable对象 + Awaitable ret = promise->get_return_object(); + + // co_await之前的逻辑,保持不变 + brpc::Channel channel; + channel.Init(server, NULL); + brpc::experimental::Awaitable awaitable = RpcCall(channel); + + // co_await的逻辑,转成一个await_suspend的函数调用,传入一个callback函数 + awaitable.await_suspend([promise, &awaitable]() { + // co_await之后的逻辑,转移到callback函数中 + int code = awaitable.await_resume(); + printf("Rpc result:%d\n", code); + // 在final_suspend里面,会做一些唤醒调用者、资源释放的工作 + promise->final_suspend(); + delete promise; + }) + // 返回Awaitable对象,以便上层函数进行处理 + return ret; +} +``` + +也就是说,co_await就是一个语法转换器,把看似同步的代码转化成异步调用的代码,仅此而已。至于Awaitable类和promise类的具体实现,编译器就不关心了,这是基础库需要做的。比如在brpc中封装了brpc::experimental::Awaitable类和promise子类,实现了await_suspend/await_resume等逻辑,使协程可以正确的工作起来。 + +### 原子等待操作 + +上面我们看到的是一个中间函数,它co_await一个子函数返回的Awaitable对象,然后自己也返回一个Awaitable对象。这样层层调用一定有一个尽头,即原子等待操作,它会返回Awaitable对象,但是它内部不再有co_await/co_return这样的语句了。目前实现了3种原子等待操作,未来可以扩展更多。 + +1. 等待RPC返回结果: `AwaitableDone::awaitable()` +2. 等待sleep: `Coroutine::usleep()` +3. 等待另一个协程完成: `Coroutine::awaitable()` + +下面是一个原子等待操作的示例实现,我们需要手动创建一个promise对象,设置set_needs_suspend(),然后发起一个异步调用(如bthread_timer_add),在回调函数里设置好返回值、调用promise->on_done(),最后根据promise返回Awaitable对象即可。 + +```cpp +inline Awaitable Coroutine::usleep(int sleep_us) { + auto promise = new detail::AwaitablePromise(); + promise->set_needs_suspend(); + bthread_timer_t timer; + auto abstime = butil::microseconds_from_now(sleep_us); + auto cb = [](void* p) { + auto promise = static_cast*>(p); + promise->set_value(0); + promise->on_done(); + }; + bthread_timer_add(&timer, abstime, cb, promise); + return Awaitable(promise); +} +``` + +### 协程与多线程 + +上面我们可以看到,协程本质上就是一种callback,和线程没有直接关系。它可以是单线程的,也可以是多线程的,这完全取决于它的原子等待操作里是怎么调用callback的。在bRPC的环境里,callback有可能从另一个pthread或bthread发起,所以协程也是需要考虑多线程问题。比如,有可能在调用co_await语句之前,要等待的事情就已经结束了,对于这种情况co_await应该立即返回。 + +协程和线程可以一起使用,比如我们可以使用bthread将任务scale到多核,然后在任务内部的子任务用协程来实现异步化。 \ No newline at end of file diff --git a/example/coroutine/Makefile b/example/coroutine/Makefile new file mode 100644 index 0000000000..2dd5631d26 --- /dev/null +++ b/example/coroutine/Makefile @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +LINK_SO=1 +NEED_GPERFTOOLS=0 +BRPC_PATH=../.. +include $(BRPC_PATH)/config.mk +# Notes on the flags: +# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default +CXXFLAGS+=$(CPPFLAGS) -std=c++20 -DNDEBUG -O2 -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer +#CXXFLAGS+= -fsanitize=address +#STATIC_LINKINGS+= -Wl,-Bstatic -lasan -Wl,-Bdynamic +ifeq ($(NEED_GPERFTOOLS), 1) + CXXFLAGS+=-DBRPC_ENABLE_CPU_PROFILER +endif +HDRS+=$(BRPC_PATH)/src +#HDRS+=$(BRPC_PATH)/output/include +LIBS+=$(BRPC_PATH)/output/lib + +HDRPATHS=$(addprefix -I, $(HDRS)) +LIBPATHS=$(addprefix -L, $(LIBS)) +COMMA=, +SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS)) + +SERVER_SOURCES = coroutine_server.cpp +PROTOS = $(wildcard *.proto) + +PROTO_OBJS = $(PROTOS:.proto=.pb.o) +PROTO_GENS = $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc) +SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES))) + +ifeq ($(SYSTEM),Darwin) + ifneq ("$(LINK_SO)", "") + STATIC_LINKINGS += -lbrpc + else + # *.a must be explicitly specified in clang + STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a + endif + LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) + LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) +else ifeq ($(SYSTEM),Linux) + STATIC_LINKINGS += -lbrpc + LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) + LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) +endif + +.PHONY:all +all: coroutine_server + +.PHONY:clean +clean: + @echo "> Cleaning" + rm -rf coroutine_server $(PROTO_GENS) $(PROTO_OBJS) $(SERVER_OBJS) + +coroutine_server:$(PROTO_OBJS) $(SERVER_OBJS) + @echo "> Linking $@" +ifneq ("$(LINK_SO)", "") + $(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@ +else + $(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@ +endif + +%.pb.cc %.pb.h:%.proto + @echo "> Generating $@" + $(PROTOC) --cpp_out=. --proto_path=. $(PROTOC_EXTRA_ARGS) $< + +%.o:%.cpp + @echo "> Compiling $@" + $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@ + +%.o:%.cc + @echo "> Compiling $@" + $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@ diff --git a/example/coroutine/coroutine_server.cpp b/example/coroutine/coroutine_server.cpp new file mode 100644 index 0000000000..9df50b04b6 --- /dev/null +++ b/example/coroutine/coroutine_server.cpp @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A server to receive EchoRequest and send back EchoResponse. + +#include +#include +#include +#include +#include +#include "echo.pb.h" + +DEFINE_int32(port, 8000, "TCP Port of this server"); +DEFINE_int32(sleep_us, 1000000, "Server sleep us"); +DEFINE_bool(enable_coroutine, true, "Enable coroutine"); + +using brpc::experimental::Awaitable; +using brpc::experimental::AwaitableDone; +using brpc::experimental::Coroutine; + +namespace example { +class EchoServiceImpl : public EchoService { +public: + EchoServiceImpl() { + brpc::ChannelOptions options; + options.timeout_ms = FLAGS_sleep_us / 1000 * 2 + 100; + options.max_retry = 0; + CHECK(_channel.Init(butil::EndPoint(butil::IP_ANY, FLAGS_port), &options) == 0); + } + + virtual ~EchoServiceImpl() {} + + void Echo(google::protobuf::RpcController* cntl_base, + const EchoRequest* request, + EchoResponse* response, + google::protobuf::Closure* done) override { + // brpc::Controller* cntl = + // static_cast(cntl_base); + + if (FLAGS_enable_coroutine) { + Coroutine(EchoAsync(request, response, done), true); + } else { + brpc::ClosureGuard done_guard(done); + bthread_usleep(FLAGS_sleep_us); + response->set_message(request->message()); + } + } + + Awaitable EchoAsync(const EchoRequest* request, + EchoResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + co_await Coroutine::usleep(FLAGS_sleep_us); + response->set_message(request->message()); + } + + void Proxy(google::protobuf::RpcController* cntl_base, + const EchoRequest* request, + EchoResponse* response, + google::protobuf::Closure* done) override { + // brpc::Controller* cntl = + // static_cast(cntl_base); + + if (FLAGS_enable_coroutine) { + Coroutine(ProxyAsync(request, response, done), true); + } else { + brpc::ClosureGuard done_guard(done); + EchoService_Stub stub(&_channel); + brpc::Controller cntl; + stub.Echo(&cntl, request, response, NULL); + if (cntl.Failed()) { + response->set_message(cntl.ErrorText()); + } + } + } + + Awaitable ProxyAsync(const EchoRequest* request, + EchoResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + EchoService_Stub stub(&_channel); + brpc::Controller cntl; + AwaitableDone done2; + stub.Echo(&cntl, request, response, &done2); + co_await done2.awaitable(); + if (cntl.Failed()) { + response->set_message(cntl.ErrorText()); + } + } + +private: + brpc::Channel _channel; +}; +} // namespace example + +int main(int argc, char* argv[]) { + bthread_setconcurrency(BTHREAD_MIN_CONCURRENCY); + + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + if (FLAGS_enable_coroutine) { + GFLAGS_NS::SetCommandLineOption("usercode_in_coroutine", "true"); + } + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + example::EchoServiceImpl echo_service_impl; + + // Add the service into server. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + if (server.AddService(&echo_service_impl, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + // Start the server. + brpc::ServerOptions options; + options.num_threads = BTHREAD_MIN_CONCURRENCY; + if (server.Start(FLAGS_port, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} \ No newline at end of file diff --git a/example/coroutine/echo.proto b/example/coroutine/echo.proto new file mode 100644 index 0000000000..ef5cc8ab77 --- /dev/null +++ b/example/coroutine/echo.proto @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto2"; +package example; + +option cc_generic_services = true; + +message EchoRequest { + required string message = 1; +}; + +message EchoResponse { + required string message = 1; +}; + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); + rpc Proxy(EchoRequest) returns (EchoResponse); +}; diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index f49a27a92f..bfe278ffb9 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -125,6 +125,7 @@ const Controller* GetSubControllerOfSelectiveChannel( const RPCSender* sender, int index); DECLARE_bool(usercode_in_pthread); +DECLARE_bool(usercode_in_coroutine); static const int MAX_RETRY_COUNT = 1000; static bvar::Adder* g_ncontroller = NULL; @@ -684,7 +685,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, } END_OF_RPC: - if (new_bthread) { + if (new_bthread && !FLAGS_usercode_in_coroutine) { // [ Essential for -usercode_in_pthread=true ] // When -usercode_in_pthread is on, the reserved threads (set by // -usercode_backup_threads) may all block on bthread_id_lock in diff --git a/src/brpc/coroutine.h b/src/brpc/coroutine.h new file mode 100644 index 0000000000..513f402280 --- /dev/null +++ b/src/brpc/coroutine.h @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_COROUTINE_H +#define BRPC_COROUTINE_H + +#if __cplusplus >= 202002L + +#define BRPC_ENABLE_COROUTINE 1 + +#include +#include +#include +#include "brpc/callback.h" + +namespace brpc { +namespace experimental { + +namespace detail { +class AwaitablePromiseBase; +template +class AwaitablePromise; +} + +class AwaitableDone; +class Coroutine; + +// WARN:The bRPC coroutine feature is experimental, DO NOT use in production environment! + +// Awaitable is used as coroutine return type, for example: +// Awaitable func1() { +// co_return 42; +// } +// Awaitable func2() { +// int ret = co_await func1(); +// co_return std::to_string(ret); +// } +template +class Awaitable { +public: + using promise_type = detail::AwaitablePromise; + + ~Awaitable() {} + + // NOTE: compiler will generate calls to these functions automatically, + // DO NOT call them manually + bool await_ready(); + template + void await_suspend(std::coroutine_handle > awaiting); + T await_resume(); + +private: +friend class detail::AwaitablePromise; +friend class AwaitableDone; +friend class Coroutine; + + Awaitable() = delete; + Awaitable(promise_type* p) : _promise(p) {} + + promise_type* promise() { + return _promise; + } + + promise_type* _promise; +}; + +// Utility for a coroutine to wait for RPC call. Usage: +// AwaitableDone done; +// stub.CallMethod(&cntl, &req, &resp, &done); +// co_await done.awaitable(); +// +class AwaitableDone : public google::protobuf::Closure { +public: + AwaitableDone(); + + void Run() override; + + Awaitable& awaitable() { + return _awaitable; + } +private: + Awaitable _awaitable; +}; + +// Class for management of coroutine +// 1. To create a new coroutine and wait it finish: +// Awaitable func(double val); +// +// int main() { +// Coroutine coro(func(1.0)); +// coro.join(); +// } +// 2. To wait a coroutine in another coroutine: +// Awaitable another_func() { +// Coroutine coro(func(1.0)); +// co_await coro.awaitable(); +// } +// 3. To create a detached coroutine without waiting: +// Coroutine coro(func(1.0), true); +// 4. To sleep in a coroutine: +// co_await Coroutine::usleep(100); +// +// NOTE: Inside coroutine function, DO NOT call pthread-blocking or +// bthread-blocking functions (eg. bthread_join(), bthread_usleep(), syncronized RPC), +// otherwise may cause dead lock or long latency. +class Coroutine { +public: + template + Coroutine(Awaitable&& aw, bool detach = false); + + ~Coroutine(); + + template + T join(); + + template + Awaitable awaitable(); + + static Awaitable usleep(int sleep_us); + +private: + detail::AwaitablePromiseBase* _promise{nullptr}; + bool _waited{false}; + std::atomic* _butex{nullptr}; +}; + +} // namespace experimental +} // namespace brpc + +#include "brpc/coroutine_inl.h" + +#endif // __cplusplus >= 202002L + +#endif // BRPC_COROUTINE_H \ No newline at end of file diff --git a/src/brpc/coroutine_inl.h b/src/brpc/coroutine_inl.h new file mode 100644 index 0000000000..1ff400544d --- /dev/null +++ b/src/brpc/coroutine_inl.h @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_COROUTINE_INL_H +#define BRPC_COROUTINE_INL_H + +#include "bthread/unstable.h" // bthread_timer_add +#include "bthread/butex.h" // butex_wake/butex_wait + +namespace brpc { +namespace experimental { + +namespace detail { + +class AwaitablePromiseBase { +public: + AwaitablePromiseBase() { + } + + virtual ~AwaitablePromiseBase() { + delete _suspended_or_done; + } + + virtual void resume() = 0; + virtual void destroy() = 0; + + bool needs_suspend() { + return _suspended_or_done != nullptr; + } + + void set_needs_suspend() { + _suspended_or_done = new std::atomic(); + _suspended_or_done->store(false); + } + + // For a Coroutine's leaf function + // Its caller will be suspended, waiting for its done. + // But the suspend and done are always in different threads. + // It may suspend before done, or done before suspend. + // So we use an atomic, after first suspend_or_done() it will become true. + // Then the second suspend_or_done(), exchange(true) will returns true. + // Then we can safely delete this. + void suspend_or_done() { + if (_suspended_or_done->exchange(true)) { + // Already suspend AND done + if (_caller) { + // The leaf function has finished, resume its caller. + _caller->resume(); + } + delete this; + } + } + + void on_suspend() { suspend_or_done(); } + void on_done() { suspend_or_done(); } + + void set_callback(std::function cb) { + _callback = cb; + } + + void set_caller(AwaitablePromiseBase* caller) { + _caller = caller; + } + + // When the coroutine function begins, initial_suspend() will be called + auto initial_suspend() { + // Always suspend the function, later resume() will make it start to run + return std::suspend_always{}; + } + + // When the coroutine function throws unhandled exception, unhandled_exception() will be called + void unhandled_exception() { + LOG(ERROR) << "Coroutine throws unhandled exception!"; + std::exit(1); + } + + // When the coroutine function ends, final_suspend() will be called + auto final_suspend() noexcept { + if (_caller) { + // The caller is waiting for this function to return + // Now it can be resumed + _caller->resume(); + } + if (_callback) { + _callback(); + _callback = nullptr; + } + // Returns suspend_never{} so that the coroutine will be destroyed and the AwaitablePromise be deleted. + // DO NOT call destroy() here, which will cause double destruct of RAII objects. + // DO NOT call delete this here, which will cause malloc and free not match. + return std::suspend_never{}; + } + +private: + // For a Coroutine's root function, it needs a callback to notify its waiter + std::function _callback; + // For a Coroutine's leaf function, it is always resumed from another thread. + // It needs an atomic variable to keep thread safety. + // Non-leaf function does't need this, so we defined it as an optional pointer. + std::atomic* _suspended_or_done{nullptr}; + // For a Coroutine's non-root function, it needs to resume its caller when it finished. + AwaitablePromiseBase* _caller{nullptr}; +}; + +template +class AwaitablePromise : public AwaitablePromiseBase { +public: + T value() { + return _value; + } + + void set_value(T value) { + _value = value; + } + + void resume() override { + _coro.resume(); + } + + void destroy() override { + _coro.destroy(); + } + + // When we call a coroutine function, an AwaitablePromise will be created. + // Then call its get_return_object() to return an Awaitable. + auto get_return_object() { + _coro = std::coroutine_handle::from_promise(*this); + return Awaitable(this); + } + + // When we call co_return in a function, return_value() will be called. + auto return_value(T v) { + _value = v; + return std::suspend_never{}; + } + +private: + T _value; + std::coroutine_handle _coro; +}; + +template <> +class AwaitablePromise : public AwaitablePromiseBase { +public: + void resume() override { + _coro.resume(); + } + + void destroy() override { + _coro.destroy(); + } + + // When we call a coroutine function, an AwaitablePromise will be created. + // Then call its get_return_object() to return an Awaitable. + auto get_return_object() { + _coro = std::coroutine_handle::from_promise(*this); + return Awaitable(this); + } + + // When we call return in a coroutine function, return_value() will be called. + auto return_value() { + return std::suspend_never{}; + } + +private: + std::coroutine_handle _coro; +}; + +} // namespace detail + +// When co_await an Awaitable, await_ready() will be called automatically. +template +inline bool Awaitable::await_ready() { + // Always returns false so that the caller will be suspended at the co_await point. + return false; +} + +// If await_ready returns false, await_suspend() will be called automatically. +template +template +inline void Awaitable::await_suspend(std::coroutine_handle > awaiting) { + _promise->set_caller(&awaiting.promise()); + if (_promise->needs_suspend()) { + _promise->on_suspend(); + return; + } + _promise->resume(); +} + +// When the caller resumes from co_await, await_resume() will be called to get return value +template +inline T Awaitable::await_resume() { + if constexpr (!std::is_same::value) { + return _promise->value(); + } +} + +inline AwaitableDone::AwaitableDone() + : _awaitable(new detail::AwaitablePromise) { + _awaitable.promise()->set_needs_suspend(); +} + +inline void AwaitableDone::Run() { + _awaitable.promise()->on_done(); +} + +template +inline Coroutine::Coroutine(Awaitable&& aw, bool detach) { + detail::AwaitablePromise* origin_promise = aw.promise(); + CHECK(origin_promise); + + if (!detach) { + // Create butex for join() + _butex = bthread::butex_create_checked >(); + _butex->store(0); + + // Create AwaitablePromise for awaitable() + _promise = new detail::AwaitablePromise(); + _promise->set_needs_suspend(); + + auto cb = [this, origin_promise]() { + if constexpr (!std::is_same::value) { + dynamic_cast*>(_promise)->set_value(origin_promise->value()); + } + // wakeup join() + _butex->store(1); + bthread::butex_wake(_butex); + + // wakeup co_await on awaitable() + _promise->on_done(); + }; + origin_promise->set_callback(cb); + } + + // Start to run the coroutine + origin_promise->resume(); +} + +inline Coroutine::~Coroutine() { + if (_promise != nullptr && !_waited) { + join(); + } + if (_butex) { + bthread::butex_destroy(_butex); + _butex = nullptr; + } +} + +template +inline T Coroutine::join() { + CHECK(_promise != nullptr) << "join() can not be called to detached coroutine!"; + CHECK(_waited == false) << "awaitable() or join() can only be called once!"; + _waited = true; + bthread::butex_wait(_butex, 0, nullptr); + if constexpr (!std::is_same::value) { + auto promise = dynamic_cast*>(_promise); + CHECK(promise != nullptr) << "join type not match"; + T ret = promise->value(); + _promise->on_suspend(); + return ret; + } else { + _promise->on_suspend(); + } +} + +template +inline Awaitable Coroutine::awaitable() { + CHECK(_promise != nullptr) << "awaitable() can not be called to detached coroutine!"; + CHECK(_waited == false) << "awaitable() or join() can only be called once!"; + auto promise = dynamic_cast*>(_promise); + CHECK(promise != nullptr) << "awaitable type not match"; + _waited = true; + return Awaitable(promise); +} + +// NOTE: the caller will be resumed on bthread timer thread, +// bthread only have one timer thread, this may be performance bottle-neck +inline Awaitable Coroutine::usleep(int sleep_us) { + auto promise = new detail::AwaitablePromise(); + promise->set_needs_suspend(); + bthread_timer_t timer; + auto abstime = butil::microseconds_from_now(sleep_us); + auto cb = [](void* p) { + auto promise = static_cast*>(p); + promise->set_value(0); + promise->on_done(); + }; + if (bthread_timer_add(&timer, abstime, cb, promise) != 0) { + promise->set_value(-1); + promise->on_done(); + } + return Awaitable(promise); +} + +} // namespace experimental +} // namespace brpc + +#endif // BRPC_COROUTINE_INL_H \ No newline at end of file diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index f747206a57..689e80da7a 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -33,6 +33,8 @@ DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); DEFINE_bool(usercode_in_pthread, false, "Call user's callback in pthreads, use bthreads otherwise"); +DEFINE_bool(usercode_in_coroutine, false, + "User's callback are run in coroutine, no bthread or pthread blocking call"); static EventDispatcher* g_edisp = NULL; static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT; diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index e619af749e..3f740b1c15 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -66,6 +66,7 @@ DEFINE_int32(socket_keepalive_count, -1, "Set number of keepalives of sockets before close if this value is positive"); DECLARE_bool(usercode_in_pthread); +DECLARE_bool(usercode_in_coroutine); DECLARE_uint64(max_body_size); const size_t MSG_SIZE_WINDOW = 10; // Take last so many message into stat. @@ -195,7 +196,7 @@ static void QueueMessage(InputMessageBase* to_run_msg, BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL; tmp.keytable_pool = keytable_pool; tmp.tag = bthread_self_tag(); - if (bthread_start_background( + if (!FLAGS_usercode_in_coroutine && bthread_start_background( &th, &tmp, ProcessInputMessage, to_run_msg) == 0) { ++*num_bthread_created; } else { diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 2a391f3594..9248dd1883 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -100,6 +100,7 @@ DEFINE_int32(connect_timeout_as_unreachable, 3, "fails the main socket as well when this socket is pooled."); DECLARE_int32(health_check_timeout_ms); +DECLARE_bool(usercode_in_coroutine); static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) { return v >= 2 && v < 1000/*large enough*/; @@ -2197,7 +2198,9 @@ int Socket::StartInputEvent(SocketId id, uint32_t events, bthread_attr_t attr = thread_attr; attr.keytable_pool = p->_keytable_pool; attr.tag = bthread_self_tag(); - if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) { + if (FLAGS_usercode_in_coroutine) { + ProcessEvent(p); + } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) { LOG(FATAL) << "Fail to start ProcessEvent"; ProcessEvent(p); } diff --git a/src/butil/containers/stack_container.h b/src/butil/containers/stack_container.h index 111e7e5e5f..5679ab8636 100644 --- a/src/butil/containers/stack_container.h +++ b/src/butil/containers/stack_container.h @@ -36,8 +36,14 @@ namespace butil { template class StackAllocator : public std::allocator { public: - typedef typename std::allocator::pointer pointer; - typedef typename std::allocator::size_type size_type; +#if __cplusplus >= 202002L + typedef typename std::allocator_traits > Allocator; +#else + typedef typename std::allocator Allocator; +#endif + + typedef typename Allocator::pointer pointer; + typedef typename Allocator::size_type size_type; // Backing store for the allocator. The container owner is responsible for // maintaining this for as long as any containers using this allocator are @@ -109,7 +115,12 @@ class StackAllocator : public std::allocator { source_->used_stack_buffer_ = true; return source_->stack_buffer(); } else { +#if __cplusplus >= 202002L + (void)hint; + return std::allocator::allocate(n); +#else return std::allocator::allocate(n, hint); +#endif } } diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h index 4f67fccad0..5f342db3d9 100644 --- a/src/butil/type_traits.h +++ b/src/butil/type_traits.h @@ -92,7 +92,15 @@ template struct is_pointer : false_type {}; template struct is_pointer : true_type {}; #if defined(BUTIL_CXX11_ENABLED) + +#if __cplusplus >= 202002L +template struct is_pod +: integral_constant::value && + std::is_trivial::value)> {}; +#else template struct is_pod : std::is_pod {}; +#endif + #else // We can't get is_pod right without compiler help, so fail conservatively. // We will assume it's false except for arithmetic types, enumerations, diff --git a/test/brpc_coroutine_unittest.cpp b/test/brpc_coroutine_unittest.cpp new file mode 100644 index 0000000000..b89c1408cb --- /dev/null +++ b/test/brpc_coroutine_unittest.cpp @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "brpc/server.h" +#include "brpc/channel.h" +#include "brpc/coroutine.h" +#include "echo.pb.h" + +int main(int argc, char* argv[]) { +#ifdef BRPC_ENABLE_COROUTINE + testing::InitGoogleTest(&argc, argv); + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +#else + printf("bRPC coroutine is not enabled, please add -std=c++20 to compile options\n"); + return 0; +#endif +} + +#ifdef BRPC_ENABLE_COROUTINE + +using brpc::experimental::Awaitable; +using brpc::experimental::AwaitableDone; +using brpc::experimental::Coroutine; + +class Trace { +public: + Trace(const std::string& name) { + _name = name; + LOG(INFO) << "enter " << name; + } + ~Trace() { + LOG(INFO) << "exit " << _name; + } +private: + std::string _name; +}; + +class EchoServiceImpl : public test::EchoService { +public: + EchoServiceImpl() {} + virtual ~EchoServiceImpl() {} + virtual void Echo(google::protobuf::RpcController* cntl_base, + const test::EchoRequest* request, + test::EchoResponse* response, + google::protobuf::Closure* done) { + // brpc::Controller* cntl = (brpc::Controller*)cntl_base; + // brpc::ClosureGuard done_guard(done); + // response->set_message(request->message()); + + // Create a detached coroutine, so the current bthread will return at once. + Coroutine(EchoAsync(request, response, done), true); + } + + Awaitable EchoAsync(const test::EchoRequest* request, + test::EchoResponse* response, + google::protobuf::Closure* done) { + Trace t("EchoAsync"); + // This is important to test RAII object's destruction after coroutine finished + brpc::ClosureGuard done_guard(done); + if (request->has_sleep_us()) { + LOG(INFO) << "sleep " << request->sleep_us() << " us at server side"; + co_await Coroutine::usleep(request->sleep_us()); + } + response->set_message(request->message()); + } +}; + +class CoroutineTest : public ::testing::Test{ +protected: + CoroutineTest() {}; + virtual ~CoroutineTest(){}; + virtual void SetUp() {}; + virtual void TearDown() {}; +}; + + +static int delay_us = 0; + +Awaitable inplace_func(const std::string& input) { + Trace t("inplace_func"); + co_return input; +} + +Awaitable inplace_func2() { + Trace t("inplace_func2"); + co_await inplace_func("123"); + co_return 0.5; +} + +Awaitable sleep_func() { + Trace t("sleep_func"); + int64_t s = butil::monotonic_time_us(); + auto aw = Coroutine::usleep(1000); + usleep(delay_us); + co_await aw; + int cost = butil::monotonic_time_us() - s; + EXPECT_GE(cost, 1000); + LOG(INFO) << "after usleep:" << cost; + co_return 123; +} + +Awaitable exception_func() { + Trace t("exception_func"); + throw std::string("error"); +} + +Awaitable func(brpc::Channel& channel, int* out) { + Trace t("func"); + test::EchoService_Stub stub(&channel); + test::EchoRequest request; + request.set_message("hello world"); + test::EchoResponse response; + brpc::Controller cntl; + + LOG(INFO) << "before start coroutine"; + Coroutine coro(sleep_func()); + usleep(delay_us); + LOG(INFO) << "before wait coroutine"; + int ret = co_await coro.awaitable(); + EXPECT_EQ(ret, 123); + LOG(INFO) << "after wait coroutine, ret:" << ret; + + auto str = co_await inplace_func("hello"); + EXPECT_EQ("hello", str); + + float num = 0.0; + try { + num = co_await exception_func(); + } catch(std::string str) { + EXPECT_EQ("error", str); + num = 1.0; + } + EXPECT_EQ(1.0, num); + + AwaitableDone done; + LOG(INFO) << "start echo"; + stub.Echo(&cntl, &request, &response, &done); + LOG(INFO) << "after echo"; + usleep(delay_us); + co_await done.awaitable(); + LOG(INFO) << "after wait"; + EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText(); + EXPECT_EQ("hello world", response.message()); + + cntl.Reset(); + request.set_sleep_us(2000); + AwaitableDone done2; + LOG(INFO) << "start echo2"; + int64_t s = butil::monotonic_time_us(); + stub.Echo(&cntl, &request, &response, &done2); + LOG(INFO) << "after echo2"; + co_await done2.awaitable(); + int cost = butil::monotonic_time_us() - s; + LOG(INFO) << "after wait2"; + EXPECT_GE(cost, 2000); + EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText(); + EXPECT_EQ("hello world", response.message()); + + *out = 456; +} + +TEST_F(CoroutineTest, coroutine) { + butil::EndPoint ep; + ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); + + brpc::Server server; + EchoServiceImpl service; + server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE); + ASSERT_EQ(0, server.Start(ep, NULL)); + + brpc::Channel channel; + brpc::ChannelOptions options; + ASSERT_EQ(0, channel.Init(ep, &options)); + + int out = 0; + Coroutine coro(func(channel, &out)); + coro.join(); + ASSERT_EQ(456, out); + + out = 0; + delay_us = 10000; + Coroutine coro2(func(channel, &out)); + coro2.join(); + ASSERT_EQ(456, out); + delay_us = 0; + + Coroutine coro3(inplace_func2()); + double d = coro3.join(); + ASSERT_EQ(0.5, d); + + Coroutine coro4(inplace_func("abc")); + coro4.join(); + + Coroutine coro5(sleep_func()); + coro5.join(); + + Coroutine coro6(inplace_func2(), true); + Coroutine coro7(inplace_func("abc"), true); + Coroutine coro8(sleep_func(), true); + usleep(10000); // wait sleep_func() to complete + + LOG(INFO) << "test case finished"; +} + +#endif // BRPC_ENABLE_COROUTINE \ No newline at end of file