A ByteStream abstraction to represent streaming content. #298

Merged on Oct 11, 2018 (28 commits)


Contributor

@mikkokar mikkokar commented Oct 3, 2018

Raising this just to get some preliminary comments for this work.

  • The PR adds a ByteStream class that abstracts a live asynchronous content stream. The ByteStream implements a reactive-streams Publisher interface and is implemented on top of reactor-core Flux.

  • Netty ByteBuf has been encapsulated inside a new Styx Buffer class.

this.upstream.subscribe(this);
return future;
} else {
throw new IllegalStateException("Secondary subscription!");
Contributor

Perhaps a message like "This operator may only be subscribed to once" would be clearer.

Contributor Author

I will change to:

throw new IllegalStateException("ByteStreamAggregator may only be started once.");

For what it is worth, I have also renamed the class to ByteStreamAggregator, because it is not really an operator: it consumes a byte stream rather than modifying it.

this.subscription.request(Long.MAX_VALUE);
} else {
subscription.cancel();
throw new IllegalStateException("Second onSubscribe event to AggregateOperator");
Contributor

Looks like we should use the same exception + message as the other method (create a throwSubscriptionException method or something?)

Contributor Author

Changed to throw new IllegalStateException("ByteStreamAggregator supports only one Producer instance.");
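
The two guards discussed in this thread could look roughly like the sketch below. It is built on the JDK's `java.util.concurrent.Flow` interfaces rather than on reactive-streams or reactor-core, and `SketchAggregator`, `ChunkPublisher`, `AggregatorDemo` and the `start()` method name are hypothetical stand-ins, not the Styx classes. Only the two exception messages are taken from the replies above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the aggregator under review; not the Styx class.
final class SketchAggregator implements Flow.Subscriber<byte[]> {
    private final Flow.Publisher<byte[]> upstream;
    private final CompletableFuture<byte[]> future = new CompletableFuture<>();
    private final ByteArrayOutputStream aggregated = new ByteArrayOutputStream();
    private final AtomicBoolean started = new AtomicBoolean();
    private Flow.Subscription subscription;

    SketchAggregator(Flow.Publisher<byte[]> upstream) {
        this.upstream = upstream;
    }

    // Starts consuming the stream; a second call is rejected.
    CompletableFuture<byte[]> start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("ByteStreamAggregator may only be started once.");
        }
        upstream.subscribe(this);
        return future;
    }

    @Override
    public void onSubscribe(Flow.Subscription newSubscription) {
        if (subscription != null) {
            // A second producer: cancel it and fail, mirroring the snippet under review.
            newSubscription.cancel();
            throw new IllegalStateException("ByteStreamAggregator supports only one Producer instance.");
        }
        subscription = newSubscription;
        subscription.request(Long.MAX_VALUE);
    }

    @Override public void onNext(byte[] chunk) { aggregated.write(chunk, 0, chunk.length); }
    @Override public void onError(Throwable cause) { future.completeExceptionally(cause); }
    @Override public void onComplete() { future.complete(aggregated.toByteArray()); }
}

// Hypothetical synchronous publisher that replays fixed chunks to one subscriber.
final class ChunkPublisher implements Flow.Publisher<byte[]> {
    private final List<byte[]> chunks;

    ChunkPublisher(List<byte[]> chunks) { this.chunks = chunks; }

    @Override
    public void subscribe(Flow.Subscriber<? super byte[]> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private boolean finished;

            @Override
            public void request(long n) {
                // Emit at most n chunks, then complete once everything is delivered.
                for (long i = 0; i < n && next < chunks.size() && !finished; i++) {
                    subscriber.onNext(chunks.get(next++));
                }
                if (next == chunks.size() && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { finished = true; }
        });
    }
}

final class AggregatorDemo {
    public static void main(String[] args) {
        SketchAggregator aggregator = new SketchAggregator(new ChunkPublisher(List.of(
                "ab".getBytes(StandardCharsets.UTF_8), "cd".getBytes(StandardCharsets.UTF_8))));
        // Prints "abcd"; calling aggregator.start() again would throw IllegalStateException.
        System.out.println(new String(aggregator.start().join(), StandardCharsets.UTF_8));
    }
}
```

Keeping the two messages distinct, as in the replies, lets a caller tell a repeated start apart from a misbehaving upstream that subscribes twice.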

public final class Buffer {
private final ByteBuf delegate;

public Buffer(ByteBuf byteBuf) {
Contributor

With this public constructor, Netty ByteBuf is still visible in our API. I'm not sure what the solution is.

Contributor Author

Yeah. I still need to think about this.
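
One possible direction, offered only as a sketch and not as what the PR settled on, is to keep the delegate-taking constructor package-private and expose only constructors that take plain Java types. The example below uses `java.nio.ByteBuffer` as a stand-in for Netty `ByteBuf`, and `SketchBuffer`, `size()` and `content()` are hypothetical names. The `(String, Charset)` constructor mirrors the `new Buffer(text, UTF_8)` calls that appear elsewhere in this diff.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; the class would be public in a real API.
final class SketchBuffer {
    // Stand-in for the Netty ByteBuf delegate.
    private final ByteBuffer delegate;

    // Package-private: only code in the same package can wrap a raw delegate,
    // so the delegate type does not leak into the public API surface.
    SketchBuffer(ByteBuffer delegate) {
        this.delegate = delegate.asReadOnlyBuffer();
    }

    // Public entry point that mentions only plain Java types.
    public SketchBuffer(String content, Charset charset) {
        this(ByteBuffer.wrap(content.getBytes(charset)));
    }

    public int size() {
        return delegate.remaining();
    }

    // Copies the bytes out without moving the delegate's read position.
    public byte[] content() {
        byte[] bytes = new byte[delegate.remaining()];
        delegate.duplicate().get(bytes);
        return bytes;
    }
}

final class SketchBufferDemo {
    public static void main(String[] args) {
        SketchBuffer buffer = new SketchBuffer("hello", StandardCharsets.UTF_8);
        System.out.println(buffer.size() + " bytes: " + new String(buffer.content(), StandardCharsets.UTF_8));
    }
}
```

The trade-off is that any code which must wrap a raw delegate has to live in the same package, or go through a dedicated internal factory.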

@@ -59,27 +50,27 @@
* chunk must be processed as they arrive. It is also useful for dealing with
* very large content sizes, and in situations where content size is not known
* upfront.
* <p>
* <p>z
Contributor

z

}

@Test
public void publisherBackpressure() {
Contributor

Not a proper test name.

Contributor Author

supportsReactiveBackpressure?

Contributor Author

supportsBackpressure?
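
For context, a test with a name like `supportsBackpressure` would typically assert that nothing is emitted before demand is signalled, and that each `request(n)` releases at most `n` items. The sketch below shows that shape using the JDK's `java.util.concurrent.Flow` interfaces. `CountingPublisher`, `RecordingSubscriber` and `BackpressureDemo` are hypothetical helpers, not part of the Styx test suite, and the checks use plain exceptions instead of a test framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher that emits 0..count-1 and strictly honours demand.
final class CountingPublisher implements Flow.Publisher<Integer> {
    private final int count;

    CountingPublisher(int count) { this.count = count; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private boolean finished;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next < count && !finished; i++) {
                    subscriber.onNext(next++);
                }
                if (next == count && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { finished = true; }
        });
    }
}

// Records what it receives and never requests anything on its own.
final class RecordingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    Flow.Subscription subscription;
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable cause) { throw new AssertionError(cause); }
    @Override public void onComplete() { completed = true; }
}

final class BackpressureDemo {
    static void check(boolean condition, String message) {
        if (!condition) throw new AssertionError(message);
    }

    // The body of a would-be supportsBackpressure test.
    static void supportsBackpressure() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new CountingPublisher(3).subscribe(subscriber);

        check(subscriber.received.isEmpty(), "nothing is emitted before demand");
        subscriber.subscription.request(1);
        check(subscriber.received.equals(List.of(0)), "one item per request(1)");
        subscriber.subscription.request(2);
        check(subscriber.received.equals(List.of(0, 1, 2)), "two more items after request(2)");
        check(subscriber.completed, "completes once all items are delivered");
    }

    public static void main(String[] args) {
        supportsBackpressure();
        System.out.println("backpressure honoured");
    }
}
```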

@@ -107,7 +108,7 @@ private synchronized void returnIfConnected(Optional<ConnectionPool> connectionP
return origin
.map(ConnectionPool::borrowConnection)
.orElseGet(() -> {
- request.releaseContentBuffers();
+ toObservable(request.body()).forEach(it -> it.delegate().release());
Contributor

Should this use request.body().drop() now?

Contributor Author

The way to do this now is:

// Guaranteed to aggregate an empty body:
request.body().drop().aggregate(1);

@mikkokar mikkokar force-pushed the content-stream branch 2 times, most recently from bb2efca to 61a88e1 Compare October 5, 2018 13:58
import static io.netty.buffer.Unpooled.copiedBuffer;
import static java.util.Objects.requireNonNull;

/**
Contributor

Comment is mostly telling us things we already know - that we're in Styx, that classes are a form of abstraction, etc.

Contributor

I think we should avoid saying "A Styx byte buffer" and just say "A byte buffer". Otherwise it seems like we're implying that we have some kind of unique (unspecified) byte buffer behaviour. To us, it seems important because we're differentiating it from Netty ByteBuf, but someone looking at the docs for the first time wouldn't be thinking about that (and can see that it is a Styx class from the package name).

Contributor Author

👍 Makes sense.

this.status = statusWithCode(response.status().code());
this.version = httpVersion(response.version().toString());
this.headers = response.headers().newBuilder();
- this.body = contentStream;
+ this.body = byteStream;
Contributor

Is byteStream allowed to be null?

Contributor Author

No. Added the null checks in Builder. Do you think it still warrants one here?
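
For reference, a fail-fast check at the point of assignment costs one call to `Objects.requireNonNull`, which the file under review already imports statically. The sketch below shows the check in both places being discussed, the builder setter and the final assignment. `SketchResponse` and its `String` body are hypothetical simplifications, not the Styx `HttpResponse`.

```java
import java.util.Objects;

// Hypothetical, simplified response type; the body is a String to keep the sketch short.
final class SketchResponse {
    private final String body;

    private SketchResponse(Builder builder) {
        // Second line of defence: catches any code path that bypasses the setter.
        this.body = Objects.requireNonNull(builder.body, "body");
    }

    String body() {
        return body;
    }

    static final class Builder {
        private String body = "";

        Builder body(String body) {
            // Fails at the call site that passed null, not later when the body is read.
            this.body = Objects.requireNonNull(body, "body");
            return this;
        }

        SketchResponse build() {
            return new SketchResponse(this);
        }
    }
}

final class SketchResponseDemo {
    public static void main(String[] args) {
        System.out.println(new SketchResponse.Builder().body("hello").build().body());
    }
}
```

Whether the second check is worth having depends on how many paths can assign the field; with a single guarded setter it is redundant but cheap.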

Enable previously commented out tests.
Add ContentStream Events.
…Java objects.

Use the new ByteStream class in HTTP message objects.
ByteStream: Rename discard() to drop().
Fix HttpResponseTest: canRemoveResponseBody.
@@ -46,7 +51,7 @@ private StaticResponseHandler(int status, String text) {

@Override
public StyxObservable<HttpResponse> handle(HttpRequest request, HttpInterceptor.Context context) {
- return StyxObservable.of(HttpResponse.response(statusWithCode(status)).body(StyxObservable.of(text), UTF_8).build());
+ return StyxObservable.of(response(statusWithCode(status)).body(new ByteStream(toPublisher(Observable.just(new Buffer(text, UTF_8))))).build());
Contributor

Can't we use Flux.just here?

Contributor Author

Looks like Flux can be used here.

@@ -57,11 +61,11 @@ public AppHandler(Origin origin) {
request.queryParam("status").ifPresent(status ->
responseBuilder
.status(httpResponseStatus(status))
- .body(StyxObservable.of("Returning requested status (" + status + ")"), UTF_8)
+ .body(new ByteStream(toPublisher(Observable.just(new Buffer("Returning requested status (" + status + ")", UTF_8)))))
Contributor

As in other parts of the code, is there any reason (visibility?) why we cannot use Flux.just() instead of creating an RxJava Observable with .just() and converting it to a publisher?

Contributor Author

Yeah, it looks like it is OK to use Flux.

Contributor

I didn't want to add the comment in every occurrence but you can search for Observable.just()...

@@ -65,7 +65,7 @@ class AggregatingPluginSpec extends FunSpec

it("Gets response from aggregating plugin (with body)") {
mockServer.stub("/body", responseSupplier(
- () => response(OK).body(StyxObservable.from(Seq(chunk("a"), chunk("b"), chunk("c"), chunk("d"), chunk("e")).asJava)).build()
+ () => response(OK).body(new ByteStream(RxReactiveStreams.toPublisher(Observable.from(Seq(chunk("a"), chunk("b"), chunk("c"), chunk("d"), chunk("e")).asJava)))).build()
Contributor

static import?

@@ -127,7 +126,7 @@ class OriginClosesConnectionSpec extends FunSuite
.toStreamingRequest)

responseObservable
- .doOnNext((t: HttpResponse) => toRxObservable(t.body()).subscribe(contentSubscriber))
+ .doOnNext((t: HttpResponse) => RxReactiveStreams.toObservable(t.body()).subscribe(contentSubscriber))
Contributor

static import?

@@ -88,14 +85,14 @@ class AsyncDelayPlugin extends PluginAdapter {
chain.proceed(request)
.flatMap(asJavaFunction((response: HttpResponse) => {

- val transformedContent: Observable[ByteBuf] = toRxObservable(response.body())
+ val transformedContent: Observable[Buffer] = RxReactiveStreams.toObservable(response.body())
Contributor

static import?

* <ul>
* <li> When the mapping function returns a new {@link Buffer} instance, the reference count for
* the old one is automatically decremented.</li>
* <li> When the mapping function modifies the {@link Buffer} in place, returning the same instance
Contributor

Our Buffer class is currently immutable, so this description may be confusing.

Contributor Author

@mikkokar mikkokar Oct 10, 2018

I would leave this open for now. Performance-wise it would be sensible to allow in-place modifications for the buffer. That would require further modifications to the Buffer class. WDYT?
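
The reference-counting rule in the Javadoc under discussion can be illustrated with the sketch below. `CountedBuffer` and `MapSketch.map` are hypothetical stand-ins, not the Styx `Buffer` or `ByteStream.map`. The first bullet (release the old buffer when a new one is returned) comes from the Javadoc; leaving the count untouched when the same instance is returned is an assumption, since the second bullet is cut off in the snippet shown here.

```java
import java.util.function.UnaryOperator;

// Hypothetical reference-counted buffer; starts with a count of one.
final class CountedBuffer {
    private final String content;
    private int refCount = 1;

    CountedBuffer(String content) { this.content = content; }

    String content() { return content; }

    int refCount() { return refCount; }

    void release() {
        if (refCount == 0) {
            throw new IllegalStateException("Buffer already released");
        }
        refCount--;
    }
}

final class MapSketch {
    // Applies the mapping function and releases the input only if it was replaced.
    static CountedBuffer map(CountedBuffer input, UnaryOperator<CountedBuffer> mapping) {
        CountedBuffer output = mapping.apply(input);
        if (output != input) {
            // A new instance was returned, so the old one is no longer needed.
            input.release();
        }
        // Assumption: when the same instance comes back, its count is left unchanged.
        return output;
    }

    public static void main(String[] args) {
        CountedBuffer original = new CountedBuffer("abc");
        CountedBuffer mapped = map(original, b -> new CountedBuffer(b.content().toUpperCase()));
        System.out.println(original.refCount() + " " + mapped.refCount() + " " + mapped.content());
    }
}
```

With an immutable buffer only the first branch is reachable in practice, which is the source of the confusion raised above; the identity check only becomes meaningful once in-place modification is allowed.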

@mikkokar mikkokar merged commit 5d9b79c into ExpediaGroup:master Oct 11, 2018
@mikkokar mikkokar deleted the content-stream branch October 11, 2018 08:22
3 participants