Skip to content

Commit

Permalink
Merge pull request #2375 from thorneliu/master
Browse files Browse the repository at this point in the history
add HTTP server-sent-events(SSE) example in brpc http server
  • Loading branch information
Huixxi authored Oct 3, 2023
2 parents 28e44a0 + 283ac46 commit aa0f052
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/cn/http_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ brpc server支持发送超大或无限长的body。方法如下:

3. 发送完毕后确保所有的`butil::intrusive_ptr<brpc::ProgressiveAttachment>`都析构以释放资源。

另外,利用该特性可以轻松实现Server-Sent Events(SSE)服务,从而使客户端能够通过 HTTP 连接从服务器自动接收更新。非常适合构建诸如chatGPT这类实时应用程序,应用例子详见[http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp)中的HttpSSEServiceImpl。

# 持续接收

目前brpc server不支持在收齐http请求的header部分后就调用服务回调,即brpc server不适合接收超长或无限长的body。
Expand Down
2 changes: 2 additions & 0 deletions docs/en/http_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ brpc server is capable of sending large or infinite sized body, in following ste

3. After usage, destruct all `butil::intrusive_ptr<brpc::ProgressiveAttachment>` to release related resources.

In addition, we can easily implement Server-Sent Events(SSE) with this feature, which enables a client to receive automatic updates from a server via a HTTP connection. SSE could be used to build real-time applications such as chatGPT. Please refer to HttpSSEServiceImpl in [http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp) for more details.

# Progressive receiving

Currently brpc server doesn't support calling the service callback once header part in the http request is parsed. In other words, brpc server is not suitable for receiving large or infinite sized body.
Expand Down
4 changes: 4 additions & 0 deletions example/http_c++/http.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ service QueueService {
rpc stop(HttpRequest) returns (HttpResponse);
rpc getstats(HttpRequest) returns (HttpResponse);
};

service HttpSSEService {
rpc stream(HttpRequest) returns (HttpResponse);
};
56 changes: 56 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,56 @@ class QueueServiceImpl : public example::QueueService {
}
};

class HttpSSEServiceImpl : public HttpSSEService {
public:
HttpSSEServiceImpl() {}
virtual ~HttpSSEServiceImpl() {}

struct PredictJobArgs {
std::vector<uint32_t> input_ids;
butil::intrusive_ptr<brpc::ProgressiveAttachment> pa;
};

static void* Predict(void* raw_args) {
std::unique_ptr<PredictJobArgs> args(static_cast<PredictJobArgs*>(raw_args));
if (args->pa == NULL) {
LOG(ERROR) << "ProgressiveAttachment is NULL";
return NULL;
}
for (int i = 0; i < 100; ++i) {
char buf[48];
int len = snprintf(buf, sizeof(buf), "event: foo\ndata: Hello, world! (%d)\n\n", i);
args->pa->Write(buf, len);

// sleep a while to send another part.
bthread_usleep(10000 * 10);
}
return NULL;
}

void stream(google::protobuf::RpcController* cntl_base,
const HttpRequest*,
HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);

// Send the first SSE response
cntl->http_response().set_content_type("text/event-stream");
cntl->http_response().set_status_code(200);
cntl->http_response().SetHeader("Connection", "keep-alive");
cntl->http_response().SetHeader("Cache-Control", "no-cache");

// Send the generated words with progressiveAttachment
std::unique_ptr<PredictJobArgs> args(new PredictJobArgs);
args->pa = cntl->CreateProgressiveAttachment();
args->input_ids = {101, 102};
bthread_t th;
bthread_start_background(&th, NULL, Predict, args.release());
}
};

} // namespace example

int main(int argc, char* argv[]) {
Expand All @@ -163,6 +213,7 @@ int main(int argc, char* argv[]) {
example::HttpServiceImpl http_svc;
example::FileServiceImpl file_svc;
example::QueueServiceImpl queue_svc;
example::HttpSSEServiceImpl sse_svc;

// Add services into server. Notice the second parameter, because the
// service is put on stack, we don't want server to delete it, otherwise
Expand All @@ -185,6 +236,11 @@ int main(int argc, char* argv[]) {
LOG(ERROR) << "Fail to add queue_svc";
return -1;
}
if (server.AddService(&sse_svc,
brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add sse_svc";
return -1;
}

// Start the server.
brpc::ServerOptions options;
Expand Down

0 comments on commit aa0f052

Please sign in to comment.