Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the "received byte count" feature. #2004

Merged
merged 14 commits into from
Jan 28, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ envoy_stream_intel PlatformBridgeFilter::streamIntel() {
RELEASE_ASSERT(decoder_callbacks_, "StreamInfo accessed before filter callbacks are set");
auto& info = decoder_callbacks_->streamInfo();
// FIXME: Stream handle cannot currently be set from the filter context.
envoy_stream_intel stream_intel{-1, -1, 0};
envoy_stream_intel stream_intel{-1, -1, 0, 0};
if (info.upstreamInfo()) {
stream_intel.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
Expand Down
11 changes: 7 additions & 4 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void Client::DirectStreamCallbacks::encodeHeaders(const ResponseHeaderMap& heade

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
direct_stream_.saveLatestStreamIntel(headerBytesReceived());
if (end_stream) {
closeStream();
}
Expand Down Expand Up @@ -84,7 +84,7 @@ void Client::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool end_

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
direct_stream_.saveLatestStreamIntel(bytesReceived());
if (end_stream) {
closeStream();
}
Expand Down Expand Up @@ -147,7 +147,7 @@ void Client::DirectStreamCallbacks::encodeTrailers(const ResponseTrailerMap& tra

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
direct_stream_.saveLatestStreamIntel(bytesReceived());
closeStream(); // Trailers always indicate the end of the stream.

// For explicit flow control, don't send data unless prompted.
Expand Down Expand Up @@ -291,20 +291,23 @@ envoy_final_stream_intel& Client::DirectStreamCallbacks::finalStreamIntel() {
return direct_stream_.envoy_final_stream_intel_;
}

void Client::DirectStream::saveLatestStreamIntel() {
void Client::DirectStream::saveLatestStreamIntel(uint64_t received_byte_count) {
const auto& info = request_decoder_->streamInfo();
if (info.upstreamInfo()) {
stream_intel_.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
stream_intel_.stream_id = static_cast<uint64_t>(stream_handle_);
stream_intel_.attempt_count = info.attemptCount().value_or(0);
stream_intel_.received_byte_count = received_byte_count;
}

void Client::DirectStream::saveFinalStreamIntel() {
if (!request_decoder_ || !parent_.getStream(stream_handle_, ALLOW_ONLY_FOR_OPEN_STREAMS)) {
return;
}
StreamInfo::setFinalStreamIntel(request_decoder_->streamInfo(), envoy_final_stream_intel_);
// stream_intel_ may have an outdated received_byte_count - the final one is correct.
stream_intel_.received_byte_count = envoy_final_stream_intel_.received_byte_count;
}

envoy_error Client::DirectStreamCallbacks::streamError() {
Expand Down
17 changes: 15 additions & 2 deletions library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ class Client : public Logger::Loggable<Logger::Id::http> {

private:
bool hasBufferedData() { return response_data_.get() && response_data_->length() != 0; }
const StreamInfo::StreamInfo& streamInfo() {
return direct_stream_.request_decoder_->streamInfo();
}
uint64_t headerBytesReceived() {
return streamInfo().getUpstreamBytesMeter()
? streamInfo().getUpstreamBytesMeter()->headerBytesReceived()
: 0;
}
uint64_t bytesReceived() {
return streamInfo().getUpstreamBytesMeter()
? streamInfo().getUpstreamBytesMeter()->wireBytesReceived()
: 0;
}

void sendDataToBridge(Buffer::Instance& data, bool end_stream);
void sendTrailersToBridge(const ResponseTrailerMap& trailers);
Expand Down Expand Up @@ -248,7 +261,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
}

// Latches stream information as it may not be available when accessed.
void saveLatestStreamIntel();
void saveLatestStreamIntel(uint64_t received_byte_count);

// Latches latency info from stream info before it goes away.
void saveFinalStreamIntel();
Expand Down Expand Up @@ -279,7 +292,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// read faster than the mobile caller can process it.
bool explicit_flow_control_ = false;
// Latest intel data retrieved from the StreamInfo.
envoy_stream_intel stream_intel_{-1, -1, 0};
envoy_stream_intel stream_intel_{-1, -1, 0, 0};
envoy_final_stream_intel envoy_final_stream_intel_{-1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, 0, 0, 0};
StreamInfo::BytesMeterSharedPtr bytes_meter_;
Expand Down
3 changes: 2 additions & 1 deletion library/common/jni/jni_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ jbyteArray native_data_to_array(JNIEnv* env, envoy_data data) {
}

jlongArray native_stream_intel_to_array(JNIEnv* env, envoy_stream_intel stream_intel) {
jlongArray j_array = env->NewLongArray(3);
jlongArray j_array = env->NewLongArray(4);
jlong* critical_array = static_cast<jlong*>(env->GetPrimitiveArrayCritical(j_array, nullptr));
RELEASE_ASSERT(critical_array != nullptr, "unable to allocate memory in jni_utility");
critical_array[0] = static_cast<jlong>(stream_intel.stream_id);
critical_array[1] = static_cast<jlong>(stream_intel.connection_id);
critical_array[2] = static_cast<jlong>(stream_intel.attempt_count);
critical_array[3] = static_cast<jlong>(stream_intel.received_byte_count);
// Here '0' (for which there is no named constant) indicates we want to commit the changes back
// to the JVM and free the c array, where applicable.
env->ReleasePrimitiveArrayCritical(j_array, critical_array, 0);
Expand Down
5 changes: 5 additions & 0 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ typedef struct {
int64_t connection_id;
// The number of internal attempts to carry out a request/operation. 0 if not present.
uint64_t attempt_count;
// Mostly the number of bytes received from upstream. When this struct is sent through
// the onHeaders callback, its value is the size of the "trimmed" headers" - this does
// not include the status line. For all the other callbacks, it is truly what has been
// received up to now, not necessarily what has been processed by the callbacks.
uint64_t received_byte_count;
} envoy_stream_intel;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ class EnvoyStreamIntelImpl implements EnvoyStreamIntel {
private long streamId;
private long connectionId;
private long attemptCount;
private long receivedByteCount;

EnvoyStreamIntelImpl(long[] values) {
streamId = values[0];
connectionId = values[1];
attemptCount = values[2];
receivedByteCount = values[3];
}

@Override
Expand All @@ -27,4 +29,9 @@ public long getConnectionId() {
public long getAttemptCount() {
return attemptCount;
}

@Override
public long getReceivedByteCount() {
return receivedByteCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

/**
* Exposes internal HTTP stream metrics, context, and other details sent once on stream end.
*
* Note: a value of -1 means "not present" for any field where the name is suffixed with "Ms".
*/
public interface EnvoyFinalStreamIntel {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ public interface EnvoyStreamIntel {
* The number of internal attempts to carry out a request/operation.
*/
public long getAttemptCount();
/*
* The number of bytes received from upstream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update commments?

Copy link
Contributor Author

@carloseltuerto carloseltuerto Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments updated, and also using the same names everywhere. Thanks.

*/
public long getReceivedByteCount();
}
Loading