Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add HTTP server-sent-events(SSE) example in brpc http server #2375

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/cn/http_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ brpc server支持发送超大或无限长的body。方法如下:

3. 发送完毕后确保所有的`butil::intrusive_ptr<brpc::ProgressiveAttachment>`都析构以释放资源。

另外,利用该特性可以轻松实现Server-Sent Events(SSE)服务,从而使客户端能够通过 HTTP 连接从服务器自动接收更新。非常适合构建诸如chatGPT这类实时应用程序,应用例子详见[http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp)中的HttpSSEServiceImpl。

# 持续接收

目前brpc server不支持在收齐http请求的header部分后就调用服务回调,即brpc server不适合接收超长或无限长的body。
Expand Down
2 changes: 2 additions & 0 deletions docs/en/http_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ brpc server is capable of sending large or infinite sized body, in following ste

3. After usage, destruct all `butil::intrusive_ptr<brpc::ProgressiveAttachment>` to release related resources.

In addition, we can easily implement Server-Sent Events(SSE) with this feature, which enables a client to receive automatic updates from a server via a HTTP connection. SSE could be used to build real-time applications such as chatGPT. Please refer to HttpSSEServiceImpl in [http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp) for more details.

# Progressive receiving

Currently brpc server doesn't support calling the service callback once header part in the http request is parsed. In other words, brpc server is not suitable for receiving large or infinite sized body.
Expand Down
4 changes: 4 additions & 0 deletions example/http_c++/http.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ service QueueService {
rpc stop(HttpRequest) returns (HttpResponse);
rpc getstats(HttpRequest) returns (HttpResponse);
};

service HttpSSEService {
rpc stream(HttpRequest) returns (HttpResponse);
};
56 changes: 56 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,56 @@ class QueueServiceImpl : public example::QueueService {
}
};

class HttpSSEServiceImpl : public HttpSSEService {
public:
HttpSSEServiceImpl() {}
virtual ~HttpSSEServiceImpl() {}

struct PredictJobArgs {
std::vector<uint32_t> input_ids;
butil::intrusive_ptr<brpc::ProgressiveAttachment> pa;
};

static void* Predict(void* raw_args) {
std::unique_ptr<PredictJobArgs> args(static_cast<PredictJobArgs*>(raw_args));
if (args->pa == NULL) {
LOG(ERROR) << "ProgressiveAttachment is NULL";
return NULL;
}
for (int i = 0; i < 100; ++i) {
char buf[48];
int len = snprintf(buf, sizeof(buf), "event: foo\ndata: Hello, world! (%d)\n\n", i);
args->pa->Write(buf, len);

// sleep a while to send another part.
bthread_usleep(10000 * 10);
}
return NULL;
}

void stream(google::protobuf::RpcController* cntl_base,
const HttpRequest*,
HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);

// Send the first SSE response
cntl->http_response().set_content_type("text/event-stream");
cntl->http_response().set_status_code(200);
cntl->http_response().SetHeader("Connection", "keep-alive");
cntl->http_response().SetHeader("Cache-Control", "no-cache");

// Send the generated words with progressiveAttachment
std::unique_ptr<PredictJobArgs> args(new PredictJobArgs);
args->pa = cntl->CreateProgressiveAttachment();
args->input_ids = {101, 102};
bthread_t th;
bthread_start_background(&th, NULL, Predict, args.release());
}
};

} // namespace example

int main(int argc, char* argv[]) {
Expand All @@ -163,6 +213,7 @@ int main(int argc, char* argv[]) {
example::HttpServiceImpl http_svc;
example::FileServiceImpl file_svc;
example::QueueServiceImpl queue_svc;
example::HttpSSEServiceImpl sse_svc;

// Add services into server. Notice the second parameter, because the
// service is put on stack, we don't want server to delete it, otherwise
Expand All @@ -185,6 +236,11 @@ int main(int argc, char* argv[]) {
LOG(ERROR) << "Fail to add queue_svc";
return -1;
}
if (server.AddService(&sse_svc,
brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add sse_svc";
return -1;
}

// Start the server.
brpc::ServerOptions options;
Expand Down