Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the "received byte count" feature. #2004

Merged
merged 14 commits into from
Jan 28, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ envoy_stream_intel PlatformBridgeFilter::streamIntel() {
RELEASE_ASSERT(decoder_callbacks_, "StreamInfo accessed before filter callbacks are set");
auto& info = decoder_callbacks_->streamInfo();
// FIXME: Stream handle cannot currently be set from the filter context.
envoy_stream_intel stream_intel{-1, -1, 0};
envoy_stream_intel stream_intel{-1, -1, 0, 0};
if (info.upstreamInfo()) {
stream_intel.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
Expand Down
30 changes: 18 additions & 12 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,27 @@ void Client::DirectStreamCallbacks::encodeHeaders(const ResponseHeaderMap& heade

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
if (end_stream) {
closeStream();
}

// Capture some metadata before potentially closing the stream.
absl::string_view alpn = "";
uint64_t response_status = Utility::getResponseStatus(headers);
if (direct_stream_.request_decoder_ &&
direct_stream_.request_decoder_->streamInfo().upstreamInfo() &&
direct_stream_.request_decoder_->streamInfo().upstreamInfo()->upstreamSslConnection()) {
alpn = direct_stream_.request_decoder_->streamInfo()
.upstreamInfo()
->upstreamSslConnection()
->alpn();
if (direct_stream_.request_decoder_) {
direct_stream_.saveLatestStreamIntel();
const auto& info = direct_stream_.request_decoder_->streamInfo();
// Set the initial number of bytes consumed for the non terminal callbacks.
direct_stream_.stream_intel_.consumed_bytes_from_response =
info.getUpstreamBytesMeter() ? info.getUpstreamBytesMeter()->headerBytesReceived() : 0;
// Capture the alpn if available.
if (info.upstreamInfo() && info.upstreamInfo()->upstreamSslConnection()) {
alpn = info.upstreamInfo()->upstreamSslConnection()->alpn();
}
}

if (end_stream) {
closeStream();
}

// Track success for later bookkeeping (stream could still be reset).
uint64_t response_status = Utility::getResponseStatus(headers);
success_ = CodeUtility::is2xx(response_status);

ENVOY_LOG(debug, "[S{}] dispatching to platform response headers for stream (end_stream={}):\n{}",
Expand Down Expand Up @@ -123,6 +127,8 @@ void Client::DirectStreamCallbacks::sendDataToBridge(Buffer::Instance& data, boo

// Cap by bytes_to_send_ if and only if applying explicit flow control.
uint32_t bytes_to_send = calculateBytesToSend(data, bytes_to_send_);
// Update the number of bytes consumed by this non terminal callback.
direct_stream_.stream_intel_.consumed_bytes_from_response += bytes_to_send;
// Only send end stream if all data is being sent.
bool send_end_stream = end_stream && (bytes_to_send == data.length());

Expand Down
2 changes: 1 addition & 1 deletion library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// read faster than the mobile caller can process it.
bool explicit_flow_control_ = false;
// Latest intel data retrieved from the StreamInfo.
envoy_stream_intel stream_intel_{-1, -1, 0};
envoy_stream_intel stream_intel_{-1, -1, 0, 0};
envoy_final_stream_intel envoy_final_stream_intel_{-1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, 0, 0, 0, 0};
StreamInfo::BytesMeterSharedPtr bytes_meter_;
Expand Down
3 changes: 2 additions & 1 deletion library/common/jni/jni_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ jbyteArray native_data_to_array(JNIEnv* env, envoy_data data) {
}

jlongArray native_stream_intel_to_array(JNIEnv* env, envoy_stream_intel stream_intel) {
jlongArray j_array = env->NewLongArray(3);
jlongArray j_array = env->NewLongArray(4);
jlong* critical_array = static_cast<jlong*>(env->GetPrimitiveArrayCritical(j_array, nullptr));
RELEASE_ASSERT(critical_array != nullptr, "unable to allocate memory in jni_utility");
critical_array[0] = static_cast<jlong>(stream_intel.stream_id);
critical_array[1] = static_cast<jlong>(stream_intel.connection_id);
critical_array[2] = static_cast<jlong>(stream_intel.attempt_count);
critical_array[3] = static_cast<jlong>(stream_intel.consumed_bytes_from_response);
// Here '0' (for which there is no named constant) indicates we want to commit the changes back
// to the JVM and free the c array, where applicable.
env->ReleasePrimitiveArrayCritical(j_array, critical_array, 0);
Expand Down
6 changes: 6 additions & 0 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ typedef struct {
int64_t connection_id;
// The number of internal attempts to carry out a request/operation. 0 if not present.
uint64_t attempt_count;
// Number of bytes consumed by the non terminal callbacks out of the response.
// NOTE: on terminal callbacks (on_complete, on_error_, on_cancel), this value will not be equal
// to envoy_final_stream_intel.received_byte_count. The latter represents the real number
// of bytes received before decompression. consumed_bytes_from_response omits the number
// number of bytes related to the Status Line, and is after decompression.
uint64_t consumed_bytes_from_response;
} envoy_stream_intel;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ class EnvoyStreamIntelImpl implements EnvoyStreamIntel {
private long streamId;
private long connectionId;
private long attemptCount;
private long consumedBytesFromResponse;

EnvoyStreamIntelImpl(long[] values) {
streamId = values[0];
connectionId = values[1];
attemptCount = values[2];
consumedBytesFromResponse = values[3];
}

@Override
Expand All @@ -27,4 +29,9 @@ public long getConnectionId() {
public long getAttemptCount() {
return attemptCount;
}

@Override
public long getConsumedBytesFromResponse() {
return consumedBytesFromResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

/**
* Exposes internal HTTP stream metrics, context, and other details sent once on stream end.
*
* Note: a value of -1 means "not present" for any field where the name is suffixed with "Ms".
*/
public interface EnvoyFinalStreamIntel {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Exposes internal HTTP stream metrics, context, and other details.
*/
public interface EnvoyStreamIntel {

/**
* An internal identifier for the stream.
*/
Expand All @@ -18,4 +19,14 @@ public interface EnvoyStreamIntel {
* The number of internal attempts to carry out a request/operation.
*/
public long getAttemptCount();

/**
* The number of bytes consumed by the non terminal callbacks, from the response.
*
* <p>>NOTE: on terminal callbacks (on_complete, on_error_, on_cancel), this value will not be
* equal to {@link EnvoyFinalStreamIntel#getReceivedByteCount()}. The latter represents the real
* number of bytes received before decompression. getConsumedBytesFromResponse() omits the number
* number of bytes related to the Status Line, and is after decompression.
*/
public long getConsumedBytesFromResponse();
}
Loading