Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the "received byte count" feature. #2004

Merged
merged 14 commits into from
Jan 28, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ envoy_stream_intel PlatformBridgeFilter::streamIntel() {
RELEASE_ASSERT(decoder_callbacks_, "StreamInfo accessed before filter callbacks are set");
auto& info = decoder_callbacks_->streamInfo();
// FIXME: Stream handle cannot currently be set from the filter context.
envoy_stream_intel stream_intel{-1, -1, 0};
envoy_stream_intel stream_intel{-1, -1, 0, 0};
if (info.upstreamInfo()) {
stream_intel.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
Expand Down
3 changes: 3 additions & 0 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ void Client::DirectStream::saveLatestStreamIntel() {
}
stream_intel_.stream_id = static_cast<uint64_t>(stream_handle_);
stream_intel_.attempt_count = info.attemptCount().value_or(0);
if (info.getUpstreamBytesMeter()) {
stream_intel_.received_byte_count = info.getUpstreamBytesMeter()->wireBytesReceived();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So two ways to address your flaking tests.
You can either look at each point we send stream intel up, and subtract buffered data, or you could send up all received bytes, and all buffered data each time, and calculate "bytes_sent_up = bytes_received - bytes_still_buffered" in cronvoy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The crux of the problem is on the encodeHeaders invocation, determining how many bytes are left unprocessed is not obvious - there is no Buffer to interrogate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but on encodeHeaders we pass it all up, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, no. The received byte count already includes some coming from the response body. How can we subtract those?

}
}

void Client::DirectStream::saveFinalStreamIntel() {
Expand Down
2 changes: 1 addition & 1 deletion library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// read faster than the mobile caller can process it.
bool explicit_flow_control_ = false;
// Latest intel data retrieved from the StreamInfo.
envoy_stream_intel stream_intel_{-1, -1, 0};
envoy_stream_intel stream_intel_{-1, -1, 0, 0};
envoy_final_stream_intel envoy_final_stream_intel_;
StreamInfo::BytesMeterSharedPtr bytes_meter_;
};
Expand Down
3 changes: 2 additions & 1 deletion library/common/jni/jni_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ jbyteArray native_data_to_array(JNIEnv* env, envoy_data data) {
}

jlongArray native_stream_intel_to_array(JNIEnv* env, envoy_stream_intel stream_intel) {
jlongArray j_array = env->NewLongArray(3);
jlongArray j_array = env->NewLongArray(4);
jlong* critical_array = static_cast<jlong*>(env->GetPrimitiveArrayCritical(j_array, nullptr));
RELEASE_ASSERT(critical_array != nullptr, "unable to allocate memory in jni_utility");
critical_array[0] = static_cast<jlong>(stream_intel.stream_id);
critical_array[1] = static_cast<jlong>(stream_intel.connection_id);
critical_array[2] = static_cast<jlong>(stream_intel.attempt_count);
critical_array[3] = static_cast<jlong>(stream_intel.received_byte_count);
// Here '0' (for which there is no named constant) indicates we want to commit the changes back
// to the JVM and free the c array, where applicable.
env->ReleasePrimitiveArrayCritical(j_array, critical_array, 0);
Expand Down
2 changes: 2 additions & 0 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ typedef struct {
int64_t connection_id;
// The number of internal attempts to carry out a request/operation. 0 if not present.
uint64_t attempt_count;
// The number of bytes received from upstream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is close but not 100% accurate right?
Generally we'll include all the body bytes read, except on the call to pass headers up at which point we only send up header bytes. Do you think clarifying is going to cause more confusion than it solves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concocted something - done.

uint64_t received_byte_count;
} envoy_stream_intel;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ class EnvoyStreamIntelImpl implements EnvoyStreamIntel {
private long streamId;
private long connectionId;
private long attemptCount;
private long receivedByteCount;

EnvoyStreamIntelImpl(long[] values) {
streamId = values[0];
connectionId = values[1];
attemptCount = values[2];
receivedByteCount = values[3];
}

@Override
Expand All @@ -27,4 +29,9 @@ public long getConnectionId() {
public long getAttemptCount() {
return attemptCount;
}

@Override
public long getReceivedByteCount() {
return receivedByteCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ public interface EnvoyStreamIntel {
* The number of internal attempts to carry out a request/operation.
*/
public long getAttemptCount();
/*
* The number of bytes received from upstream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update commments?

Copy link
Contributor Author

@carloseltuerto carloseltuerto Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments updated, and also using the same names everywhere. Thanks.

*/
public long getReceivedByteCount();
}
33 changes: 30 additions & 3 deletions library/java/org/chromium/net/impl/CronetUrlRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public final class CronetUrlRequest extends UrlRequestBase {

private final String mUserAgent;
private final HeadersList mRequestHeaders = new HeadersList();
private final List<String> mUrlChain = new ArrayList<>();
private final CronetUrlRequestContext mRequestContext;
private final AtomicBoolean mWaitingOnRedirect = new AtomicBoolean(false);
private final AtomicBoolean mWaitingOnRead = new AtomicBoolean(false);
Expand Down Expand Up @@ -131,6 +130,8 @@ public final class CronetUrlRequest extends UrlRequestBase {

/* These change with redirects. */
private final AtomicReference<EnvoyHTTPStream> mStream = new AtomicReference<>();
private final List<String> mUrlChain = new ArrayList<>();
private final List<EnvoyFinalStreamIntel> mEnvoyFinalStreamIntels = new ArrayList<>();
private CronvoyHttpCallbacks mCronvoyCallbacks;
private String mCurrentUrl;
private UrlResponseInfoImpl mUrlResponseInfo;
Expand Down Expand Up @@ -631,6 +632,25 @@ private boolean streamEnded() {
return cronvoyCallbacks != null && cronvoyCallbacks.mEndStream;
}

private void recordEnvoyFinalStreamIntel(EnvoyFinalStreamIntel envoyFinalStreamIntel) {
mEnvoyFinalStreamIntels.add(envoyFinalStreamIntel);
long bytesReceived = 0;
// This in only called by the network Thread - no concurrency issue.
for (EnvoyFinalStreamIntel intel : mEnvoyFinalStreamIntels) {
bytesReceived += intel.getReceivedByteCount();
}
mUrlResponseInfo.setReceivedByteCount(bytesReceived);
}

private void recordEnvoyStreamIntel(EnvoyStreamIntel envoyStreamIntel) {
long bytesReceived = envoyStreamIntel.getReceivedByteCount();
// This in only called by the network Thread - no concurrency issue.
for (EnvoyFinalStreamIntel intel : mEnvoyFinalStreamIntels) {
bytesReceived += intel.getReceivedByteCount();
}
mUrlResponseInfo.setReceivedByteCount(bytesReceived);
}

private static class HeadersList extends ArrayList<Map.Entry<String, String>> {}

private static class DirectExecutor implements Executor {
Expand Down Expand Up @@ -666,6 +686,7 @@ public Executor getExecutor() {
@Override
public void onHeaders(Map<String, List<String>> headers, boolean endStream,
EnvoyStreamIntel streamIntel) {
recordEnvoyStreamIntel(streamIntel);
if (isAbandoned()) {
return;
}
Expand Down Expand Up @@ -731,6 +752,7 @@ public void run() {

@Override
public void onData(ByteBuffer data, boolean endStream, EnvoyStreamIntel streamIntel) {
recordEnvoyStreamIntel(streamIntel);
if (isAbandoned()) {
return;
}
Expand Down Expand Up @@ -768,6 +790,7 @@ public void run() {

@Override
public void onTrailers(Map<String, List<String>> trailers, EnvoyStreamIntel streamIntel) {
recordEnvoyStreamIntel(streamIntel);
if (isAbandoned()) {
return;
}
Expand All @@ -786,6 +809,7 @@ public void onTrailers(Map<String, List<String>> trailers, EnvoyStreamIntel stre
@Override
public void onError(int errorCode, String message, int attemptCount,
EnvoyStreamIntel streamIntel, EnvoyFinalStreamIntel finalStreamIntel) {
recordEnvoyFinalStreamIntel(finalStreamIntel);
if (isAbandoned()) {
return;
}
Expand All @@ -808,6 +832,7 @@ public void onError(int errorCode, String message, int attemptCount,

@Override
public void onCancel(EnvoyStreamIntel streamIntel, EnvoyFinalStreamIntel finalStreamIntel) {
recordEnvoyFinalStreamIntel(finalStreamIntel);
if (isAbandoned()) {
return;
}
Expand All @@ -828,6 +853,7 @@ public void onCancel(EnvoyStreamIntel streamIntel, EnvoyFinalStreamIntel finalSt

@Override
public void onSendWindowAvailable(EnvoyStreamIntel streamIntel) {
recordEnvoyStreamIntel(streamIntel);
if (isAbandoned()) {
return;
}
Expand All @@ -849,10 +875,10 @@ public void onSendWindowAvailable(EnvoyStreamIntel streamIntel) {

@Override
public void onComplete(EnvoyStreamIntel streamIntel, EnvoyFinalStreamIntel finalStreamIntel) {
recordEnvoyFinalStreamIntel(finalStreamIntel);
if (isAbandoned()) {
return;
}
mUrlResponseInfo.setReceivedByteCount(finalStreamIntel.getSentByteCount());
if (successReady(SucceededState.ON_COMPLETE_RECEIVED)) {
onSucceeded();
}
Expand Down Expand Up @@ -900,7 +926,7 @@ void readData(int size) {
*/
void cancel() {
EnvoyHTTPStream stream = mStream.get();
if (isAbandoned() || mEndStream) {
if (this != mCronvoyCallbacks || mEndStream) {
return;
}
@CancelState int oldState = mCancelState.getAndSet(CancelState.CANCELLED);
Expand Down Expand Up @@ -936,6 +962,7 @@ private void setUrlResponseInfo(Map<String, List<String>> responseHeaders, int r
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1426) set receivedByteCount
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1622) support proxy
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1546) negotiated protocol
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1578) http caching
mUrlResponseInfo.setResponseValues(
new ArrayList<>(mUrlChain), responseCode, HttpReason.getReason(responseCode),
Collections.unmodifiableList(headerList), false, selectedTransport, ":0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class MockStream internal constructor(underlyingStream: MockEnvoyHTTPStream) : S
override fun getStreamId(): Long { return 0 }
override fun getConnectionId(): Long { return 0 }
override fun getAttemptCount(): Long { return 0 }
override fun getReceivedByteCount(): Long { return 0 }
}

private val mockFinalStreamIntel = object : EnvoyFinalStreamIntel {
Expand Down
7 changes: 5 additions & 2 deletions test/common/integration/client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct {
uint32_t on_complete_calls;
uint32_t on_error_calls;
uint32_t on_cancel_calls;
uint32_t received_byte_count;
std::string status;
ConditionalInitializer* terminal_callback;
} callbacks_called;
Expand All @@ -62,12 +63,13 @@ class ClientIntegrationTest : public BaseIntegrationTest,
});

bridge_callbacks_.context = &cc_;
bridge_callbacks_.on_headers = [](envoy_headers c_headers, bool, envoy_stream_intel,
bridge_callbacks_.on_headers = [](envoy_headers c_headers, bool, envoy_stream_intel intel,
void* context) -> void* {
Http::ResponseHeaderMapPtr response_headers = toResponseHeaders(c_headers);
callbacks_called* cc_ = static_cast<callbacks_called*>(context);
cc_->on_headers_calls++;
cc_->status = response_headers->Status()->value().getStringView();
cc_->received_byte_count = intel.received_byte_count;
return nullptr;
};
bridge_callbacks_.on_data = [](envoy_data c_data, bool, envoy_stream_intel,
Expand Down Expand Up @@ -143,7 +145,7 @@ name: api_listener
Http::ClientPtr http_client_{};
envoy_http_callbacks bridge_callbacks_;
ConditionalInitializer terminal_callback_;
callbacks_called cc_ = {0, 0, 0, 0, 0, "", &terminal_callback_};
callbacks_called cc_ = {0, 0, 0, 0, 0, 0, "", &terminal_callback_};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, ClientIntegrationTest,
Expand Down Expand Up @@ -206,6 +208,7 @@ TEST_P(ClientIntegrationTest, Basic) {
ASSERT_EQ(cc_.status, "200");
ASSERT_EQ(cc_.on_data_calls, 2);
ASSERT_EQ(cc_.on_complete_calls, 1);
ASSERT_EQ(cc_.received_byte_count, 67);

// stream_success gets charged for 2xx status codes.
test_server_->waitForCounterEq("http.client.stream_success", 1);
Expand Down
14 changes: 9 additions & 5 deletions test/java/org/chromium/net/CronetUrlRequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ public void testRedirectAsync() throws Exception {
checkResponseInfoHeader(callback.mRedirectResponseInfoList.get(0), "redirect-header",
"header-value");

// Original bytesReceived: 73
UrlResponseInfo expected = createUrlResponseInfo(
new String[] {NativeTestServer.getRedirectURL()}, "Found", 302, 73, "Content-Length", "92",
new String[] {NativeTestServer.getRedirectURL()}, "Found", 302, -1, "Content-Length", "92",
"Location", "/success.txt", "redirect-header", "header-value");
mTestRule.assertResponseEquals(expected, callback.mRedirectResponseInfoList.get(0));

Expand Down Expand Up @@ -266,9 +267,10 @@ public void testRedirectAsync() throws Exception {
assertEquals(ResponseStep.ON_SUCCEEDED, callback.mResponseStep);
assertEquals(NativeTestServer.SUCCESS_BODY, callback.mResponseAsString);

// Original bytesReceived: 258
UrlResponseInfo urlResponseInfo = createUrlResponseInfo(
new String[] {NativeTestServer.getRedirectURL(), NativeTestServer.getSuccessURL()}, "OK",
200, 258, "Content-Length", "20", "Content-Type", "text/plain",
200, -1, "Content-Length", "20", "Content-Type", "text/plain",
"Access-Control-Allow-Origin", "*", "header-name", "header-value", "multi-header-name",
"header-value1", "multi-header-name", "header-value2");

Expand Down Expand Up @@ -666,17 +668,19 @@ public void testMockMultiRedirect() throws Exception {
assertEquals(2, callback.mRedirectResponseInfoList.size());

// Check first redirect (multiredirect.html -> redirect.html)
// Original receivedBytes: 76
UrlResponseInfo firstExpectedResponseInfo = createUrlResponseInfo(
new String[] {NativeTestServer.getMultiRedirectURL()}, "Found", 302, 76, "Content-Length",
new String[] {NativeTestServer.getMultiRedirectURL()}, "Found", 302, -1, "Content-Length",
"92", "Location", "/redirect.html", "redirect-header0", "header-value");
UrlResponseInfo firstRedirectResponseInfo = callback.mRedirectResponseInfoList.get(0);
mTestRule.assertResponseEquals(firstExpectedResponseInfo, firstRedirectResponseInfo);

// Check second redirect (redirect.html -> success.txt)
// Original receivedBytes: 334
UrlResponseInfo secondExpectedResponseInfo = createUrlResponseInfo(
new String[] {NativeTestServer.getMultiRedirectURL(), NativeTestServer.getRedirectURL(),
NativeTestServer.getSuccessURL()},
"OK", 200, 334, "Content-Length", "20", "Content-Type", "text/plain",
"OK", 200, -1, "Content-Length", "20", "Content-Type", "text/plain",
"Access-Control-Allow-Origin", "*", "header-name", "header-value", "multi-header-name",
"header-value1", "multi-header-name", "header-value2");

Expand All @@ -693,7 +697,7 @@ public void testMockNotFound() throws Exception {
TestUrlRequestCallback callback = startAndWaitForComplete(NativeTestServer.getNotFoundURL());
UrlResponseInfo expected =
createUrlResponseInfo(new String[] {NativeTestServer.getNotFoundURL()}, "Not Found", 404,
140, "Content-Length", "96");
142, "Content-Length", "96");
mTestRule.assertResponseEquals(expected, callback.mResponseInfo);
assertTrue(callback.mHttpResponseDataLength != 0);
assertEquals(0, callback.mRedirectCount);
Expand Down
8 changes: 5 additions & 3 deletions test/java/org/chromium/net/testing/CronetTestRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static class CronetTestFramework {

private static ExperimentalCronetEngine createEngine(Context context) {
ExperimentalCronetEngine.Builder builder = new ExperimentalCronetEngine.Builder(context);
((CronetEngineBuilderImpl)builder.getBuilderDelegate()).setLogLevel("info");
((CronetEngineBuilderImpl)builder.getBuilderDelegate()).setLogLevel("off");
return builder.enableQuic(true).build();
}

Expand Down Expand Up @@ -253,8 +253,10 @@ public void assertResponseEquals(UrlResponseInfo expected, UrlResponseInfo actua
assertEquals(expected.getUrl(), actual.getUrl());
// Transferred bytes and proxy server are not supported in pure java
if (!testingJavaImpl()) {
// TODO("https://github.com/envoyproxy/envoy-mobile/issues/1426"): uncomment the assert
// assertEquals(expected.getReceivedByteCount(), actual.getReceivedByteCount());
// TODO("https://github.com/envoyproxy/envoy-mobile/issues/1426"): remove the "if" crutch
if (expected.getReceivedByteCount() >= 0) {
assertEquals(expected.getReceivedByteCount(), actual.getReceivedByteCount());
}
assertEquals(expected.getProxyServer(), actual.getProxyServer());
// This is a place where behavior intentionally differs between native and java
assertEquals(expected.getNegotiatedProtocol(), actual.getNegotiatedProtocol());
Expand Down