Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How do server create stream, and write message to client using stream #2863

Open
FredyVia opened this issue Jan 5, 2025 · 0 comments
Open

Comments

@FredyVia
Copy link

FredyVia commented Jan 5, 2025

modified from streaming_echo_c++ example
here is my client.cpp

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// A client sending requests to server in batch every 1 second.

#include "echo.pb.h"
#include <brpc/channel.h>
#include <brpc/stream.h>
#include <butil/logging.h>
#include <gflags/gflags.h>
#include <glog/logging.h>

DEFINE_bool(send_attachment, true, "Carry attachment along with requests");
DEFINE_string(connection_type, "",
              "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8001", "IP Address of server");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");

class StreamReceiver : public brpc::StreamInputHandler {
public:
  virtual int on_received_messages(brpc::StreamId id,
                                   butil::IOBuf *const messages[],
                                   size_t size) {
    for (size_t i = 0; i < size; ++i) {
      LOG(INFO) << "msg[" << i << "]=" << *messages[i];
    }
    return 0;
  }
  virtual void on_idle_timeout(brpc::StreamId id) {
    LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
  }
  virtual void on_closed(brpc::StreamId id) {
    LOG(INFO) << "Stream=" << id << " is closed";
  }
};

int main(int argc, char *argv[]) {
  // Parse gflags. We recommend you to use gflags as well.
  google::ParseCommandLineFlags(&argc, &argv, true);

  // A Channel represents a communication line to a Server. Notice that
  // Channel is thread-safe and can be shared by all threads in your program.
  brpc::Channel channel;

  // Initialize the channel, NULL means using default options.
  brpc::ChannelOptions options;
  options.protocol = brpc::PROTOCOL_BAIDU_STD;
  options.connection_type = FLAGS_connection_type;
  options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
  options.max_retry = FLAGS_max_retry;
  if (channel.Init(FLAGS_server.c_str(), &options) != 0) {
    LOG(ERROR) << "Fail to initialize channel";
    return -1;
  }

  // Normally, you should not call a Channel directly, but instead construct
  // a stub Service wrapping it. stub can be shared by all threads as well.
  example::FileService_Stub stub(&channel);
  brpc::Controller cntl;
  brpc::StreamId stream1, stream2;
  if (brpc::StreamCreate(&stream1, cntl, nullptr) != 0) {
    LOG(ERROR) << "Fail to create stream1";
    return -1;
  }
  LOG(INFO) << "Created Stream=" << stream1;
  example::PutRequest put_request;
  example::PutResponse put_response;
  put_request.set_message("I'm a RPC to connect stream1");
  stub.put(&cntl, &put_request, &put_response, NULL);
  if (cntl.Failed()) {
    LOG(ERROR) << "Fail to connect stream1, " << cntl.ErrorText();
    return -1;
  }

  for (int i = 0; i < 3; i++) {
    butil::IOBuf msg;
    msg.append("put: abcdefghijklmnopqrstuvwxyz");
    CHECK_EQ(0, brpc::StreamWrite(stream1, msg));
    sleep(1);
  }

  brpc::StreamClose(stream1);

  cntl.Reset();
  StreamReceiver *receiver = new StreamReceiver();
  brpc::StreamOptions stream_options;
  stream_options.handler = receiver;
  example::GetRequest get_request;
  example::GetResponse get_response;
  get_request.set_message("I'm a RPC to connect stream2");
  if (brpc::StreamCreate(&stream2, cntl, &stream_options) != 0) {
    LOG(ERROR) << "Fail to create stream";
  }

  stub.get(&cntl, &get_request, &get_response, NULL);
  LOG(INFO) << get_response.message();
  while (!brpc::IsAskedToQuit()) {
    LOG(INFO) << "waiting";
    sleep(1);
  }

  brpc::StreamClose(stream2);
  LOG(INFO) << "EchoClient is going to quit";
  return 0;
}

here is my server.cpp

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// A server to receive EchoRequest and send back EchoResponse.

#include "echo.pb.h"
#include <brpc/server.h>
#include <brpc/stream.h>
#include <butil/logging.h>
#include <gflags/gflags.h>

DEFINE_bool(send_attachment, true, "Carry attachment along with response");
DEFINE_int32(port, 8001, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1,
             "Connection will be closed if there is no "
             "read/write operations during the last `idle_timeout_s'");

class StreamReceiver : public brpc::StreamInputHandler {
public:
  virtual int on_received_messages(brpc::StreamId id,
                                   butil::IOBuf *const messages[],
                                   size_t size) {
    std::ostringstream os;
    for (size_t i = 0; i < size; ++i) {
      os << "msg[" << i << "]=" << *messages[i];
    }
    LOG(INFO) << "Received from Stream=" << id << ": " << os.str();
    return 0;
  }
  virtual void on_idle_timeout(brpc::StreamId id) {
    LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
  }
  virtual void on_closed(brpc::StreamId id) {
    LOG(INFO) << "Stream=" << id << " is closed";
    brpc::StreamClose(id);
  }
};

// Your implementation of example::EchoService
class StreamingFileService : public example::FileService {
public:
  StreamingFileService() : stream_id(brpc::INVALID_STREAM_ID) {}
  virtual ~StreamingFileService() { brpc::StreamClose(stream_id); };
  virtual void get(google::protobuf::RpcController *controller,
                   const example::GetRequest *request,
                   example::GetResponse *response,
                   google::protobuf::Closure *done) {
    // This object helps you to call done->Run() in RAII style. If you need
    // to process the request asynchronously, pass done_guard.release().
    brpc::ClosureGuard done_guard(done);
    brpc::Controller *cntl = static_cast<brpc::Controller *>(controller);
    LOG(INFO) << request->message();

    if (brpc::StreamAccept(&stream_id, *cntl, nullptr) != 0) {
      cntl->SetFailed("Fail to accept stream");
      return;
    }
    LOG(INFO) << "accept stream success";
    for (int i = 0; i < 3; i++) {
      butil::IOBuf msg;
      msg.append("GET: ABCDEFGHIJKLMNOPQRSTUVWXYZ");
      LOG(INFO) << "writing";
      CHECK_EQ(0, brpc::StreamWrite(stream_id, msg));
      sleep(1);
    }
    response->set_message("Accepted stream");
  }

  virtual void put(google::protobuf::RpcController *controller,
                   const example::PutRequest *request,
                   example::PutResponse *response,
                   google::protobuf::Closure *done) {
    // This object helps you to call done->Run() in RAII style. If you need
    // to process the request asynchronously, pass done_guard.release().
    brpc::ClosureGuard done_guard(done);
    LOG(INFO) << request->message();

    brpc::Controller *cntl = static_cast<brpc::Controller *>(controller);
    brpc::StreamOptions stream_options;
    stream_options.handler = &_receiver;

    if (brpc::StreamAccept(&stream_id, *cntl, &stream_options) != 0) {
      cntl->SetFailed("Fail to accept stream");
      return;
    }
    response->set_message("Accepted stream");
  }

private:
  StreamReceiver _receiver;
  brpc::StreamId stream_id;
};

int main(int argc, char *argv[]) {
  // Parse gflags. We recommend you to use gflags as well.
  google::ParseCommandLineFlags(&argc, &argv, true);

  // Generally you only need one Server.
  brpc::Server server;

  // Instance of your service.
  StreamingFileService echo_service_impl;

  // Add the service into server. Notice the second parameter, because the
  // service is put on stack, we don't want server to delete it, otherwise
  // use brpc::SERVER_OWNS_SERVICE.
  if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) !=
      0) {
    LOG(ERROR) << "Fail to add service";
    return -1;
  }

  // Start the server.
  brpc::ServerOptions options;
  options.idle_timeout_sec = FLAGS_idle_timeout_s;
  if (server.Start(FLAGS_port, &options) != 0) {
    LOG(ERROR) << "Fail to start EchoServer";
    return -1;
  }

  // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
  server.RunUntilAskedToQuit();
  return 0;
}

here is my echo.proto

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

syntax="proto2";
package example;

option cc_generic_services = true;

message PutRequest {
      required string message = 1;
};

message PutResponse {
      required string message = 1;
};

message GetRequest {
      required string message = 1;
};

message GetResponse {
      required string message = 1;
};

service FileService {
      rpc put(PutRequest) returns (PutResponse);
      rpc get(GetRequest) returns (GetResponse);
};

stream1 is ok, but stream2 is always waiting for remote side;
here is my log
server side:

streaming_echo_c++ git:(master) ✗ ./build/streaming_echo_server
WARNING: Logging before InitGoogleLogging() is written to STDERR
I20250105 17:16:02.871757 123360058110400 server.cpp:1212] Server[StreamingFileService] is serving on port=8001.
I20250105 17:16:02.871828 123360058110400 server.cpp:1215] Check out http://ThinkCentre-K70:8001 in web browser.
I20250105 17:16:04.740287 123359568332480 server.cpp:90] I'm a RPC to connect stream1
I20250105 17:16:04.742151 123359882905280 server.cpp:41] Received from Stream=186: msg[0]=put: abcdefghijklmnopqrstuvwxyz
I20250105 17:16:05.742070 123359769659072 server.cpp:41] Received from Stream=186: msg[0]=put: abcdefghijklmnopqrstuvwxyz
I20250105 17:16:06.742141 123359568332480 server.cpp:41] Received from Stream=186: msg[0]=put: abcdefghijklmnopqrstuvwxyz
I20250105 17:16:07.742296 123359893391040 server.cpp:48] Stream=186 is closed
I20250105 17:16:07.742332 123359882905280 server.cpp:66] I'm a RPC to connect stream2
I20250105 17:16:07.742352 123359882905280 server.cpp:72] accept stream success
I20250105 17:16:07.742371 123359882905280 server.cpp:76] writing
I20250105 17:16:08.742444 123359882905280 server.cpp:76] writing
I20250105 17:16:09.742537 123359882905280 server.cpp:76] writing

client side:

WARNING: Logging before InitGoogleLogging() is written to STDERR
I20250105 17:16:04.731174 124031020396992 client.cpp:80] Created Stream=1
I20250105 17:16:07.842358 124030950573760 client.cpp:48] Stream=8589934593 is closed
I20250105 17:16:07.842363 124031020396992 client.cpp:111] 
I20250105 17:16:07.842392 124031020396992 client.cpp:113] waiting
I20250105 17:16:08.842461 124031020396992 client.cpp:113] waiting
I20250105 17:16:09.842549 124031020396992 client.cpp:113] waiting
I20250105 17:16:10.842635 124031020396992 client.cpp:113] waiting
I20250105 17:16:11.842722 124031020396992 client.cpp:113] waiting
I20250105 17:16:12.842808 124031020396992 client.cpp:113] waiting
I20250105 17:16:13.842920 124031020396992 client.cpp:113] waiting
I20250105 17:16:14.843019 124031020396992 client.cpp:113] waiting
I20250105 17:16:15.843068 124031020396992 client.cpp:113] waiting
^CI20250105 17:16:16.404974 124031020396992 client.cpp:118] EchoClient is going to quit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant