Skip to content

Commit

Permalink
HttpLifecycleObserver: notify when payload body is requested (#2230)
Browse files Browse the repository at this point in the history
Motivation:

Track how subscriber requests payload body items.

Modifications:

- Add `HttpRequestObserver#onRequestDataRequested(long)`;
- Add `HttpResponseObserver#onResponseDataRequested(long)`;
- Enhance `HttpLifecycleObserverTest` and `GrpcLifecycleObserverTest`;

Result:

Users can track how many items subscriber requests from payload body,
and time difference between requested-delivered items.
  • Loading branch information
idelpivnitskiy authored Jun 3, 2022
1 parent 45019c5 commit f9091b0
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ private static final class GrpcToHttpRequestObserver implements HttpRequestObser
this.observer = observer;
}

@Override
public void onRequestDataRequested(final long n) {
observer.onRequestDataRequested(n);
}

@Override
public void onRequestData(final Buffer data) {
observer.onRequestData(data);
Expand Down Expand Up @@ -132,6 +137,11 @@ private static final class GrpcToHttpResponseObserver implements HttpResponseObs
}
}

@Override
public void onResponseDataRequested(final long n) {
observer.onResponseDataRequested(n);
}

@Override
public void onResponseData(final Buffer data) {
observer.onResponseData(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -236,6 +239,7 @@ private static void verifyObservers(boolean client, boolean error, boolean aggre
inOrder.verify(exchange).onRequest(any(StreamingHttpRequest.class));
}
inOrder.verify(exchange).onResponse(any(StreamingHttpResponse.class));
verify(response, atLeastOnce()).onResponseDataRequested(anyLong());
if (!error) {
inOrder.verify(response).onResponseData(any(Buffer.class));
inOrder.verify(response).onResponseTrailers(any(HttpHeaders.class));
Expand All @@ -258,6 +262,7 @@ private static void verifyObservers(boolean client, boolean error, boolean aggre
}
}

verify(request, atLeastOnce()).onRequestDataRequested(anyLong());
requestInOrder.verify(request).onRequestData(any(Buffer.class));
requestInOrder.verify(request, never()).onRequestTrailers(any(HttpHeaders.class));
requestInOrder.verify(request).onRequestComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ private static final class BiGrpcRequestObserver implements GrpcRequestObserver
this.second = requireNonNull(second);
}

@Override
public void onRequestDataRequested(final long n) {
try {
first.onRequestDataRequested(n);
} finally {
second.onRequestDataRequested(n);
}
}

@Override
public void onRequestData(final Buffer data) {
try {
Expand Down Expand Up @@ -184,6 +193,15 @@ private BiGrpcResponseObserver(final GrpcResponseObserver first, final GrpcRespo
this.second = requireNonNull(second);
}

@Override
public void onResponseDataRequested(final long n) {
try {
first.onResponseDataRequested(n);
} finally {
second.onResponseDataRequested(n);
}
}

@Override
public void onResponseData(final Buffer data) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public GrpcRequestObserver onRequest(final HttpRequestMetaData requestMetaData)
return this;
}

@Override
public void onRequestDataRequested(final long n) {
}

@Override
public void onRequestData(final Buffer data) {
requestSize += data.readableBytes();
Expand Down Expand Up @@ -131,6 +135,10 @@ public GrpcResponseObserver onResponse(final HttpResponseMetaData responseMetaDa
return this;
}

@Override
public void onResponseDataRequested(final long n) {
}

@Override
public void onResponseData(final Buffer data) {
responseSize += data.readableBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ interface HttpExchangeObserver {
*/
interface HttpRequestObserver {

/**
* Callback when subscriber requests {@code n} items of the request payload body.
* <p>
* May be invoked multiple times. Helps to track when items are requested and when they are
* {@link #onRequestData(Buffer) delivered}.
*
* @param n number of requested items
*/
default void onRequestDataRequested(long n) { // FIXME: 0.43 - consider removing default impl
}

/**
* Callback when the request payload body data chunk was observed.
* <p>
Expand Down Expand Up @@ -158,6 +169,16 @@ interface HttpRequestObserver {
*/
interface HttpResponseObserver {

/**
* Callback when subscriber requests {@code n} items of the response payload body.
* <p>
* May be invoked multiple times. Helps to track when items are requested and when they are
* {@link #onResponseData delivered}.
*
* @param n number of requested items
*/
void onResponseDataRequested(long n); // FIXME: 0.43 - consider removing default impl

/**
* Callback when the response payload body data chunk was observed.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ final Single<StreamingHttpResponse> trackLifecycle(@Nullable final ConnectionInf
return NoopSubscriber.INSTANCE;
});
}
return p.beforeOnNext(item -> {
return p.beforeRequest(n -> safeReport(onRequest::onRequestDataRequested, n, onRequest,
"onRequestDataRequested"))
.beforeOnNext(item -> {
if (item instanceof Buffer) {
safeReport(onRequest::onRequestData, (Buffer) item, onRequest, "onRequestData");
} else if (item instanceof HttpHeaders) {
Expand Down Expand Up @@ -151,7 +153,9 @@ public void cancel() {
// needs to be applied last.
.map(resp -> {
exchangeContext.onResponse(resp);
return resp.transformMessageBody(p -> p.beforeOnNext(exchangeContext::onResponseBody));
return resp.transformMessageBody(p -> p
.beforeRequest(exchangeContext::onResponseDataRequested)
.beforeOnNext(exchangeContext::onResponseBody));
}).shareContextOnSubscribe();
});
}
Expand Down Expand Up @@ -192,6 +196,11 @@ void onResponse(HttpResponseMetaData responseMetaData) {
NoopHttpLifecycleObserver.NoopHttpResponseObserver.INSTANCE);
}

void onResponseDataRequested(final long n) {
assert onResponse != null;
safeReport(onResponse::onResponseDataRequested, n, onResponse, "onResponseDataRequested");
}

void onResponseBody(final Object item) {
assert onResponse != null;
if (item instanceof Buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ private NoopHttpRequestObserver() {
// Singleton
}

@Override
public void onRequestDataRequested(final long n) {
}

@Override
public void onRequestData(final Buffer data) {
}
Expand Down Expand Up @@ -107,6 +111,10 @@ private NoopHttpResponseObserver() {
// Singleton
}

@Override
public void onResponseDataRequested(final long n) {
}

@Override
public void onResponseData(final Buffer data) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -267,6 +269,7 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
clientInOrder.verify(clientExchangeObserver).onRequest(any(StreamingHttpRequest.class));
clientInOrder.verify(clientExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
clientInOrder.verify(clientExchangeObserver).onResponseCancel();
clientRequestInOrder.verify(clientRequestObserver).onRequestDataRequested(anyLong());
clientRequestInOrder.verify(clientRequestObserver).onRequestData(any(Buffer.class));
clientRequestInOrder.verify(clientRequestObserver).onRequestCancel();
clientInOrder.verify(clientExchangeObserver).onExchangeFinally();
Expand All @@ -276,11 +279,13 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
serverInOrder.verify(serverLifecycleObserver).onNewExchange();
serverInOrder.verify(serverExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
serverInOrder.verify(serverExchangeObserver).onRequest(any(StreamingHttpRequest.class));
serverRequestInOrder.verify(serverRequestObserver, atLeastOnce()).onRequestDataRequested(anyLong());
serverRequestInOrder.verify(serverRequestObserver).onRequestData(any(Buffer.class));
serverRequestInOrder.verify(serverRequestObserver).onRequestError(any(IOException.class));
// because of offloading, cancel from the IO-thread may race with an error propagated through request publisher:
verify(serverExchangeObserver, atMostOnce()).onResponseCancel();
verify(serverExchangeObserver, atMostOnce()).onResponse(any(StreamingHttpResponse.class));
verify(serverResponseObserver, atMostOnce()).onResponseDataRequested(anyLong());
verify(serverResponseObserver, atMostOnce()).onResponseComplete();
serverInOrder.verify(serverExchangeObserver).onExchangeFinally();
verifyNoMoreInteractions(serverLifecycleObserver, serverExchangeObserver,
Expand Down Expand Up @@ -323,7 +328,9 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
clientInOrder.verify(clientExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
clientInOrder.verify(clientExchangeObserver).onRequest(any(StreamingHttpRequest.class));
clientInOrder.verify(clientExchangeObserver).onResponse(any(StreamingHttpResponse.class));
clientInOrder.verify(clientResponseObserver, atLeastOnce()).onResponseDataRequested(anyLong());
clientInOrder.verify(clientResponseObserver).onResponseCancel();
verify(clientRequestObserver, atLeastOnce()).onRequestDataRequested(anyLong());
clientRequestInOrder.verify(clientRequestObserver).onRequestData(any(Buffer.class));
clientRequestInOrder.verify(clientRequestObserver).onRequestTrailers(any(HttpHeaders.class));
clientRequestInOrder.verify(clientRequestObserver).onRequestComplete();
Expand All @@ -335,8 +342,10 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
serverInOrder.verify(serverExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
serverInOrder.verify(serverExchangeObserver).onRequest(any(StreamingHttpRequest.class));
serverInOrder.verify(serverExchangeObserver).onResponse(any(StreamingHttpResponse.class));
verify(serverResponseObserver, atLeastOnce()).onResponseDataRequested(anyLong());
verify(serverResponseObserver, atMostOnce()).onResponseData(any(Buffer.class));
serverInOrder.verify(serverResponseObserver).onResponseCancel();
serverRequestInOrder.verify(serverRequestObserver, atLeastOnce()).onRequestDataRequested(anyLong());
serverRequestInOrder.verify(serverRequestObserver).onRequestData(any(Buffer.class));
serverRequestInOrder.verify(serverRequestObserver).onRequestComplete();
serverInOrder.verify(serverExchangeObserver).onExchangeFinally();
Expand Down Expand Up @@ -367,11 +376,13 @@ private static void verifyObservers(boolean client, HttpLifecycleObserver lifecy
inOrder.verify(exchange).onRequest(any(StreamingHttpRequest.class));
}
inOrder.verify(exchange).onResponse(any(StreamingHttpResponse.class));
verify(response, atLeastOnce()).onResponseDataRequested(anyLong());
inOrder.verify(response, hasMessageBody ? times(1) : never()).onResponseData(any(Buffer.class));
inOrder.verify(response, hasTrailers && !client ? times(1) : never())
.onResponseTrailers(any(HttpHeaders.class));
inOrder.verify(response).onResponseComplete();

verify(request, atLeastOnce()).onRequestDataRequested(anyLong());
requestInOrder.verify(request, hasMessageBody ? times(1) : never()).onRequestData(any(Buffer.class));
requestInOrder.verify(request, hasTrailers && client ? times(1) : never())
.onRequestTrailers(any(HttpHeaders.class));
Expand All @@ -393,8 +404,10 @@ private static void verifyError(boolean client, HttpLifecycleObserver lifecycle,
inOrder.verify(exchange).onRequest(any(StreamingHttpRequest.class));
}
inOrder.verify(exchange).onResponse(any(StreamingHttpResponse.class));
verify(response, atLeastOnce()).onResponseDataRequested(anyLong());
inOrder.verify(response).onResponseError(!client ? DELIBERATE_EXCEPTION : any());

verify(request, atLeastOnce()).onRequestDataRequested(anyLong());
requestInOrder.verify(request, never()).onRequestTrailers(any(HttpHeaders.class));
requestInOrder.verify(request).onRequestComplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ private static final class BiHttpRequestObserver implements HttpRequestObserver
this.second = requireNonNull(second);
}

@Override
public void onRequestDataRequested(final long n) {
try {
first.onRequestDataRequested(n);
} finally {
second.onRequestDataRequested(n);
}
}

@Override
public void onRequestData(final Buffer data) {
try {
Expand Down Expand Up @@ -183,6 +192,15 @@ private BiHttpResponseObserver(final HttpResponseObserver first, final HttpRespo
this.second = requireNonNull(second);
}

@Override
public void onResponseDataRequested(final long n) {
try {
first.onResponseDataRequested(n);
} finally {
second.onResponseDataRequested(n);
}
}

@Override
public void onResponseData(final Buffer data) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public HttpRequestObserver onRequest(final HttpRequestMetaData requestMetaData)
return this;
}

@Override
public void onRequestDataRequested(final long n) {
}

@Override
public void onRequestData(final Buffer data) {
requestSize += data.readableBytes();
Expand Down Expand Up @@ -127,6 +131,10 @@ public HttpResponseObserver onResponse(final HttpResponseMetaData responseMetaDa
return this;
}

@Override
public void onResponseDataRequested(final long n) {
}

@Override
public void onResponseData(final Buffer data) {
responseSize += data.readableBytes();
Expand Down

0 comments on commit f9091b0

Please sign in to comment.