-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dubbo_proxy: Redefine DecoderFilter interface and add EncoderFilter support #7118
dubbo_proxy: Redefine DecoderFilter interface and add EncoderFilter support #7118
Conversation
@lizan the author requested a review from you, but do you prefer to have a non-senior maintainer read it first? @gengleilei drive-by comment: this meaty PR probably deserves more content in the description. Why is the DecoderFilter being redefined, and why is there now a need for an EncoderFilter? |
/assign @zyfjeff |
6a505aa
to
128e382
Compare
@jmarantz sorry, I've added some more detailed instructions on redefining DecoderFilter and supporting EncoderFilter. |
@lizan I'm sorry, recently a little busy, I will be review recently |
|
||
protected: | ||
ActiveMessage& parent_; | ||
const bool dual_filter_ : 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ActiveMessageDecoderFilter and ActiveMessagerEncodeFilter will use dual-filter.
@@ -32,21 +31,6 @@ Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end | |||
|
|||
if (end_stream) { | |||
ENVOY_CONN_LOG(trace, "downstream half-closed", read_callbacks_->connection()); | |||
|
|||
// Downstream has closed. Unless we're waiting for an upstream connection to complete a oneway | |||
// request, close. The special case for oneway requests allows them to complete before the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove
Downstream half closed, does it need to wait for the response of the upstream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, the code was deleted by mistake while merging, I have restored it.
* If successful, the RpcInvocation removed from the buffer | ||
* | ||
* @param buffer the currently buffered dubbo data | ||
* @body_size the complete RpcInvocation size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* If successful, the RpcResult removed from the buffer | ||
* | ||
* @param buffer the currently buffered dubbo data | ||
* @body_size the complete RpcResult size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/** | ||
* Encoder filter interface. | ||
*/ | ||
class EncoderFilter : public StreamEncoder, public FilterBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add EncoderFilter unit tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
This pull request has been automatically marked as stale because it has not had activity in the last 7 days. It will be closed in 7 days if no further activity occurs. Please feel free to give a status update now, ping for review, or re-open when it's ready. Thank you for your contributions! |
128e382
to
223c493
Compare
/retest |
🤷♀️ nothing to rebuild. |
stopped_ = true; | ||
break; | ||
} | ||
decoder_->onData(request_buffer_, underflow); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why doesn't care about the return value for onData here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At present, when Decoder processes data, it either finishes processing or needs to wait until the data is entered, so there is no stop.
// continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired. | ||
StopIteration, | ||
// Continue iteration to remaining filters, but ignore any subsequent data or trailers. This | ||
// results in creating a header only request/response. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
size_t total_size = 0, size; | ||
// TODO(zyfjeff): Add format checker | ||
std::string dubbo_version = HessianUtils::peekString(buffer, &size); | ||
total_size = total_size + size; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
total_size += size;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
std::string method_name = HessianUtils::peekString(buffer, &size, total_size); | ||
total_size = total_size + size; | ||
|
||
ContextImpl* context_impl = static_cast<ContextImpl*>(context.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ASSERT(context)?
Why do you want to take raw pointer here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry,there is no need to convert, this part is left out when modify the code.
/wait |
const ResponseStatus status_; | ||
ResponseType encode(MessageMetadata& metadata, DubboProxy::Protocol& protocol, | ||
Buffer::Instance& buffer) const override { | ||
ASSERT(buffer.length() == 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't append data to a buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, at present, I haven't thought of any scenarios that require append data, and I can let go of this restriction if necessary.
@@ -145,7 +139,7 @@ void ConnectionManager::sendLocalReply(MessageMetadata& metadata, | |||
|
|||
Buffer::OwnedImpl buffer; | |||
const DubboFilters::DirectResponse::ResponseType result = | |||
response.encode(metadata, *protocol_, *deserializer_, buffer); | |||
response.encode(metadata, *protocol_, buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try catch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
MessageMetadataSharedPtr metadata) { | ||
ASSERT(serializer_); | ||
|
||
ContextImpl* context_impl = static_cast<ContextImpl*>(context.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove static_cast
, don't use raw point here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
namespace NetworkFilters { | ||
namespace DubboProxy { | ||
|
||
struct ContextBase : public Context { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use class? this class is not a simple POD type, also contains some virtual method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it's simple, you don't need to actively add public declarations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here should use the class, this is not a simple POD type. Can't go to easy access to the data members of the break of object-oriented rules
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
}; | ||
|
||
struct ContextImpl : public ContextBase { | ||
ContextImpl() : ContextBase() {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
=default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -137,19 +141,22 @@ MethodRouteEntryImpl::~MethodRouteEntryImpl() {} | |||
|
|||
RouteConstSharedPtr MethodRouteEntryImpl::matches(const MessageMetadata& metadata, | |||
uint64_t random_value) const { | |||
if (metadata.hasHeaders() && !RouteEntryImplBase::headersMatch(metadata.headers())) { | |||
ASSERT(metadata.hasInvocationInfo()); | |||
const auto invocation = static_cast<const RpcInvocationImpl*>(&metadata.invocation_info()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use a raw point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
7c266f8
to
e84dda6
Compare
Please merge the master to resolve conflicts |
Router::RouteMatcherPtr matcher = | ||
Router::NamedRouteMatcherConfigFactory::getFactory(type).createRouteMatcher( | ||
config.route_config(), context); | ||
route_matcher_ = std::move(matcher); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
route_matcher_ = Router::NamedRouteMatcherConfigFactory::getFactory(type).createRouteMatcher( config.route_config(), context);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
SerializationType serialization_type) override; | ||
Network::FilterStatus messageEnd(MessageMetadataSharedPtr metadata) override; | ||
|
||
FilterStatus onMessageDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) override; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const ContextSharedPtr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const cannot be used here because it changes the value of message_origin_buffer_ in the Context, as used in the onMessageDecoded function:
upstream_request_buffer_.move(ctx->message_origin_data(), ctx->message_size());
@@ -137,19 +141,22 @@ MethodRouteEntryImpl::~MethodRouteEntryImpl() {} | |||
|
|||
RouteConstSharedPtr MethodRouteEntryImpl::matches(const MessageMetadata& metadata, | |||
uint64_t random_value) const { | |||
if (metadata.hasHeaders() && !RouteEntryImplBase::headersMatch(metadata.headers())) { | |||
ASSERT(metadata.hasInvocationInfo()); | |||
const auto& invocation = static_cast<const RpcInvocationImpl&>(metadata.invocation_info()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove static_cast
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It cannot be deleted here because the static_cast is needed to complete the conversion of the base class reference to the derived class reference, which will fail to compile after deletion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Downcasting, Should use dynamic_cast
and judge whether the result is nullptr
after the cast, rather than use staic_cast
directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -81,13 +83,15 @@ bool ParameterRouteEntryImpl::matchParameter(absl::string_view request_data, | |||
|
|||
RouteConstSharedPtr ParameterRouteEntryImpl::matches(const MessageMetadata& metadata, | |||
uint64_t random_value) const { | |||
if (!metadata.hasParameters()) { | |||
ASSERT(metadata.hasInvocationInfo()); | |||
const auto& invocation = static_cast<const RpcInvocationImpl&>(metadata.invocation_info()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove static_cast
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
|
||
const std::string& fromType(RouteMatcherType type) const { | ||
const auto& itor = routeMatcherNameMap.find(type); | ||
if (itor != routeMatcherNameMap.end()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ASSERT(itor != routeMatcherNameMap.end())
return itor->second;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
const std::string& fromType(ProtocolType protocol_type, SerializationType type) const { | ||
const auto& itor = protocolSerializerTypeNameMap.find(generateKey(protocol_type, type)); | ||
if (itor != protocolSerializerTypeNameMap.end()) { | ||
return itor->second; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ASSERT(itor != protocolSerializerTypeNameMap.end())
return itor->second;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
const std::string& fromType(SerializationType type) const { | ||
const auto& itor = serializerTypeNameMap.find(type); | ||
if (itor != serializerTypeNameMap.end()) { | ||
return itor->second; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ASSERT(itor != serializerTypeNameMap.end())
return itor->second;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
const std::string& fromType(ProtocolType type) const { | ||
const auto& itor = protocolTypeNameMap.find(type); | ||
if (itor != protocolTypeNameMap.end()) { | ||
return itor->second; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ASSERT(itor != protocolTypeNameMap.end())
return itor->second;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
absl::optional<std::string> group_; | ||
ParameterValueMapPtr parameter_map_; | ||
HeaderMapPtr headers_; // attachment | ||
uint8_t serialization_type_{static_cast<uint8_t>(SerializationType::Hessian2)}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not use SerializationType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be compatible with the internal HSF protocol, both HSF and Dubbo define SerializationType, but with different values.
|
||
protected: | ||
std::string service_name_; | ||
absl::optional<std::string> method_name_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove optional? The method name why will not exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/ wait |
e84dda6
to
550ae32
Compare
/ wait |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, @lizan Passed the first review
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
88a08f1
to
a5ce22a
Compare
@lizan Please help to review, thanks. |
ASSERT(encoder_filter_action_ != nullptr); | ||
|
||
if (!local_response_sent_) { | ||
std::list<ActiveMessageEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put this in the for statement below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Buffer::Instance& buffer) const override { | ||
ASSERT(buffer.length() == 0); | ||
|
||
ENVOY_LOG(debug, "err {}", what()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make the log message more descriptive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
~RpcResultImpl() override = default; | ||
|
||
bool hasException() const override { return has_exception_; } | ||
void SetException(bool has_exception) { has_exception_ = has_exception; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
virtual ResponseStatus responseStatus() const PURE; | ||
virtual ~RpcResult() = default; | ||
virtual bool hasException() const PURE; | ||
virtual bool hasValue() const PURE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is this used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry,hasValue function is not used, I have deleted it.
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
Description: Redefine DecoderFilter interface and add EncoderFilter support
Risk Level: low
Testing: unit test
Docs Changes: N/A
Release Notes: N/A
[Optional Fixes #Issue]
[Optional Deprecated:]
Redefining DecoderFilter is mainly for 2 reasons:
As mentioned above, we merged the internal and open source versions. EncoderFilter is currently needed internally, such as the retry mechanism. we will consider adding more features to Dubbo later, similar to HTTP, so EncoderFilter is also necessary. we want to bring more features to users around Dubbo.