Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A ByteStream abstraction to represent streaming content. #298

Merged
merged 28 commits into from
Oct 11, 2018
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c7558e9
Add ContentStream interface and an RxContentStream implementation.
mikkokar Sep 20, 2018
cf88b4a
Add ContentStream.map method.
mikkokar Sep 20, 2018
9b2a52c
Add consume method to ContentStream.
mikkokar Sep 20, 2018
40a909f
Implement Reactive Streams RxContentPublisher.
mikkokar Sep 25, 2018
e848df4
Implement ContentAggregator, RxContentConsumer for the new content st…
mikkokar Sep 26, 2018
e9c2d79
Tidy up. Add copyright notices to new files. Etc.
mikkokar Sep 26, 2018
f53ed22
Add DiscardOperator. Rename ContentAggregator class to AggregateOpera…
mikkokar Sep 27, 2018
7586320
Add ByteStream and MappingOperator classes.
mikkokar Sep 27, 2018
198bb1b
Refactor DiscardOperator (to resemble MappingOperator).
mikkokar Sep 27, 2018
72fab60
Disable checkstyle for RxContentPublisher and RxContentStream FSMs.
mikkokar Sep 27, 2018
4cd25eb
Use rxjava-reactive-streams to bridge between reactive streams and Rx…
mikkokar Sep 28, 2018
1648e0e
Deprecate RxContentPublisher and RxContentConsumer classes in favour of
mikkokar Sep 28, 2018
de4e99e
Use Reactor-Core in Styx-API HTTP message objects and API tests.
mikkokar Oct 1, 2018
594e109
Replace DiscardOperator and MappingOperator with corresponding reactor
mikkokar Oct 1, 2018
1fde3b0
Make ByteStream to implement Publisher<Buffer> API.
mikkokar Oct 1, 2018
2b2687b
Builder setter methods for HTTP message body.
mikkokar Oct 2, 2018
f11a4b9
ByteStream: Add end-of-stream future.
mikkokar Oct 2, 2018
4fe01a0
ByteStream: doOnEnd action.
mikkokar Oct 2, 2018
4b676e7
Move ByteSream into api package.
mikkokar Oct 3, 2018
5ae13c9
Tidy up pom files.
mikkokar Oct 3, 2018
65824b6
Address code review comments.
mikkokar Oct 3, 2018
e7dbe9e
Hide Netty ByteBuf from the Buffer class.
mikkokar Oct 5, 2018
85f2133
HttpRequest/Response .toFull method: Release aggregated byte buffer a…
mikkokar Oct 5, 2018
5c54c66
Resolved some low-hanging code review comments.
mikkokar Oct 10, 2018
d589190
Add Javadocs to Buffer and ByteStream classes.
mikkokar Oct 10, 2018
d5bd3b8
Add static import.
mikkokar Oct 10, 2018
1515026
Address code review comments.
mikkokar Oct 10, 2018
0eb4ddc
Improve JavaDoc for Buffer.java.
mikkokar Oct 10, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions components/api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@
<artifactId>rxjava</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>

<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
Expand Down
73 changes: 73 additions & 0 deletions components/api/src/main/java/com/hotels/styx/api/Buffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright (C) 2013-2018 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.api;

import io.netty.buffer.ByteBuf;

import java.nio.charset.Charset;

import static io.netty.buffer.Unpooled.copiedBuffer;
import static java.util.Objects.requireNonNull;

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is mostly telling us things we already know - that we're in Styx, that classes are a form of abstraction, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid saying "A Styx byte buffer" and just say "A byte buffer". Otherwise it seems like we're implying that we have some kind of unique (unspecified) byte buffer behaviour. To us, it seems important because we're differentiating it from Netty ByteBuf, but someone looking at the docs for the first time wouldn't be thinking about that (and can see that it is a Styx class from the package name).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Makes sense.

* A Styx byte buffer.
*
*/
public final class Buffer {
private final ByteBuf delegate;

Buffer(ByteBuf byteBuf) {
this.delegate = requireNonNull(byteBuf);
}

/**
* Creates a new Buffer from {@link String} content with specified character encoding.
*
* @param content content
* @param charset desired character encoding
*/
public Buffer(String content, Charset charset) {
this(copiedBuffer(content, charset));
}

/**
* Returns a size of the Buffer in bytes.
* @return a size in bytes
*/
public int size() {
return delegate.readableBytes();
}

/**
* Returns buffer content as array of bytes.
*
* @return a byte array
*/
public byte[] content() {
byte[] bytes = new byte[delegate.readableBytes()];
delegate.getBytes(delegate.readerIndex(), bytes);
return bytes;
}

/**
* The underlying Netty ByteBuf.
*
* @return a Netty ByteBuf object
*/
ByteBuf delegate() {
return delegate;
}
}
139 changes: 139 additions & 0 deletions components/api/src/main/java/com/hotels/styx/api/ByteStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright (C) 2013-2018 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.api;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

/**
mikkokar marked this conversation as resolved.
Show resolved Hide resolved
* A stream of Styx byte {@link Buffer} objects constituting a HTTP message body.
*
* This {@code ByteStream} class implements a reactive streams {@link Publisher} interface,
* therefore being interoperable with other conforming libraries such as Reactor and
* Rx Java 2.0.
*
* The class provides a set of operations to transform and inspect the byte stream.
*
* The class also provides methods for consuming the stream.
**
*/
public class ByteStream implements Publisher<Buffer> {
private final Publisher<Buffer> stream;

/**
* Create a new {@code ByteStream} from a reactive streams {@link Publisher}.
*
* @param stream a reactive streams {@link Publisher}
*/
public ByteStream(Publisher<Buffer> stream) {
mikkokar marked this conversation as resolved.
Show resolved Hide resolved
this.stream = requireNonNull(stream);
}

/**
* Transform the stream by performing a mapping operation on each {@link Buffer} object.
*
* The mapping operation automatically maintains the @{link Buffer} reference counts as
* follows:
*
* <ul>
* <li> When the mapping function returns a new {@link Buffer} instance, the reference count for
* the old one is automatically decremented.</li>
* <li> When the mapping function modifies the {@link Buffer} in place, returning the same instance
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our Buffer class is currently immutable, so this description may be confusing.

Copy link
Contributor Author

@mikkokar mikkokar Oct 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave this open for now. Performance-wise it would be sensible to allow in-place modifications for the buffer. That would require further modifications to the Buffer class. WDYT?

* back, the reference count is unchanged.</li>
* </ul>
*
* @param mapping a mapping function
*
* @return a new, mapped {@code ByteStream} object
*/
public ByteStream map(Function<Buffer, Buffer> mapping) {
return new ByteStream(Flux.from(stream).map(releaseOldBuffers(mapping)));
}

private static Function<Buffer, Buffer> releaseOldBuffers(Function<Buffer, Buffer> mapping) {
return buffer -> {
Buffer buffer2 = mapping.apply(buffer);
if (buffer != buffer2) {
buffer.delegate().release();
}
return buffer2;
};
}

/**
* Transform the stream by dropping all {@link Buffer} objects.
*
* The {@code drop} returns a new {@code ByteStream} object with all upstream
* buffers removed. The {@code drop} automatically decrements the reference
* counts for each dropped {@link Buffer}.
*
* @return an empty {@link ByteStream}
*/
public ByteStream drop() {
return new ByteStream(Flux.from(stream)
.doOnNext(buffer -> buffer.delegate().release())
.filter(buffer -> false));
}

/**
* Run a provided action at the end of the byte stream.
*
* The provided action must accept an {@code Optional<Throwable>} argument,
* which is be set to {@code Optional.empty} if this stream finished successfully,
* or an {@code Optional.of(cause)} when this stream terminated with an error.
*
* @param action an action function
*
* @return an unmodified {@code ByteStream} with an action function attached
*/
public ByteStream doOnEnd(Consumer<Optional<Throwable>> action) {
return new ByteStream(Flux.from(this.stream)
.doOnError(cause -> action.accept(Optional.of(cause)))
.doOnComplete(() -> action.accept(Optional.empty()))
);
}

/**
* Consumes the stream by collecting it into an aggregate {@link Buffer} object.
*
* The aggregate {@link Buffer} object must be released after use.
*
* @param maxContentBytes maximum size for the aggregated buffer
* @return a future of aggregated buffer
*/
public CompletableFuture<Buffer> aggregate(int maxContentBytes) {
return new ByteStreamAggregator(this.stream, maxContentBytes)
.apply();
}

/**
* Consume the {@link ByteStream} by providing a reactive streams {@link Subscriber}.
*
* @param subscriber a reactive streams {@link Subscriber}
*/
@Override
public void subscribe(Subscriber<? super Buffer> subscriber) {
stream.subscribe(subscriber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright (C) 2013-2018 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.api;

import io.netty.buffer.CompositeByteBuf;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.netty.buffer.Unpooled.compositeBuffer;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

class ByteStreamAggregator implements Subscriber<Buffer> {
private final Publisher<Buffer> upstream;
private final int maxSize;
private final CompletableFuture<Buffer> future = new CompletableFuture<>();
private final AtomicBoolean active = new AtomicBoolean();
private final CompositeByteBuf aggregated = compositeBuffer();
private Subscription subscription;

ByteStreamAggregator(Publisher<Buffer> upstream, int maxSize) {
this.upstream = requireNonNull(upstream);
this.maxSize = maxSize;
}

public CompletableFuture<Buffer> apply() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is always called immediately after construction, perhaps it would make sense to put the code directly into the constructor?

This would also guarantee that it can only be used once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would expose a half-constructed ByteStreamAggregator object to the upstream.

Also because the ByteStreamAggregator is package private, the apply method is never visible to Styx-API users, and therefore we don't have to solve this problem right now.

if (active.compareAndSet(false, true)) {
this.upstream.subscribe(this);
return future;
} else {
throw new IllegalStateException("ByteStreamAggregator may only be started once.");
}
}

@Override
public void onSubscribe(Subscription subscription) {
if (this.subscription == null) {
this.subscription = subscription;
this.subscription.request(Long.MAX_VALUE);
} else {
subscription.cancel();
throw new IllegalStateException("ByteStreamAggregator supports only one Producer instance.");
}
}

@Override
public void onNext(Buffer part) {
long newSize = aggregated.readableBytes() + part.size();

if (newSize > maxSize) {
part.delegate().release();
aggregated.release();
subscription.cancel();
this.future.completeExceptionally(
new ContentOverflowException(format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxSize)));
} else {
aggregated.addComponent(part.delegate());
aggregated.writerIndex(aggregated.writerIndex() + part.size());
}
}

@Override
public void onError(Throwable cause) {
aggregated.release();
subscription.cancel();
future.completeExceptionally(cause);
}

@Override
public void onComplete() {
future.complete(new Buffer(aggregated));
}

}
Loading