Skip to content

Commit

Permalink
Call on_failed before on_closed
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Mar 11, 2024
1 parent dad8e92 commit 485ca10
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 287 deletions.
33 changes: 11 additions & 22 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ DECLARE_int64(socket_max_streams_unconsumed_bytes);

const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;

void StreamInputHandler::on_failed(StreamId id, int error_code,
const std::string& error_text) {
LOG(ERROR) << "`on_failed' should be override by user when "
"`split_closed_and_failed' returns true. "
"id=" << id << ", [" << error_code
<< "] " << error_text;
}

Stream::Stream()
: _host_socket(NULL)
, _fake_socket_weak_ref(NULL)
Expand Down Expand Up @@ -528,7 +520,7 @@ class MessageBatcher {
_total_length += buf->length();

}
size_t total_length() { return _total_length; }
size_t total_length() const { return _total_length; }
private:
butil::IOBuf** _storage;
size_t _cap;
Expand All @@ -548,19 +540,16 @@ int Stream::Consume(void *meta, bthread::TaskIterator<butil::IOBuf*>& iter) {
s->_host_socket = NULL;
}
if (s->_options.handler != NULL) {
if (s->_options.handler->split_closed_and_failed()) {
// Split closed and failed according to the error code.
int error_code;
std::string error_text;
{
BAIDU_SCOPED_LOCK(s->_connect_mutex);
error_code = s->_error_code;
error_text = s->_error_text;
}
if (error_code != 0) {
s->_options.handler->on_failed(s->id(), error_code, error_text);
return 0;
}
int error_code;
std::string error_text;
{
BAIDU_SCOPED_LOCK(s->_connect_mutex);
error_code = s->_error_code;
error_text = s->_error_text;
}
if (error_code != 0) {
// The stream is closed abnormally.
s->_options.handler->on_failed(s->id(), error_code, error_text);
}
s->_options.handler->on_closed(s->id());
}
Expand Down
13 changes: 3 additions & 10 deletions src/brpc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,11 @@ class StreamInputHandler {
butil::IOBuf *const messages[],
size_t size) = 0;
virtual void on_idle_timeout(StreamId id) = 0;
// 1. If `split_closed_and_failed` returns false(default),
// only `on_closed` will be called.
// 2. If `split_closed_and_failed` returns true,
// 2.1 `on_closed` will be called Whether `StreamClose`
// is called by local side or remote side.
// 2.2 `on_failed` will be called when the stream is
// closed abnormally.
virtual void on_closed(StreamId id) = 0;
// `on_failed` will be called before `on_closed`
// when the stream is closed abnormally.
virtual void on_failed(StreamId id, int error_code,
const std::string& error_text);

virtual bool split_closed_and_failed() const { return false; }
const std::string& error_text) {}
};

struct StreamOptions {
Expand Down
Loading

0 comments on commit 485ca10

Please sign in to comment.