diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java
index 78908a31f5..edeaafda8d 100644
--- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java
+++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java
@@ -94,6 +94,11 @@ private static final class GrpcToHttpRequestObserver implements HttpRequestObser
this.observer = observer;
}
+ @Override
+ public void onRequestDataRequested(final long n) {
+ observer.onRequestDataRequested(n);
+ }
+
@Override
public void onRequestData(final Buffer data) {
observer.onRequestData(data);
@@ -132,6 +137,11 @@ private static final class GrpcToHttpResponseObserver implements HttpResponseObs
}
}
+ @Override
+ public void onResponseDataRequested(final long n) {
+ observer.onResponseDataRequested(n);
+ }
+
@Override
public void onResponseData(final Buffer data) {
observer.onResponseData(data);
diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java
index 2ffe5149a2..fab5d1e8ff 100644
--- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java
+++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcLifecycleObserverTest.java
@@ -70,9 +70,12 @@
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -236,6 +239,7 @@ private static void verifyObservers(boolean client, boolean error, boolean aggre
inOrder.verify(exchange).onRequest(any(StreamingHttpRequest.class));
}
inOrder.verify(exchange).onResponse(any(StreamingHttpResponse.class));
+ verify(response, atLeastOnce()).onResponseDataRequested(anyLong());
if (!error) {
inOrder.verify(response).onResponseData(any(Buffer.class));
inOrder.verify(response).onResponseTrailers(any(HttpHeaders.class));
@@ -258,6 +262,7 @@ private static void verifyObservers(boolean client, boolean error, boolean aggre
}
}
+ verify(request, atLeastOnce()).onRequestDataRequested(anyLong());
requestInOrder.verify(request).onRequestData(any(Buffer.class));
requestInOrder.verify(request, never()).onRequestTrailers(any(HttpHeaders.class));
requestInOrder.verify(request).onRequestComplete();
diff --git a/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/BiGrpcLifecycleObserver.java b/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/BiGrpcLifecycleObserver.java
index d8be5a6fe4..4c08be9d59 100644
--- a/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/BiGrpcLifecycleObserver.java
+++ b/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/BiGrpcLifecycleObserver.java
@@ -128,6 +128,15 @@ private static final class BiGrpcRequestObserver implements GrpcRequestObserver
this.second = requireNonNull(second);
}
+ @Override
+ public void onRequestDataRequested(final long n) {
+ try {
+ first.onRequestDataRequested(n);
+ } finally {
+ second.onRequestDataRequested(n);
+ }
+ }
+
@Override
public void onRequestData(final Buffer data) {
try {
@@ -184,6 +193,15 @@ private BiGrpcResponseObserver(final GrpcResponseObserver first, final GrpcRespo
this.second = requireNonNull(second);
}
+ @Override
+ public void onResponseDataRequested(final long n) {
+ try {
+ first.onResponseDataRequested(n);
+ } finally {
+ second.onResponseDataRequested(n);
+ }
+ }
+
@Override
public void onResponseData(final Buffer data) {
try {
diff --git a/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/LoggingGrpcLifecycleObserver.java b/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/LoggingGrpcLifecycleObserver.java
index 2e21490b50..4cb496db4e 100644
--- a/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/LoggingGrpcLifecycleObserver.java
+++ b/servicetalk-grpc-utils/src/main/java/io/servicetalk/grpc/utils/LoggingGrpcLifecycleObserver.java
@@ -95,6 +95,10 @@ public GrpcRequestObserver onRequest(final HttpRequestMetaData requestMetaData)
return this;
}
+ @Override
+ public void onRequestDataRequested(final long n) {
+ }
+
@Override
public void onRequestData(final Buffer data) {
requestSize += data.readableBytes();
@@ -131,6 +135,10 @@ public GrpcResponseObserver onResponse(final HttpResponseMetaData responseMetaDa
return this;
}
+ @Override
+ public void onResponseDataRequested(final long n) {
+ }
+
@Override
public void onResponseData(final Buffer data) {
responseSize += data.readableBytes();
diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLifecycleObserver.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLifecycleObserver.java
index 053ddd8ef2..52e41ca7d7 100644
--- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLifecycleObserver.java
+++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLifecycleObserver.java
@@ -109,6 +109,17 @@ interface HttpExchangeObserver {
*/
interface HttpRequestObserver {
+ /**
+ * Callback when subscriber requests {@code n} items of the request payload body.
+ *
+ * May be invoked multiple times. Helps to track when items are requested and when they are
+ * {@link #onRequestData(Buffer) delivered}.
+ *
+ * @param n number of requested items
+ */
+ default void onRequestDataRequested(long n) { // FIXME: 0.43 - consider removing default impl
+ }
+
/**
* Callback when the request payload body data chunk was observed.
*
@@ -158,6 +169,16 @@ interface HttpRequestObserver {
*/
interface HttpResponseObserver {
+ /**
+ * Callback when subscriber requests {@code n} items of the response payload body.
+ *
+ * May be invoked multiple times. Helps to track when items are requested and when they are
+ * {@link #onResponseData delivered}.
+ *
+ * @param n number of requested items
+ */
+ void onResponseDataRequested(long n); // FIXME: 0.43 - consider removing default impl
+
/**
* Callback when the response payload body data chunk was observed.
*
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java
index 2deb50510d..96e4bddbf4 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java
@@ -104,7 +104,9 @@ final Single trackLifecycle(@Nullable final ConnectionInf
return NoopSubscriber.INSTANCE;
});
}
- return p.beforeOnNext(item -> {
+ return p.beforeRequest(n -> safeReport(onRequest::onRequestDataRequested, n, onRequest,
+ "onRequestDataRequested"))
+ .beforeOnNext(item -> {
if (item instanceof Buffer) {
safeReport(onRequest::onRequestData, (Buffer) item, onRequest, "onRequestData");
} else if (item instanceof HttpHeaders) {
@@ -151,7 +153,9 @@ public void cancel() {
// needs to be applied last.
.map(resp -> {
exchangeContext.onResponse(resp);
- return resp.transformMessageBody(p -> p.beforeOnNext(exchangeContext::onResponseBody));
+ return resp.transformMessageBody(p -> p
+ .beforeRequest(exchangeContext::onResponseDataRequested)
+ .beforeOnNext(exchangeContext::onResponseBody));
}).shareContextOnSubscribe();
});
}
@@ -192,6 +196,11 @@ void onResponse(HttpResponseMetaData responseMetaData) {
NoopHttpLifecycleObserver.NoopHttpResponseObserver.INSTANCE);
}
+ void onResponseDataRequested(final long n) {
+ assert onResponse != null;
+ safeReport(onResponse::onResponseDataRequested, n, onResponse, "onResponseDataRequested");
+ }
+
void onResponseBody(final Object item) {
assert onResponse != null;
if (item instanceof Buffer) {
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NoopHttpLifecycleObserver.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NoopHttpLifecycleObserver.java
index dfd397d70f..406e9f383b 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NoopHttpLifecycleObserver.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NoopHttpLifecycleObserver.java
@@ -78,6 +78,10 @@ private NoopHttpRequestObserver() {
// Singleton
}
+ @Override
+ public void onRequestDataRequested(final long n) {
+ }
+
@Override
public void onRequestData(final Buffer data) {
}
@@ -107,6 +111,10 @@ private NoopHttpResponseObserver() {
// Singleton
}
+ @Override
+ public void onResponseDataRequested(final long n) {
+ }
+
@Override
public void onResponseData(final Buffer data) {
}
diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpLifecycleObserverTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpLifecycleObserverTest.java
index 0a411cc633..f2f2593b83 100644
--- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpLifecycleObserverTest.java
+++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpLifecycleObserverTest.java
@@ -73,6 +73,8 @@
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
@@ -267,6 +269,7 @@ public Single handle(HttpServiceContext ctx,
clientInOrder.verify(clientExchangeObserver).onRequest(any(StreamingHttpRequest.class));
clientInOrder.verify(clientExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
clientInOrder.verify(clientExchangeObserver).onResponseCancel();
+ clientRequestInOrder.verify(clientRequestObserver).onRequestDataRequested(anyLong());
clientRequestInOrder.verify(clientRequestObserver).onRequestData(any(Buffer.class));
clientRequestInOrder.verify(clientRequestObserver).onRequestCancel();
clientInOrder.verify(clientExchangeObserver).onExchangeFinally();
@@ -276,11 +279,13 @@ public Single handle(HttpServiceContext ctx,
serverInOrder.verify(serverLifecycleObserver).onNewExchange();
serverInOrder.verify(serverExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
serverInOrder.verify(serverExchangeObserver).onRequest(any(StreamingHttpRequest.class));
+ serverRequestInOrder.verify(serverRequestObserver, atLeastOnce()).onRequestDataRequested(anyLong());
serverRequestInOrder.verify(serverRequestObserver).onRequestData(any(Buffer.class));
serverRequestInOrder.verify(serverRequestObserver).onRequestError(any(IOException.class));
// because of offloading, cancel from the IO-thread may race with an error propagated through request publisher:
verify(serverExchangeObserver, atMostOnce()).onResponseCancel();
verify(serverExchangeObserver, atMostOnce()).onResponse(any(StreamingHttpResponse.class));
+ verify(serverResponseObserver, atMostOnce()).onResponseDataRequested(anyLong());
verify(serverResponseObserver, atMostOnce()).onResponseComplete();
serverInOrder.verify(serverExchangeObserver).onExchangeFinally();
verifyNoMoreInteractions(serverLifecycleObserver, serverExchangeObserver,
@@ -323,7 +328,9 @@ public Single handle(HttpServiceContext ctx,
clientInOrder.verify(clientExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
clientInOrder.verify(clientExchangeObserver).onRequest(any(StreamingHttpRequest.class));
clientInOrder.verify(clientExchangeObserver).onResponse(any(StreamingHttpResponse.class));
+ clientInOrder.verify(clientResponseObserver, atLeastOnce()).onResponseDataRequested(anyLong());
clientInOrder.verify(clientResponseObserver).onResponseCancel();
+ verify(clientRequestObserver, atLeastOnce()).onRequestDataRequested(anyLong());
clientRequestInOrder.verify(clientRequestObserver).onRequestData(any(Buffer.class));
clientRequestInOrder.verify(clientRequestObserver).onRequestTrailers(any(HttpHeaders.class));
clientRequestInOrder.verify(clientRequestObserver).onRequestComplete();
@@ -335,8 +342,10 @@ public Single handle(HttpServiceContext ctx,
serverInOrder.verify(serverExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
serverInOrder.verify(serverExchangeObserver).onRequest(any(StreamingHttpRequest.class));
serverInOrder.verify(serverExchangeObserver).onResponse(any(StreamingHttpResponse.class));
+ verify(serverResponseObserver, atLeastOnce()).onResponseDataRequested(anyLong());
verify(serverResponseObserver, atMostOnce()).onResponseData(any(Buffer.class));
serverInOrder.verify(serverResponseObserver).onResponseCancel();
+ serverRequestInOrder.verify(serverRequestObserver, atLeastOnce()).onRequestDataRequested(anyLong());
serverRequestInOrder.verify(serverRequestObserver).onRequestData(any(Buffer.class));
serverRequestInOrder.verify(serverRequestObserver).onRequestComplete();
serverInOrder.verify(serverExchangeObserver).onExchangeFinally();
@@ -367,11 +376,13 @@ private static void verifyObservers(boolean client, HttpLifecycleObserver lifecy
inOrder.verify(exchange).onRequest(any(StreamingHttpRequest.class));
}
inOrder.verify(exchange).onResponse(any(StreamingHttpResponse.class));
+ verify(response, atLeastOnce()).onResponseDataRequested(anyLong());
inOrder.verify(response, hasMessageBody ? times(1) : never()).onResponseData(any(Buffer.class));
inOrder.verify(response, hasTrailers && !client ? times(1) : never())
.onResponseTrailers(any(HttpHeaders.class));
inOrder.verify(response).onResponseComplete();
+ verify(request, atLeastOnce()).onRequestDataRequested(anyLong());
requestInOrder.verify(request, hasMessageBody ? times(1) : never()).onRequestData(any(Buffer.class));
requestInOrder.verify(request, hasTrailers && client ? times(1) : never())
.onRequestTrailers(any(HttpHeaders.class));
@@ -393,8 +404,10 @@ private static void verifyError(boolean client, HttpLifecycleObserver lifecycle,
inOrder.verify(exchange).onRequest(any(StreamingHttpRequest.class));
}
inOrder.verify(exchange).onResponse(any(StreamingHttpResponse.class));
+ verify(response, atLeastOnce()).onResponseDataRequested(anyLong());
inOrder.verify(response).onResponseError(!client ? DELIBERATE_EXCEPTION : any());
+ verify(request, atLeastOnce()).onRequestDataRequested(anyLong());
requestInOrder.verify(request, never()).onRequestTrailers(any(HttpHeaders.class));
requestInOrder.verify(request).onRequestComplete();
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BiHttpLifecycleObserver.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BiHttpLifecycleObserver.java
index aeae791679..b110aa9416 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BiHttpLifecycleObserver.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BiHttpLifecycleObserver.java
@@ -127,6 +127,15 @@ private static final class BiHttpRequestObserver implements HttpRequestObserver
this.second = requireNonNull(second);
}
+ @Override
+ public void onRequestDataRequested(final long n) {
+ try {
+ first.onRequestDataRequested(n);
+ } finally {
+ second.onRequestDataRequested(n);
+ }
+ }
+
@Override
public void onRequestData(final Buffer data) {
try {
@@ -183,6 +192,15 @@ private BiHttpResponseObserver(final HttpResponseObserver first, final HttpRespo
this.second = requireNonNull(second);
}
+ @Override
+ public void onResponseDataRequested(final long n) {
+ try {
+ first.onResponseDataRequested(n);
+ } finally {
+ second.onResponseDataRequested(n);
+ }
+ }
+
@Override
public void onResponseData(final Buffer data) {
try {
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/LoggingHttpLifecycleObserver.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/LoggingHttpLifecycleObserver.java
index 8be3531c46..7025447e41 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/LoggingHttpLifecycleObserver.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/LoggingHttpLifecycleObserver.java
@@ -91,6 +91,10 @@ public HttpRequestObserver onRequest(final HttpRequestMetaData requestMetaData)
return this;
}
+ @Override
+ public void onRequestDataRequested(final long n) {
+ }
+
@Override
public void onRequestData(final Buffer data) {
requestSize += data.readableBytes();
@@ -127,6 +131,10 @@ public HttpResponseObserver onResponse(final HttpResponseMetaData responseMetaDa
return this;
}
+ @Override
+ public void onResponseDataRequested(final long n) {
+ }
+
@Override
public void onResponseData(final Buffer data) {
responseSize += data.readableBytes();