-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
A ByteStream abstraction to represent streaming content. #298
Conversation
this.upstream.subscribe(this); | ||
return future; | ||
} else { | ||
throw new IllegalStateException("Secondary subscription!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps a message like "This operator may only be subscribed to once" would be clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change to:
throw new IllegalStateException("ByteStreamAggregator may only be started once.");
And for what it is worth I have changed the class name to ByteStreamAggregator
. As this is not really an operator. It is a consumer of a byte stream, not an operator that modifies the stream.
this.subscription.request(Long.MAX_VALUE); | ||
} else { | ||
subscription.cancel(); | ||
throw new IllegalStateException("Second onSubscribe event to AggregateOperator"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we should use the same exception + message as the other method (create a throwSubscriptionException
method or something?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to throw new IllegalStateException("ByteStreamAggregator supports only one Producer instance.");
public final class Buffer { | ||
private final ByteBuf delegate; | ||
|
||
public Buffer(ByteBuf byteBuf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By having this constructor, netty ByteBuf
is still visible in our API. I'm not sure what the solution is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I still need to think about this.
@@ -59,27 +50,27 @@ | |||
* chunk must be processed as they arrive. It is also useful for dealing with | |||
* very large content sizes, and in situations where content size is not known | |||
* upfront. | |||
* <p> | |||
* <p>z |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
z
} | ||
|
||
@Test | ||
public void publisherBackpressure() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a proper test name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supportsReactiveBackpressure
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supportsBackpressure
?
@@ -107,7 +108,7 @@ private synchronized void returnIfConnected(Optional<ConnectionPool> connectionP | |||
return origin | |||
.map(ConnectionPool::borrowConnection) | |||
.orElseGet(() -> { | |||
request.releaseContentBuffers(); | |||
toObservable(request.body()).forEach(it -> it.delegate().release()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this use request.body().drop()
now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way to do this now is:
// Guaranteed to aggregate an empty body:
request.body().drop().aggregate(1);
bb2efca
to
61a88e1
Compare
import static io.netty.buffer.Unpooled.copiedBuffer; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is mostly telling us things we already know - that we're in Styx, that classes are a form of abstraction, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should avoid saying "A Styx byte buffer" and just say "A byte buffer". Otherwise it seems like we're implying that we have some kind of unique (unspecified) byte buffer behaviour. To us, it seems important because we're differentiating it from Netty ByteBuf
, but someone looking at the docs for the first time wouldn't be thinking about that (and can see that it is a Styx class from the package name).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Makes sense.
components/api/src/main/java/com/hotels/styx/api/ByteStream.java
Outdated
Show resolved
Hide resolved
components/api/src/main/java/com/hotels/styx/api/ByteStream.java
Outdated
Show resolved
Hide resolved
components/api/src/main/java/com/hotels/styx/api/ByteStreamAggregator.java
Outdated
Show resolved
Hide resolved
this.status = statusWithCode(response.status().code()); | ||
this.version = httpVersion(response.version().toString()); | ||
this.headers = response.headers().newBuilder(); | ||
this.body = contentStream; | ||
this.body = byteStream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is byteStream
allowed to be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. Added the null checks in Builder. Do you think it still warrants one here?
components/api/src/test/java/com/hotels/styx/api/ByteStreamTest.java
Outdated
Show resolved
Hide resolved
components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java
Outdated
Show resolved
Hide resolved
Enable previously commented out tests.
Add ContentStream Events.
…tor. Add a file for ByteStream class.
…Java objects. Use the new ByteStream class in HTTP message objects.
RxReactiveStreams bridges.
ByteStream: Rename discard() to drop(). Fix HttpResponseTest: canRemoveResponseBody.
Change AggregateOperator name to ByteStreamAggregator.
…fter aggregation has finished. Remove HttpMessageBodies class.
61a88e1
to
5c54c66
Compare
...onents/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java
Outdated
Show resolved
Hide resolved
components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java
Show resolved
Hide resolved
@@ -46,7 +51,7 @@ private StaticResponseHandler(int status, String text) { | |||
|
|||
@Override | |||
public StyxObservable<HttpResponse> handle(HttpRequest request, HttpInterceptor.Context context) { | |||
return StyxObservable.of(HttpResponse.response(statusWithCode(status)).body(StyxObservable.of(text), UTF_8).build()); | |||
return StyxObservable.of(response(statusWithCode(status)).body(new ByteStream(toPublisher(Observable.just(new Buffer(text, UTF_8))))).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use Flux.just here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like Flux can be used here.
@@ -57,11 +61,11 @@ public AppHandler(Origin origin) { | |||
request.queryParam("status").ifPresent(status -> | |||
responseBuilder | |||
.status(httpResponseStatus(status)) | |||
.body(StyxObservable.of("Returning requested status (" + status + ")"), UTF_8) | |||
.body(new ByteStream(toPublisher(Observable.just(new Buffer("Returning requested status (" + status + ")", UTF_8))))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As in other parts of coude, Is there any reason (visibility?) why we cannot use Flux.just() instead of creating an rxJava Observable with .just() and transforming it to a publisher ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it looks it is OK to use Flux.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to add the comment in every occurrence but you can search for Observable.just()...
@@ -65,7 +65,7 @@ class AggregatingPluginSpec extends FunSpec | |||
|
|||
it("Gets response from aggregating plugin (with body)") { | |||
mockServer.stub("/body", responseSupplier( | |||
() => response(OK).body(StyxObservable.from(Seq(chunk("a"), chunk("b"), chunk("c"), chunk("d"), chunk("e")).asJava)).build() | |||
() => response(OK).body(new ByteStream(RxReactiveStreams.toPublisher(Observable.from(Seq(chunk("a"), chunk("b"), chunk("c"), chunk("d"), chunk("e")).asJava)))).build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static import?
@@ -127,7 +126,7 @@ class OriginClosesConnectionSpec extends FunSuite | |||
.toStreamingRequest) | |||
|
|||
responseObservable | |||
.doOnNext((t: HttpResponse) => toRxObservable(t.body()).subscribe(contentSubscriber)) | |||
.doOnNext((t: HttpResponse) => RxReactiveStreams.toObservable(t.body()).subscribe(contentSubscriber)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static import?
@@ -88,14 +85,14 @@ class AsyncDelayPlugin extends PluginAdapter { | |||
chain.proceed(request) | |||
.flatMap(asJavaFunction((response: HttpResponse) => { | |||
|
|||
val transformedContent: Observable[ByteBuf] = toRxObservable(response.body()) | |||
val transformedContent: Observable[Buffer] = RxReactiveStreams.toObservable(response.body()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static import?
* <ul> | ||
* <li> When the mapping function returns a new {@link Buffer} instance, the reference count for | ||
* the old one is automatically decremented.</li> | ||
* <li> When the mapping function modifies the {@link Buffer} in place, returning the same instance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our Buffer
class is currently immutable, so this description may be confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would leave this open for now. Performance-wise it would be sensible to allow in-place modifications for the buffer. That would require further modifications to the Buffer class. WDYT?
Raising this just to get some preliminary comments for this work.
The PR adds a ByteStream class that abstracts a live asynchronous content stream. The ByteStream implements a reactive-streams
Publisher
interface and is implemented on top of reactor-coreFlux
.Netty
ByteBuf
has been encapsulated inside a new StyxBuffer
class.