diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index b006659faf46..c55bf00955b7 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -102,7 +102,7 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view serv void AsyncStreamImpl::initialize(bool buffer_body_for_retry) { const auto thread_local_cluster = parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_); if (thread_local_cluster == nullptr) { - onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available"); + notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available"); http_reset_ = true; return; } @@ -111,7 +111,7 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) { dispatcher_ = &http_async_client.dispatcher(); stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry)); if (stream_ == nullptr) { - onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING); + notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING); http_reset_ = true; return; } @@ -212,22 +212,22 @@ void AsyncStreamImpl::onTrailers(Http::ResponseTrailerMapPtr&& trailers) { if (!grpc_status) { grpc_status = Status::WellKnownGrpcStatus::Unknown; } - onRemoteClose(grpc_status.value(), grpc_message); + notifyRemoteClose(grpc_status.value(), grpc_message); cleanup(); } void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) { callbacks_.onReceiveTrailingMetadata(Http::ResponseTrailerMapImpl::create()); - onRemoteClose(grpc_status, message); + notifyRemoteClose(grpc_status, message); resetStream(); } -void AsyncStreamImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { - callbacks_.onRemoteClose(status, message); +void AsyncStreamImpl::notifyRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status)); if (status != Grpc::Status::WellKnownGrpcStatus::Ok) { current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); } + callbacks_.onRemoteClose(status, message); current_span_->finishSpan(); } @@ -287,7 +287,11 @@ void AsyncRequestImpl::initialize(bool buffer_body_for_retry) { this->sendMessageRaw(std::move(request_), true); } -void AsyncRequestImpl::cancel() { this->resetStream(); } +void AsyncRequestImpl::cancel() { + AsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled); + AsyncStreamImpl::activeSpan().finishSpan(); + this->resetStream(); +} void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) { callbacks_.onCreateInitialMetadata(metadata); @@ -306,6 +310,7 @@ void AsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std: if (status != Grpc::Status::WellKnownGrpcStatus::Ok) { callbacks_.onFailure(status, message, AsyncStreamImpl::activeSpan()); } else if (response_ == nullptr) { + AsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); callbacks_.onFailure(Status::Internal, EMPTY_STRING, AsyncStreamImpl::activeSpan()); } else { callbacks_.onSuccessRaw(std::move(response_), AsyncStreamImpl::activeSpan()); diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 52a4f99e5953..6c9fc360b00f 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -103,7 +103,8 @@ class AsyncStreamImpl : public RawAsyncStream, void trailerResponse(absl::optional grpc_status, const std::string& grpc_message); - void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message); + // Deliver notification and update span when the connection closes. + void notifyRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message); Event::Dispatcher* dispatcher_{}; Http::RequestMessagePtr headers_message_; @@ -145,7 +146,6 @@ class AsyncRequestImpl : public AsyncRequest, public AsyncStreamImpl, RawAsyncSt Buffer::InstancePtr request_; RawAsyncRequestCallbacks& callbacks_; - Tracing::SpanPtr current_span_; Buffer::InstancePtr response_; }; diff --git a/source/common/grpc/google_async_client_impl.cc b/source/common/grpc/google_async_client_impl.cc index 3684c03be8ae..3647afcf264d 100644 --- a/source/common/grpc/google_async_client_impl.cc +++ b/source/common/grpc/google_async_client_impl.cc @@ -169,9 +169,10 @@ GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent, service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks), options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(), Network::ConnectionInfoProviderSharedPtr{}) { - parent_span.spawnChild(Tracing::EgressConfig::get(), - absl::StrCat("async ", service_full_name, ".", method_name, " egress"), - parent.timeSource().systemTime()); + current_span_ = + parent_span.spawnChild(Tracing::EgressConfig::get(), + absl::StrCat("async ", service_full_name, ".", method_name, " egress"), + parent.timeSource().systemTime()); current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_); current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_); current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy); @@ -227,6 +228,11 @@ void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) { void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status, Http::ResponseTrailerMapPtr trailing_metadata, const std::string& message) { + current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(grpc_status)); + if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok) { + current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); + } + current_span_->finishSpan(); if (grpc_status > Status::WellKnownGrpcStatus::MaximumKnown || grpc_status < 0) { ENVOY_LOG(error, "notifyRemoteClose invalid gRPC status code {}", grpc_status); // Set the grpc_status as InvalidCode but increment the Unknown stream to avoid out-of-range @@ -464,6 +470,8 @@ void GoogleAsyncRequestImpl::initialize(bool buffer_body_for_retry) { } void GoogleAsyncRequestImpl::cancel() { + GoogleAsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled); + GoogleAsyncStreamImpl::activeSpan().finishSpan(); resetStream(); } @@ -484,11 +492,12 @@ void GoogleAsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { if (status != Grpc::Status::WellKnownGrpcStatus::Ok) { - callbacks_.onFailure(status, message, Tracing::NullSpan::instance()); + callbacks_.onFailure(status, message, GoogleAsyncStreamImpl::activeSpan()); } else if (response_ == nullptr) { - callbacks_.onFailure(Status::Internal, EMPTY_STRING, Tracing::NullSpan::instance()); + GoogleAsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); + callbacks_.onFailure(Status::Internal, EMPTY_STRING, GoogleAsyncStreamImpl::activeSpan()); } else { - callbacks_.onSuccessRaw(std::move(response_), Tracing::NullSpan::instance()); + callbacks_.onSuccessRaw(std::move(response_), GoogleAsyncStreamImpl::activeSpan()); } } diff --git a/source/common/grpc/google_async_client_impl.h b/source/common/grpc/google_async_client_impl.h index 15f59d42588e..a7688d646d48 100644 --- a/source/common/grpc/google_async_client_impl.h +++ b/source/common/grpc/google_async_client_impl.h @@ -238,6 +238,13 @@ class GoogleAsyncStreamImpl : public RawAsyncStream, } const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; } + Tracing::Span& activeSpan() { + if (current_span_ != nullptr) { + return *current_span_; + } + return Tracing::NullSpan::instance(); + } + protected: bool callFailed() const { return call_failed_; }