diff --git a/docs/cn/http_service.md b/docs/cn/http_service.md index d266c2f5e7..a10af58d45 100644 --- a/docs/cn/http_service.md +++ b/docs/cn/http_service.md @@ -347,6 +347,8 @@ brpc server支持发送超大或无限长的body。方法如下: 3. 发送完毕后确保所有的`butil::intrusive_ptr`都析构以释放资源。 +另外,利用该特性可以轻松实现Server-Sent Events(SSE)服务,从而使客户端能够通过 HTTP 连接从服务器自动接收更新。S非常适合构建诸如chatGPT这类实时应用程序,应用例子详见[http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp)中的HttpSSEServiceImpl。 + # 持续接收 目前brpc server不支持在收齐http请求的header部分后就调用服务回调,即brpc server不适合接收超长或无限长的body。 diff --git a/docs/en/http_service.md b/docs/en/http_service.md index d1f5c044b7..1dd3fa36cf 100644 --- a/docs/en/http_service.md +++ b/docs/en/http_service.md @@ -348,6 +348,8 @@ brpc server is capable of sending large or infinite sized body, in following ste 3. After usage, destruct all `butil::intrusive_ptr` to release related resources. +In addition, we can easily implement Server-Sent Events(SSE) with this feature, which enables a client to receive automatic updates from a server via an HTTP connection. SSE could be used to build real-time applications such as chatGPT. Please refer to HttpSSEServiceImpl in [http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp) for more details. + # Progressive receiving Currently brpc server doesn't support calling the service callback once header part in the http request is parsed. In other words, brpc server is not suitable for receiving large or infinite sized body. diff --git a/example/http_c++/http.proto b/example/http_c++/http.proto index 9b5d2c0d18..8581294f46 100644 --- a/example/http_c++/http.proto +++ b/example/http_c++/http.proto @@ -37,3 +37,7 @@ service QueueService { rpc stop(HttpRequest) returns (HttpResponse); rpc getstats(HttpRequest) returns (HttpResponse); }; + +service HttpSSEService { + rpc stream(HttpRequest) returns (HttpResponse); +}; diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp index af373ce806..9a1595b97f 100644 --- a/example/http_c++/http_server.cpp +++ b/example/http_c++/http_server.cpp @@ -151,6 +151,56 @@ class QueueServiceImpl : public example::QueueService { } }; +class HttpSSEServiceImpl : public HttpSSEService { +public: + HttpSSEServiceImpl() {} + virtual ~HttpSSEServiceImpl() {} + + struct PredictJobArgs { + std::vector input_ids; + butil::intrusive_ptr pa; + }; + + static void* Predict(void* raw_args) { + std::unique_ptr args(static_cast(raw_args)); + if (args->pa == NULL) { + LOG(ERROR) << "ProgressiveAttachment is NULL"; + return NULL; + } + for (int i = 0; i < 100; ++i) { + char buf[48]; + int len = snprintf(buf, sizeof(buf), "event: foo\ndata: Hello, world! (%d)\n\n", i); + args->pa->Write(buf, len); + + // sleep a while to send another part. + bthread_usleep(10000 * 10); + } + return NULL; + } + + void stream(google::protobuf::RpcController* cntl_base, + const HttpRequest*, + HttpResponse*, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = + static_cast(cntl_base); + + // Send the first SSE response + cntl->http_response().set_content_type("text/event-stream"); + cntl->http_response().set_status_code(200); + cntl->http_response().SetHeader("Connection", "keep-alive"); + cntl->http_response().SetHeader("Cache-Control", "no-cache"); + + // Send the generated words with progressiveAttachment + std::unique_ptr args(new PredictJobArgs); + args->pa = cntl->CreateProgressiveAttachment(); + args->input_ids = {101, 102}; + bthread_t th; + bthread_start_background(&th, NULL, Predict, args.release()); + } +}; + } // namespace example int main(int argc, char* argv[]) { @@ -163,6 +213,7 @@ int main(int argc, char* argv[]) { example::HttpServiceImpl http_svc; example::FileServiceImpl file_svc; example::QueueServiceImpl queue_svc; + example::HttpSSEServiceImpl sse_svc; // Add services into server. Notice the second parameter, because the // service is put on stack, we don't want server to delete it, otherwise @@ -185,6 +236,11 @@ int main(int argc, char* argv[]) { LOG(ERROR) << "Fail to add queue_svc"; return -1; } + if (server.AddService(&sse_svc, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add sse_svc"; + return -1; + } // Start the server. brpc::ServerOptions options;