Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the "received byte count" feature. #2004

Merged
merged 14 commits into from
Jan 28, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ envoy_stream_intel PlatformBridgeFilter::streamIntel() {
RELEASE_ASSERT(decoder_callbacks_, "StreamInfo accessed before filter callbacks are set");
auto& info = decoder_callbacks_->streamInfo();
// FIXME: Stream handle cannot currently be set from the filter context.
envoy_stream_intel stream_intel{-1, -1, 0};
envoy_stream_intel stream_intel{-1, -1, 0, 0};
if (info.upstreamInfo()) {
stream_intel.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
Expand Down
30 changes: 18 additions & 12 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,27 @@ void Client::DirectStreamCallbacks::encodeHeaders(const ResponseHeaderMap& heade

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
if (end_stream) {
closeStream();
}

// Capture some metadata before potentially closing the stream.
absl::string_view alpn = "";
uint64_t response_status = Utility::getResponseStatus(headers);
if (direct_stream_.request_decoder_ &&
direct_stream_.request_decoder_->streamInfo().upstreamInfo() &&
direct_stream_.request_decoder_->streamInfo().upstreamInfo()->upstreamSslConnection()) {
alpn = direct_stream_.request_decoder_->streamInfo()
.upstreamInfo()
->upstreamSslConnection()
->alpn();
if (direct_stream_.request_decoder_) {
direct_stream_.saveLatestStreamIntel();
const auto& info = direct_stream_.request_decoder_->streamInfo();
// Set the initial number of bytes consumed for the non terminal callbacks.
direct_stream_.stream_intel_.consumed_bytes_from_response =
info.getUpstreamBytesMeter() ? info.getUpstreamBytesMeter()->headerBytesReceived() : 0;
// Capture the alpn if available.
if (info.upstreamInfo() && info.upstreamInfo()->upstreamSslConnection()) {
alpn = info.upstreamInfo()->upstreamSslConnection()->alpn();
}
}

if (end_stream) {
closeStream();
}

// Track success for later bookkeeping (stream could still be reset).
uint64_t response_status = Utility::getResponseStatus(headers);
success_ = CodeUtility::is2xx(response_status);

ENVOY_LOG(debug, "[S{}] dispatching to platform response headers for stream (end_stream={}):\n{}",
Expand Down Expand Up @@ -123,6 +127,8 @@ void Client::DirectStreamCallbacks::sendDataToBridge(Buffer::Instance& data, boo

// Cap by bytes_to_send_ if and only if applying explicit flow control.
uint32_t bytes_to_send = calculateBytesToSend(data, bytes_to_send_);
// Update the number of bytes consumed by this non terminal callback.
direct_stream_.stream_intel_.consumed_bytes_from_response += bytes_to_send;
// Only send end stream if all data is being sent.
bool send_end_stream = end_stream && (bytes_to_send == data.length());

Expand Down
2 changes: 1 addition & 1 deletion library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// read faster than the mobile caller can process it.
bool explicit_flow_control_ = false;
// Latest intel data retrieved from the StreamInfo.
envoy_stream_intel stream_intel_{-1, -1, 0};
envoy_stream_intel stream_intel_{-1, -1, 0, 0};
envoy_final_stream_intel envoy_final_stream_intel_{-1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, 0, 0, 0, 0};
StreamInfo::BytesMeterSharedPtr bytes_meter_;
Expand Down
3 changes: 2 additions & 1 deletion library/common/jni/jni_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ jbyteArray native_data_to_array(JNIEnv* env, envoy_data data) {
}

jlongArray native_stream_intel_to_array(JNIEnv* env, envoy_stream_intel stream_intel) {
jlongArray j_array = env->NewLongArray(3);
jlongArray j_array = env->NewLongArray(4);
jlong* critical_array = static_cast<jlong*>(env->GetPrimitiveArrayCritical(j_array, nullptr));
RELEASE_ASSERT(critical_array != nullptr, "unable to allocate memory in jni_utility");
critical_array[0] = static_cast<jlong>(stream_intel.stream_id);
critical_array[1] = static_cast<jlong>(stream_intel.connection_id);
critical_array[2] = static_cast<jlong>(stream_intel.attempt_count);
critical_array[3] = static_cast<jlong>(stream_intel.consumed_bytes_from_response);
// Here '0' (for which there is no named constant) indicates we want to commit the changes back
// to the JVM and free the c array, where applicable.
env->ReleasePrimitiveArrayCritical(j_array, critical_array, 0);
Expand Down
6 changes: 6 additions & 0 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ typedef struct {
int64_t connection_id;
// The number of internal attempts to carry out a request/operation. 0 if not present.
uint64_t attempt_count;
// Number of bytes consumed by the non terminal callbacks out of the response.
// NOTE: on terminal callbacks (on_complete, on_error_, on_cancel), this value will not be equal
// to envoy_final_stream_intel.received_byte_count. The latter represents the real number
// of bytes received before decompression. consumed_bytes_from_response omits the number
// number of bytes related to the Status Line, and is after decompression.
uint64_t consumed_bytes_from_response;
} envoy_stream_intel;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ class EnvoyStreamIntelImpl implements EnvoyStreamIntel {
private long streamId;
private long connectionId;
private long attemptCount;
private long receivedByteCount;

EnvoyStreamIntelImpl(long[] values) {
streamId = values[0];
connectionId = values[1];
attemptCount = values[2];
receivedByteCount = values[3];
}

@Override
Expand All @@ -27,4 +29,9 @@ public long getConnectionId() {
public long getAttemptCount() {
return attemptCount;
}

@Override
public long getReceivedByteCount() {
return receivedByteCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

/**
* Exposes internal HTTP stream metrics, context, and other details sent once on stream end.
*
* Note: a value of -1 means "not present" for any field where the name is suffixed with "Ms".
*/
public interface EnvoyFinalStreamIntel {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ public interface EnvoyStreamIntel {
* The number of internal attempts to carry out a request/operation.
*/
public long getAttemptCount();
/*
* The number of bytes received from upstream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update commments?

Copy link
Contributor Author

@carloseltuerto carloseltuerto Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments updated, and also using the same names everywhere. Thanks.

*/
public long getReceivedByteCount();
}
75 changes: 57 additions & 18 deletions library/java/org/chromium/net/impl/CronetUrlRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -29,6 +30,7 @@
import org.chromium.net.CallbackException;
import org.chromium.net.CronetException;
import org.chromium.net.InlineExecutionProhibitedException;
import org.chromium.net.RequestFinishedInfo;
import org.chromium.net.UploadDataProvider;

/** UrlRequest, backed by Envoy-Mobile. */
Expand Down Expand Up @@ -88,7 +90,7 @@ public final class CronetUrlRequest extends UrlRequestBase {

private final String mUserAgent;
private final HeadersList mRequestHeaders = new HeadersList();
private final List<String> mUrlChain = new ArrayList<>();
private final Collection<Object> mRequestAnnotations;
private final CronetUrlRequestContext mRequestContext;
private final AtomicBoolean mWaitingOnRedirect = new AtomicBoolean(false);
private final AtomicBoolean mWaitingOnRead = new AtomicBoolean(false);
Expand All @@ -111,11 +113,16 @@ public final class CronetUrlRequest extends UrlRequestBase {

/* These don't change with redirects */
private String mInitialMethod;
private CronetUploadDataStream mUploadDataStream;
private final Executor mUserExecutor;
private final VersionSafeCallbacks.UrlRequestCallback mCallback;
private final String mInitialUrl;
private final VersionSafeCallbacks.RequestFinishedInfoListener mRequestFinishedListener;
private final ConditionVariable mStartBlock = new ConditionVariable();

private CronetUploadDataStream mUploadDataStream;

private volatile CronetException mException;

/**
* Holds a subset of StatusValues - {@link State#STARTED} can represent {@link
* Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. While the distinction isn't
Expand All @@ -127,13 +134,16 @@ public final class CronetUrlRequest extends UrlRequestBase {
* only used with the STARTED state, so it's inconsequential.
*/
@StatusValues private volatile int mAdditionalStatusDetails = Status.INVALID;
private final AtomicReference<CronetException> mError = new AtomicReference<>();

/* These change with redirects. */
private final AtomicReference<EnvoyHTTPStream> mStream = new AtomicReference<>();
private final List<String> mUrlChain = new ArrayList<>();
private EnvoyFinalStreamIntel mEnvoyFinalStreamIntel;
private long mBytesReceivedFromRedirects = 0;
private long mBytesReceivedFromLastRedirect = 0;
private CronvoyHttpCallbacks mCronvoyCallbacks;
private String mCurrentUrl;
private UrlResponseInfoImpl mUrlResponseInfo;
private volatile UrlResponseInfoImpl mUrlResponseInfo;
private String mPendingRedirectUrl;

/**
Expand All @@ -142,8 +152,9 @@ public final class CronetUrlRequest extends UrlRequestBase {
*/
CronetUrlRequest(CronetUrlRequestContext cronvoyEngine, Callback callback, Executor executor,
String url, String userAgent, boolean allowDirectExecutor,
boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet,
int trafficStatsUid) {
Collection<Object> connectionAnnotations, boolean trafficStatsTagSet,
int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid,
RequestFinishedInfo.Listener requestFinishedListener) {
if (url == null) {
throw new NullPointerException("URL is required");
}
Expand All @@ -154,11 +165,17 @@ public final class CronetUrlRequest extends UrlRequestBase {
throw new NullPointerException("Executor is required");
}
mCallback = new VersionSafeCallbacks.UrlRequestCallback(callback);
mRequestFinishedListener =
requestFinishedListener != null
? new VersionSafeCallbacks.RequestFinishedInfoListener(requestFinishedListener)
: null;
mRequestContext = cronvoyEngine;
mAllowDirectExecutor = allowDirectExecutor;
mUserExecutor = executor;
mInitialUrl = url;
mCurrentUrl = url;
mUserAgent = userAgent;
mRequestAnnotations = connectionAnnotations;
}

@Override
Expand Down Expand Up @@ -305,8 +322,8 @@ public boolean isDone() {

@Override
public void getStatus(StatusListener listener) {
@StatusValues int extraStatus = mAdditionalStatusDetails;
@State int state = mState.get();
int extraStatus = mAdditionalStatusDetails;

@StatusValues final int status;
switch (state) {
Expand Down Expand Up @@ -401,7 +418,6 @@ private static int determineNextErrorState(boolean streamEnded, @State int origi
}

private void enterErrorState(CronetException error) {
mError.compareAndSet(null, error);
@State int originalState;
@State int updatedState;
do {
Expand All @@ -414,6 +430,7 @@ private void enterErrorState(CronetException error) {
if (isTerminalState(originalState)) {
return;
}
mException = error;
fireCloseUploadDataProvider();
if (updatedState == State.ERROR_PENDING_CANCEL) {
CronvoyHttpCallbacks cronvoyCallbacks = this.mCronvoyCallbacks;
Expand Down Expand Up @@ -469,12 +486,17 @@ private void fireCloseUploadDataProvider() {
}
}

// This method is only called when in STARTED state. This means a "cancel" request won't be
// executed immediately - that quite important here, otherwise this would lead to unfortunate
// race conditions. A "cancel" request will then be honnored on the first callback.
private void fireOpenConnection() {
if (mInitialMethod == null) {
mInitialMethod = "GET";
}
mUrlResponseInfo = null;
mEnvoyFinalStreamIntel = null;
mBytesReceivedFromRedirects += mBytesReceivedFromLastRedirect;
mAdditionalStatusDetails = Status.CONNECTING;
mUrlResponseInfo = new UrlResponseInfoImpl();
mUrlChain.add(mCurrentUrl);
Map<String, List<String>> envoyRequestHeaders =
buildEnvoyRequestHeaders(mInitialMethod, mRequestHeaders, mUploadDataStream, mUserAgent,
Expand Down Expand Up @@ -568,7 +590,7 @@ public void run() {
try {
mCallback.onCanceled(CronetUrlRequest.this, mUrlResponseInfo);
} catch (Exception exception) {
Log.e(TAG, "Exception in onCanceled method", exception);
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCanceled method", exception);
}
}
};
Expand All @@ -582,7 +604,7 @@ public void run() {
try {
mCallback.onSucceeded(CronetUrlRequest.this, mUrlResponseInfo);
} catch (Exception exception) {
Log.e(TAG, "Exception in onSucceeded method", exception);
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", exception);
}
}
};
Expand All @@ -594,9 +616,9 @@ void onFailed() {
@Override
public void run() {
try {
mCallback.onFailed(CronetUrlRequest.this, mUrlResponseInfo, mError.get());
mCallback.onFailed(CronetUrlRequest.this, mUrlResponseInfo, mException);
} catch (Exception exception) {
Log.e(TAG, "Exception in onFailed method", exception);
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onFailed method", exception);
}
}
};
Expand Down Expand Up @@ -631,6 +653,19 @@ private boolean streamEnded() {
return cronvoyCallbacks != null && cronvoyCallbacks.mEndStream;
}

private void recordEnvoyFinalStreamIntel(EnvoyFinalStreamIntel envoyFinalStreamIntel) {
mEnvoyFinalStreamIntel = envoyFinalStreamIntel;
if (mUrlResponseInfo != null) { // Null if cancelled before receiving a Response.
mUrlResponseInfo.setReceivedByteCount(envoyFinalStreamIntel.getReceivedByteCount() +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I thought cronvoy had the following behavior
if we'd read 100 header bytes and 5000 body bytes from the network but reset the stream when only 2500 bytes had been sent up to the java layer then reset the stream, that we'd want to only report the 100+2500 bytes the java layer had seen? I assumed you'd be doing the byte accounting of "headers + body bytes cronet had seen" in the java layers but it looks like we're reporting all bytes including buffered ones? Is my understanding off?

Copy link
Contributor Author

@carloseltuerto carloseltuerto Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mUrlResponseInfo is expected to be null if the request is cancelled before receiving the ResponseHeader

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, but if java gets the headers, and say half the body bytes which were read off the wire, what would cronet expect to report for bytes read: bytes read off the wire, or bytes handed up to java?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still curious about this one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. There are no tests for that. Here is what I found after many hours of "cout": Cronet is not consistent when reporting the received byte count. There is one test with an OK response where all the bytes are counted, i.e. == streamInfo().getUpstreamBytesMeter()->wireBytesReceived() == 86, and there is another test with a "NOT FOUND" where the status line is excluded from received byte count, and the expected received byte count is 120 (should be 142)

I "cout"ed the Buffer in codec_impl.cc This is the content of the test with "an OK response":

=========
HTTP/1.1 200 OK^M
Connection: close^M
Content-Length: 3^M
Content-Type: text/plain^M
^M
GET
=========== size: 86

The size is perfectly matching the value in the original Cronet test.

This is the content of the test with a "NOT FOUND":

=========
HTTP/1.1 404 Not Found^M
Content-Length: 96^M
^M
<!DOCTYPE html>
<html>
<head>
<title>Not found</title>
<p>Test page loaded.</p>
</head>
</html>

=========== size: 142

But this one the original Cronet test expects 120 bytes: this looks like the size without the Status Line.

I'm not yet sure how to reconcile this.

mBytesReceivedFromRedirects);
}
}

private void recordEnvoyStreamIntel(EnvoyStreamIntel envoyStreamIntel) {
mUrlResponseInfo.setReceivedByteCount(envoyStreamIntel.getReceivedByteCount() +
mBytesReceivedFromRedirects);
}

private static class HeadersList extends ArrayList<Map.Entry<String, String>> {}

private static class DirectExecutor implements Executor {
Expand Down Expand Up @@ -666,9 +701,8 @@ public Executor getExecutor() {
@Override
public void onHeaders(Map<String, List<String>> headers, boolean endStream,
EnvoyStreamIntel streamIntel) {
if (isAbandoned()) {
return;
}
mUrlResponseInfo = new UrlResponseInfoImpl();
recordEnvoyStreamIntel(streamIntel);
mEndStream = endStream;
List<String> statuses = headers.get(":status");
final int responseCode =
Expand Down Expand Up @@ -698,6 +732,7 @@ public void onHeaders(Map<String, List<String>> headers, boolean endStream,
}

if (locationField != null) {
mBytesReceivedFromLastRedirect = streamIntel.getReceivedByteCount();
cancel(); // Abort the the original request - we are being redirected.
}

Expand Down Expand Up @@ -734,6 +769,7 @@ public void onData(ByteBuffer data, boolean endStream, EnvoyStreamIntel streamIn
if (isAbandoned()) {
return;
}
recordEnvoyStreamIntel(streamIntel);
mEndStream = endStream;
@State int originalState;
@State int updatedState;
Expand Down Expand Up @@ -789,6 +825,7 @@ public void onError(int errorCode, String message, int attemptCount,
if (isAbandoned()) {
return;
}
recordEnvoyFinalStreamIntel(finalStreamIntel);
mEndStream = true;
@State int originalState;
@State int updatedState;
Expand All @@ -811,6 +848,7 @@ public void onCancel(EnvoyStreamIntel streamIntel, EnvoyFinalStreamIntel finalSt
if (isAbandoned()) {
return;
}
recordEnvoyFinalStreamIntel(finalStreamIntel);
mEndStream = true;
@State int originalState;
@State int updatedState;
Expand Down Expand Up @@ -852,7 +890,7 @@ public void onComplete(EnvoyStreamIntel streamIntel, EnvoyFinalStreamIntel final
if (isAbandoned()) {
return;
}
mUrlResponseInfo.setReceivedByteCount(finalStreamIntel.getSentByteCount());
recordEnvoyFinalStreamIntel(finalStreamIntel);
if (successReady(SucceededState.ON_COMPLETE_RECEIVED)) {
onSucceeded();
}
Expand Down Expand Up @@ -900,7 +938,7 @@ void readData(int size) {
*/
void cancel() {
EnvoyHTTPStream stream = mStream.get();
if (isAbandoned() || mEndStream) {
if (this != mCronvoyCallbacks || mEndStream) {
return;
}
@CancelState int oldState = mCancelState.getAndSet(CancelState.CANCELLED);
Expand Down Expand Up @@ -936,6 +974,7 @@ private void setUrlResponseInfo(Map<String, List<String>> responseHeaders, int r
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1426) set receivedByteCount
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1622) support proxy
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1546) negotiated protocol
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1578) http caching
mUrlResponseInfo.setResponseValues(
new ArrayList<>(mUrlChain), responseCode, HttpReason.getReason(responseCode),
Collections.unmodifiableList(headerList), false, selectedTransport, ":0");
Expand Down
Loading