Skip to content

Commit

Permalink
Batch create and accept stream (#2754)
Browse files Browse the repository at this point in the history
* feat: batch create and accept stream

* fix protobuf 22.5 compilation error related to thread_local in MacOS

* refine style

* modify code based on the code review feedback and add more tests

modify code based on the code review feedback  and add more tests
  • Loading branch information
jenrryyou authored Oct 9, 2024
1 parent d94c88f commit 7e5ec4f
Show file tree
Hide file tree
Showing 16 changed files with 649 additions and 67 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/ci-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
cd build
make -j ${{env.proc_num}}
compile-with-make-protobuf22:
compile-with-make-protobuf23:
runs-on: macos-latest # https://github.com/actions/runner-images

steps:
Expand All @@ -67,9 +67,9 @@ jobs:
run: |
brew install openssl gnu-getopt coreutils gflags leveldb
# abseil 20230125.3
curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb
# protobuf 22.5
curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb
curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb
# protobuf 23.3
curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb
HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula --ignore-dependencies ./abseil.rb ./protobuf.rb
- name: config_brpc
Expand All @@ -82,7 +82,7 @@ jobs:
run: |
make -j ${{env.proc_num}}
compile-with-cmake-protobuf22:
compile-with-cmake-protobuf23:
runs-on: macos-latest

steps:
Expand All @@ -92,9 +92,9 @@ jobs:
run: |
brew install openssl gflags leveldb
# abseil 20230125.3
curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb
# protobuf 22.5
curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb
curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb
# protobuf 23.3
curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb
HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula --ignore-dependencies ./abseil.rb ./protobuf.rb
- name: cmake
Expand Down
17 changes: 15 additions & 2 deletions docs/cn/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Streaming RPC保证:

# 建立Stream

目前Stream都由Client端建立。Client先在本地创建一个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这个Stream, 那在response返回Client后Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。
目前Stream都由Client端建立。Client先在本地创建一个或者多个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这批Stream, 那在response返回Client后这批Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。

> 如果Client尝试向不支持Streaming RPC的老Server建立Stream,将总是失败。
Expand Down Expand Up @@ -58,18 +58,31 @@ struct StreamOptions
// NULL, the Stream will be created with default options
// Return 0 on success, -1 otherwise
int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options);

// [Called at the client side for creating multiple streams]
// Create streams at client-side along with the |cntl|, which will be connected
// when receiving the response with streams from server-side. If |options| is
// NULL, the stream will be created with default options
// Return 0 on success, -1 otherwise
int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl, const StreamOptions* options);
```
# 接受Stream
如果client在RPC上附带了一个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。
如果client在RPC上附带了一个或者多个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。
```c++
// [Called at the server side]
// Accept the Stream. If client didn't create a Stream with the request
// (cntl.has_remote_stream() returns false), this method would fail.
// Return 0 on success, -1 otherwise.
int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options);
// [Called at the server side for accepting multiple streams]
// Accept the streams. If client didn't create streams with the request
// (cntl.has_remote_stream() returns false), this method would fail.
// Return 0 on success, -1 otherwise.
int StreamAccept(StreamIds& response_stream, Controller& cntl, const StreamOptions* options);
```

# 读取Stream
Expand Down
16 changes: 15 additions & 1 deletion docs/en/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ For examples please refer to [example/streaming_echo_c++](https://github.com/apa

# Create a Stream

Currently stream is established by the client only. A new Stream object is created in client and then is used to issue an RPC (through baidu_std protocol) to the specified service. The service could accept this stream by responding to the request without error, thus a Stream is created once the client receives the response successfully. Any error during this process fails the RPC and thus fails the Stream creation. Take the Linux environment as an example, the client creates a [socket](http://linux.die.net/man/7/socket) first (creates a Stream), and then tries to establish a connection with the remote side by [connect](http://linux.die.net/man/2/connect) (establish a Stream through RPC). Finally the stream has been created once the remote side [accept](http://linux.die.net/man/2/accept) the request.
Currently streams are established by the client only. The new Stream objects are created in client and then are used to issue an RPC (through baidu_std protocol) to the specified service. The service could accept these streams by responding to the request without error, thus the Streams are created once the client receives the response successfully. Any error during this process fails the RPC and thus fails the Stream creation. Take the Linux environment as an example, the client creates a [socket](http://linux.die.net/man/7/socket) first (creates a Stream), and then tries to establish a connection with the remote side by [connect](http://linux.die.net/man/2/connect) (establish a Stream through RPC). Finally the stream has been created once the remote side [accept](http://linux.die.net/man/2/accept) the request.

> If the client tries to establish a stream to a server that doesn't support streaming RPC, it will always return failure.
Expand Down Expand Up @@ -58,6 +58,14 @@ struct StreamOptions
// NULL, the Stream will be created with default options
// Return 0 on success, -1 otherwise
int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options);

// [Called at the client side for creating multiple streams]
// Create streams at client-side along with the |cntl|, which will be connected
// when receiving the response with streams from server-side. If |options| is
// NULL, the stream will be created with default options
// Return 0 on success, -1 otherwise
int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl,
const StreamOptions* options);
```
# Accept a Stream
Expand All @@ -70,6 +78,12 @@ If a Stream is attached inside the request of an RPC, the service can accept the
// (cntl.has_remote_stream() returns false), this method would fail.
// Return 0 on success, -1 otherwise.
int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options);
// [Called at the server side for accepting multiple streams]
// Accept the streams. If client didn't create streams with the request
// (cntl.has_remote_stream() returns false), this method would fail.
// Return 0 on success, -1 otherwise.
int StreamAccept(StreamIds& response_stream, Controller& cntl, const StreamOptions* options);
```

# Read from a Stream
Expand Down
140 changes: 140 additions & 0 deletions example/streaming_batch_echo_c++/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

cmake_minimum_required(VERSION 2.8.10)
project(streaming_batch_echo_c++ C CXX)

option(LINK_SO "Whether examples are linked dynamically" OFF)

execute_process(
COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
OUTPUT_VARIABLE OUTPUT_PATH
)

set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})

include(FindThreads)
include(FindProtobuf)
protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})

# Search for libthrift* by best effort. If it is not found and brpc is
# compiled with thrift protocol enabled, a link error would be reported.
find_library(THRIFT_LIB NAMES thrift)
if (NOT THRIFT_LIB)
set(THRIFT_LIB "")
endif()

find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
if(LINK_SO)
find_library(BRPC_LIB NAMES brpc)
else()
find_library(BRPC_LIB NAMES libbrpc.a brpc)
endif()
if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
message(FATAL_ERROR "Fail to find brpc")
endif()
include_directories(${BRPC_INCLUDE_PATH})

find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
message(FATAL_ERROR "Fail to find gflags")
endif()
include_directories(${GFLAGS_INCLUDE_PATH})

execute_process(
COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE")
execute_process(
COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
include(CheckFunctionExists)
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
if(NOT HAVE_CLOCK_GETTIME)
set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
endif()
endif()

set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")

if(CMAKE_VERSION VERSION_LESS "3.1.3")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()

find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
find_library(LEVELDB_LIB NAMES leveldb)
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
message(FATAL_ERROR "Fail to find leveldb")
endif()
include_directories(${LEVELDB_INCLUDE_PATH})

if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(OPENSSL_ROOT_DIR
"/usr/local/opt/openssl" # Homebrew installed OpenSSL
)
endif()

find_package(OpenSSL)
include_directories(${OPENSSL_INCLUDE_DIR})

set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${GFLAGS_LIBRARY}
${PROTOBUF_LIBRARIES}
${LEVELDB_LIB}
${OPENSSL_CRYPTO_LIBRARY}
${OPENSSL_SSL_LIBRARY}
${THRIFT_LIB}
dl
)

if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(DYNAMIC_LIB ${DYNAMIC_LIB}
pthread
"-framework CoreFoundation"
"-framework CoreGraphics"
"-framework CoreData"
"-framework CoreText"
"-framework Security"
"-framework Foundation"
"-Wl,-U,_MallocExtension_ReleaseFreeMemory"
"-Wl,-U,_ProfilerStart"
"-Wl,-U,_ProfilerStop"
"-Wl,-U,__Z13GetStackTracePPvii")
endif()

add_executable(streaming_batch_echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
add_executable(streaming_batch_echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})

target_link_libraries(streaming_batch_echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
target_link_libraries(streaming_batch_echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
118 changes: 118 additions & 0 deletions example/streaming_batch_echo_c++/client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// A client sending requests to server in batch every 1 second.

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include <brpc/stream.h>
#include "echo.pb.h"

DEFINE_bool(send_attachment, true, "Carry attachment along with requests");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8001", "IP Address of server");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");

class StreamClientReceiver : public brpc::StreamInputHandler {
public:
virtual int on_received_messages(brpc::StreamId id,
butil::IOBuf *const messages[],
size_t size) {
std::ostringstream os;
for (size_t i = 0; i < size; ++i) {
os << "msg[" << i << "]=" << *messages[i];
}
LOG(INFO) << "Received from Stream=" << id << ": " << os.str();
return 0;
}
virtual void on_idle_timeout(brpc::StreamId id) {
LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
}
virtual void on_closed(brpc::StreamId id) {
LOG(INFO) << "Stream=" << id << " is closed";
}

virtual void on_finished(brpc::StreamId id, int32_t finish_code) {
LOG(INFO) << "Stream=" << id << " is finished, code " << finish_code;
}
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;

// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), NULL) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}

// Normally, you should not call a Channel directly, but instead construct
// a stub Service wrapping it. stub can be shared by all threads as well.
example::EchoService_Stub stub(&channel);
StreamClientReceiver receiver;
brpc::Controller cntl;
brpc::StreamIds streams;
brpc::StreamOptions stream_options;
stream_options.handler = &receiver;
if (brpc::StreamCreate(streams, 3, cntl, &stream_options) != 0) {
LOG(ERROR) << "Fail to create stream";
return -1;
}
for(size_t i = 0; i < streams.size(); ++i) {
LOG(INFO) << "Created Stream=" << streams[i];
}
example::EchoRequest request;
example::EchoResponse response;
request.set_message("I'm a RPC to connect stream");
stub.Echo(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to connect stream, " << cntl.ErrorText();
return -1;
}

while (!brpc::IsAskedToQuit()) {
butil::IOBuf msg1;
msg1.append("abcdefghijklmnopqrstuvwxyz");
CHECK_EQ(0, brpc::StreamWrite(streams[0], msg1));
butil::IOBuf msg2;
msg2.append("0123456789");
CHECK_EQ(0, brpc::StreamWrite(streams[1], msg2));
sleep(1);
butil::IOBuf msg3;
msg3.append("hello world");
CHECK_EQ(0, brpc::StreamWrite(streams[2], msg3));
sleep(1);
}

CHECK_EQ(0, brpc::StreamClose(streams[0]));
CHECK_EQ(0, brpc::StreamClose(streams[1]));
CHECK_EQ(0, brpc::StreamClose(streams[2]));
LOG(INFO) << "EchoClient is going to quit";
return 0;
}
Loading

0 comments on commit 7e5ec4f

Please sign in to comment.