Skip to content

Commit

Permalink
fix: integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Fernando Cainelli <fernando.cainelli-external@getyourguide.com>
  • Loading branch information
cainelli committed Apr 16, 2024
1 parent afce8ee commit f57ce42
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 15 deletions.
19 changes: 12 additions & 7 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view serv
void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
const auto thread_local_cluster = parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_);
if (thread_local_cluster == nullptr) {
onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available");
notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available");
http_reset_ = true;
return;
}
Expand All @@ -111,7 +111,7 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
dispatcher_ = &http_async_client.dispatcher();
stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry));
if (stream_ == nullptr) {
onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING);
notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING);
http_reset_ = true;
return;
}
Expand Down Expand Up @@ -212,22 +212,22 @@ void AsyncStreamImpl::onTrailers(Http::ResponseTrailerMapPtr&& trailers) {
if (!grpc_status) {
grpc_status = Status::WellKnownGrpcStatus::Unknown;
}
onRemoteClose(grpc_status.value(), grpc_message);
notifyRemoteClose(grpc_status.value(), grpc_message);
cleanup();
}

void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) {
callbacks_.onReceiveTrailingMetadata(Http::ResponseTrailerMapImpl::create());
onRemoteClose(grpc_status, message);
notifyRemoteClose(grpc_status, message);
resetStream();
}

void AsyncStreamImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
callbacks_.onRemoteClose(status, message);
void AsyncStreamImpl::notifyRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
}
callbacks_.onRemoteClose(status, message);
current_span_->finishSpan();
}

Expand Down Expand Up @@ -287,7 +287,11 @@ void AsyncRequestImpl::initialize(bool buffer_body_for_retry) {
this->sendMessageRaw(std::move(request_), true);
}

void AsyncRequestImpl::cancel() { this->resetStream(); }
void AsyncRequestImpl::cancel() {
AsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
AsyncStreamImpl::activeSpan().finishSpan();
this->resetStream();
}

void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
callbacks_.onCreateInitialMetadata(metadata);
Expand All @@ -306,6 +310,7 @@ void AsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std:
if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
callbacks_.onFailure(status, message, AsyncStreamImpl::activeSpan());
} else if (response_ == nullptr) {
AsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
callbacks_.onFailure(Status::Internal, EMPTY_STRING, AsyncStreamImpl::activeSpan());
} else {
callbacks_.onSuccessRaw(std::move(response_), AsyncStreamImpl::activeSpan());
Expand Down
4 changes: 2 additions & 2 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class AsyncStreamImpl : public RawAsyncStream,
void trailerResponse(absl::optional<Status::GrpcStatus> grpc_status,
const std::string& grpc_message);

void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message);
// Deliver notification and update span when the connection closes.
void notifyRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message);

Event::Dispatcher* dispatcher_{};
Http::RequestMessagePtr headers_message_;
Expand Down Expand Up @@ -145,7 +146,6 @@ class AsyncRequestImpl : public AsyncRequest, public AsyncStreamImpl, RawAsyncSt

Buffer::InstancePtr request_;
RawAsyncRequestCallbacks& callbacks_;
Tracing::SpanPtr current_span_;
Buffer::InstancePtr response_;
};

Expand Down
21 changes: 15 additions & 6 deletions source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent,
service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks),
options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(),
Network::ConnectionInfoProviderSharedPtr{}) {
parent_span.spawnChild(Tracing::EgressConfig::get(),
absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
parent.timeSource().systemTime());
current_span_ =
parent_span.spawnChild(Tracing::EgressConfig::get(),
absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
parent.timeSource().systemTime());
current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_);
current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_);
current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
Expand Down Expand Up @@ -227,6 +228,11 @@ void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) {
void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status,
Http::ResponseTrailerMapPtr trailing_metadata,
const std::string& message) {
current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(grpc_status));
if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok) {
current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
}
current_span_->finishSpan();
if (grpc_status > Status::WellKnownGrpcStatus::MaximumKnown || grpc_status < 0) {
ENVOY_LOG(error, "notifyRemoteClose invalid gRPC status code {}", grpc_status);
// Set the grpc_status as InvalidCode but increment the Unknown stream to avoid out-of-range
Expand Down Expand Up @@ -464,6 +470,8 @@ void GoogleAsyncRequestImpl::initialize(bool buffer_body_for_retry) {
}

void GoogleAsyncRequestImpl::cancel() {
GoogleAsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
GoogleAsyncStreamImpl::activeSpan().finishSpan();
resetStream();
}

Expand All @@ -484,11 +492,12 @@ void GoogleAsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
const std::string& message) {

if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
callbacks_.onFailure(status, message, Tracing::NullSpan::instance());
callbacks_.onFailure(status, message, GoogleAsyncStreamImpl::activeSpan());
} else if (response_ == nullptr) {
callbacks_.onFailure(Status::Internal, EMPTY_STRING, Tracing::NullSpan::instance());
GoogleAsyncStreamImpl::activeSpan().setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
callbacks_.onFailure(Status::Internal, EMPTY_STRING, GoogleAsyncStreamImpl::activeSpan());
} else {
callbacks_.onSuccessRaw(std::move(response_), Tracing::NullSpan::instance());
callbacks_.onSuccessRaw(std::move(response_), GoogleAsyncStreamImpl::activeSpan());
}
}

Expand Down
7 changes: 7 additions & 0 deletions source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
}
const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; }

Tracing::Span& activeSpan() {
if (current_span_ != nullptr) {
return *current_span_;
}
return Tracing::NullSpan::instance();
}

protected:
bool callFailed() const { return call_failed_; }

Expand Down

0 comments on commit f57ce42

Please sign in to comment.