diff --git a/docs/developer-guide/plugins-scenarios.md b/docs/developer-guide/plugins-scenarios.md index 80d73fde30..da03227f0a 100644 --- a/docs/developer-guide/plugins-scenarios.md +++ b/docs/developer-guide/plugins-scenarios.md @@ -4,258 +4,344 @@ ### Synchronously transforming requests -Transforming request object synchronously is trivial. Just transform the HttpRequest object via the HttpRequest.Builder -object, and pass it to the chain.proceed(), as the example demonstrates. - - import com.hotels.styx.api.HttpInterceptor; - import com.hotels.styx.api.HttpRequest; - import com.hotels.styx.api.HttpResponse; - - public class FooAppendingInterceptor implements HttpInterceptor { - @Override - public Observable intercept(HttpRequest request, Chain chain) { - return chain.proceed( - request.newBuilder() - .header("X-Foo", "Bar") - .build()); - } +Transforming a request object synchronously is trivial. By "synchronously" we mean in the +same thread, in non-blocking fashion. + +Just call `request.newBuilder()` to +create a new `HttpRequest.Builder` object. It has already been initialised with the copy of +the original `request`. Modify the builder as desired, construct a new version, and pass +it to the `chain.proceed()`, as the example demonstrates: + +```java +import com.hotels.styx.api.HttpRequest; +import com.hotels.styx.api.HttpResponse; +import com.hotels.styx.api.StyxObservable; +import com.hotels.styx.api.plugins.spi.Plugin; + +import static com.hotels.styx.api.HttpHeaderNames.USER_AGENT; + +public class SyncRequestPlugin implements Plugin { + @Override + public StyxObservable intercept(HttpRequest request, Chain chain) { + return chain.proceed( + request.newBuilder() + .header(USER_AGENT, "Styx/1.0 just testing plugins") + .build() + ); } +} +``` ### Synchronously transforming response -In this example, we will synchronously transom the response by adding an "X-Foo" header to it. - -Use Rx Java map() to transform a response object synchronously. - -Remember that chain.proceed() returns an Rx Observable of HttpResponse. We'll use map() to register a callback that Java -Rx framework calls back once the response becomes available. In the example below, it is the lambda expression -HttpResponse -> HttpResponse that is called when response is available. The lambda expression constructs a new -HttpResponse object with the new header added. - -The map() itself returns a new Rx Observable, satisfying the type signature for intercept() method. -Therefore it can be returned as such from intercept(). - - import com.hotels.styx.api.HttpInterceptor; - import com.hotels.styx.api.HttpRequest; - import com.hotels.styx.api.HttpResponse; - - public class FooAppendingInterceptor implements HttpInterceptor { - @Override - public Observable intercept(HttpRequest request, Chain chain) { - return chain.proceed(request) - .map(response -> response.newBuilder() - .header("X-Foo", "Bar") - .build()); - } - } - +This example demonstrates how to synchronously transform a HTTP response. We will +use a `StyxObservable.map` method to add an "X-Foo" header to the response. + +```java +import com.hotels.styx.api.HttpInterceptor; +import com.hotels.styx.api.HttpRequest; +import com.hotels.styx.api.HttpResponse; +import com.hotels.styx.api.StyxObservable; +import com.hotels.styx.api.plugins.spi.Plugin; + +public class SyncResponsePlugin implements Plugin { + @Override + public StyxObservable intercept(HttpRequest request, HttpInterceptor.Chain chain) { + return chain.proceed(request) + .map(response -> response.newBuilder() + .header("X-Foo", "bar") + .build() + ); + } +} +``` ### Asynchronously transform request object -Sometimes it is necessary to transform the request asynchronously. For example, it may be necessary to consult an -external service(s) like databases and such to look up necessary information for the request transformation. In this case: -1. Perform a non-blocking call to an asynchronous API. It is best to wrap such API behind Rx Java observables. -2. Call the chain.proceed(request) when the the asynchronous operation completes. -3. Because chain.proceed() itself is an asynchronous operation returning Observable[HttpResponse], it needs to be called from inside Rx flatMap() operator. - -In the following example, the interceptor either allows or rejects the request based on a query to an external service. -The API for external service is wrapped behind requestAccessQueryAsynch() method. Normally a method like this would -initiate a transaction to the external service, and return a Future, Promise, Rx Observable, etc. of the outcome. -For sake of simplicity, here it returns an Observable.just(true). - -The flatMap() operator allows us to register a function that itself performs an asynchronous operation. In our example, -requestAccessQueryAsynch() returns back an Observable[Boolean] indicating if the access is granted or not. When the -access query completes, Rx framework calls out to the registered function, which in this example is a lambda expression -of type Boolean => Observable[HttpResponse]. The lambda expression transforms the access grant outcome into a HttpResponse -object. This is an asynchronous transformation, because the lambda expression calls out to asynchronous chain.proceed(). - -Our lambda expression looks into the access grant outcome. When true, the request is proxied onwards by a call to -chain.proceed(request). Otherwise a FORBIDDEN response is returned. - - import com.hotels.styx.api.HttpInterceptor; - import com.hotels.styx.api.HttpRequest; - import com.hotels.styx.api.HttpResponse; - ... - - public class AsyncRequestDelayInterceptor implements HttpInterceptor { - private static final HttpResponse rejected = HttpResponse.Builder.response(FORBIDDEN).build(); - @Override - public Observable intercept(HttpRequest request, Chain chain) { - return requestAccessQueryAsync(request) - .flatMap((allowed) -> allowed ? chain.proceed(request) : Observable.just(rejected)); - } +Sometimes it is necessary to transform the request asynchronously. For example, you may need to +look up external key value stores to parameterise the request transformation. The example below +shows how to modify a request URL path based on an asynchronous lookup to an external database. + +```java +import com.hotels.styx.api.HttpInterceptor; +import com.hotels.styx.api.HttpRequest; +import com.hotels.styx.api.HttpResponse; +import com.hotels.styx.api.StyxObservable; +import com.hotels.styx.api.Url; +import com.hotels.styx.api.plugins.spi.Plugin; + +import java.util.concurrent.CompletableFuture; + +public class AsyncRequestInterceptor implements Plugin { + + @Override + public StyxObservable intercept(HttpRequest request, HttpInterceptor.Chain chain) { - private Observable requestAccessQueryAsync(HttpRequest request) { - // do something asynchronous - // ... - return Observable.create((downstream) -> { - CompletableFuture.supplyAsync(() -> { - downstream.onNext("foo"); - downstream.onComplete(); - }); - }); - } + return asyncUrlReplacement(request.url()) // (1) + .map(newUrl -> request.newBuilder() // (4) + .url(newUrl) + .build()) + .flatMap(chain::proceed); // (5) } - + + private static StyxObservable asyncUrlReplacement(Url url) { + return StyxObservable.from(pathReplacementService(url.path())) // (3) + .map(newPath -> new Url.Builder(url) + .path(newPath) + .build()); + } + + private static CompletableFuture pathReplacementService(String url) { + // Pretend to make a call here: + return CompletableFuture.completedFuture("/replacement/path"); // (2) + } +} +``` + +Step 1. We call the `asyncUrlReplacement`, which returns a `StyxObservable`. +The `asyncUrlReplacement` wraps a call to the remote service and converts +the outcome into a `StyxObservable`, which is the basis for our response observable. + +Step 2. A call to `pathReplacementService` makes a non-blocking call to the remote key/value store. +Well, at least we pretend to call the key value store, but in this example we'll just return a +completed future of a constant value. + +Step 3. `CompletableFuture` is converted to `StyxObservable`, so that other asynchronous +operations like `chain.proceed` can be bound to it later on. + +Step 4. The eventual outcome from the `asyncUrlReplacement` yields a new, modified URL instance. +We'll transform the `request` by substituting the URL path with the new one. +This is a quick synchronous operation so we'll do it in a `map` operator. + +Step 5. Finally, we will bind the outcome from `chain.proceed` into the response observable. +Remember that `chain.proceed` returns an `Observable` it is therefore +interface compatible and can be `flatMap`'d to the response observable. The resulting +response observable chain is returned from the `intercept`. + + ### Asynchronously transform response object -Processing a HTTP response asynchronously is very similar to processing response synchronously. Instead of Rx map() -operator, you'll use a flatMap(). In this example, we assume asyncOperation() makes a network call, performs I/O, -or off-loads response processing to a separate thread pool. For this reason it returns an Observable[HttpResponse] -so that it can emit a transformed response once completed. - -The Rx framework calls asyncOperation() once the the response becomes available from the adjacent plugin. A call to -asyncOperation() triggers another asynchronous operation, whose result becomes available via the returned observable, -which is linked to the Observable chain using the Rx flatMap() operator. - - - import rx.Observable; - import rx.schedulers.Schedulers; - import rx.lang.scala.ImplicitFunctionConversions.*; - import com.hotels.styx.api.HttpInterceptor; - import com.hotels.styx.api.HttpRequest; - import com.hotels.styx.api.HttpResponse; - import com.hotels.styx.api.HttpInterceptor.Chain; - ... - - public class AsyncResponseProcessingPlugin implements HttpInterceptor { - public Observable intercept(HttpRequest request, Chain chain) { - return chain.proceed(request).flatMap(this::asyncOperation); - } - - private Observable asyncOperation(HttpResponse response) { - return Observable.just(response); - } - } - -## Transforming HTTP Content +This example demonstrates asynchronous response processing. Here we pretend that `callTo3rdParty` method +makes a non-blocking request to retrieve string that is inserted into a response header. + +A `callTo3rdParty` returns a `CompletableFuture` which asynchronously produces a string value. We +will call this function when the HTTP response is received. + +```java +import com.hotels.styx.api.HttpInterceptor; +import com.hotels.styx.api.HttpRequest; +import com.hotels.styx.api.HttpResponse; +import com.hotels.styx.api.StyxObservable; +import com.hotels.styx.api.plugins.spi.Plugin; + +import java.util.concurrent.CompletableFuture; + +public class AsyncResponseInterceptor implements Plugin { + private static final String X_MY_HEADER = "X-My-Header"; + + @Override + public StyxObservable intercept(HttpRequest request, HttpInterceptor.Chain chain) { + return chain.proceed(request) // (1) + .flatMap(response -> // (3) + StyxObservable + .from(thirdPartyHeaderService(response.header(X_MY_HEADER).orElse("default"))) // (1) + .map(value -> // (4) + response.newBuilder() + .header(X_MY_HEADER, value) + .build()) + ); + } -HTTP content can be processed in a streaming fashion, or it can be aggregated into one big blob and decoded into a -business domain object. Here we will explain how to do this. And as always, this can be done in a synchronous or -asynchronous fashion. + private static CompletableFuture thirdPartyHeaderService(String myHeader) { + // Pretend to make a call here: + return CompletableFuture.completedFuture("value"); + } +} +``` -Content transformation always involves creating a new copy of a proxied HTTP object and overriding its body content. -Because the body itself is an Observable, it can be transformed using similar techniques as demonstrated for -the HTTP headers above. The example below transforms the HTTP response content in as streaming fashion using a -transformContent() function. The new response is created by invoking a response.newBuilder() method. Using the -resulting builder, the response body is overridden. +Step 1. We start by calling `chain.proceed(request)` to obtain a response observable. - chain.proceed(request) - .map((HttpResponse response) -> response.newBuilder() - .body(response.body().content().map(this::transformContent)) - .build() - ); +Step 2. A `callTo3rdParty` returns a CompletableFuture. We use `StyxObservable.from` to convert it +into a `StyxObservable` so that it can be bound to the response observable. -As always, the content can be transformed both synchronously or asynchronously. In this document we will explore all the options. +Step 3. The `flatMap` operator binds `callToThirdParty` into the response observable. -Content can be decoded to a business domain object for further processing. +Step 4. We will transform the HTTP response by inserting an outcome of `callToThirdParty`, or `value`, +into the response headers. -### Decode content into business domain objects -Decoding content into a business domain object is always an asynchronous operation. This is because Styx must wait -until it has received enough HTTP content to attempt decoding content. +## HTTP Content Transformations -HttpResponse object has a decode() method that can be used to decode HTTP response content into business domain objects. -The decode() method has the following type: +Styx exposes HTTP messages to interceptors as streaming `HttpRequest` and `HttpResponse` messages. +In this form, the interceptors can process the content in a streaming fashion. That is, they they +can look into, and modify the content as it streams through. - public Observable> decode(Function decoder, int maxContentBytes) - -The decode() takes two arguments: a decoder function, and a maxContentBytes integer. It returns an Observable> -which is underlines its asynchronous nature. The decode() accumulates entire HTTP response content into a single aggregated -byte buffer, and then calls the supplied decoder method passing in the aggregated buffer as a parameter. The decoder -is a function, or typically a lambda expression supplied by the plugin, that converts the aggregated http content into -a business domain object of type T. Styx will take care of managing all aspects related to direct memory buffers, such -as releasing them if necessary. +Alternatively, streaming messages can be aggregated into a `FullHttpRequest` or `FullHttpResponse` +messages. The full HTTP message body is then available for the interceptor to use. Note that content +aggregation is always an asynchronous operation. This is because the streaming HTTP message is +exposing the content, in byte buffers, as it arrives from the network, and Styx must wait until +all content has been received. -The maxContentBytes is a safety valve that prevents styx from accumulating too much content. Styx will only accumulate -up to maxContentBytes of content. If the content exceeds this amount, Styx will emit a ContentOverflowException. -This is to prevent out of memory exceptions in face of very large responses. -Because decode() itself is an asynchronous operation, the plugin must call it inside a flatMap() operator. -When the HTTP response has arrived, and its content decoded, a DecodedResponse instance is emitted. The DecodedResponse -contains the decoded business domain object along with a HTTP response builder that can be used for further transformations. +### Aggregating Content into Full Messages -In the example below, we will map over the decodedResponse to add a new "bytes_aggregated" header to the response, containing -a string length (as opposed to content length) of the received response content. Note that it is necessary to add the -response body back to the new response by calling decodedResponse.responseBuilder().body(). +```java +import com.hotels.styx.api.HttpRequest; +import com.hotels.styx.api.HttpResponse; +import com.hotels.styx.api.StyxObservable; +import com.hotels.styx.api.plugins.spi.Plugin; - - @Override - public Observable intercept(HttpRequest request, Chain chain) { - return chain.proceed(request) - .flatMap(response -> response.decode((byteBuf) -> byteBuf.toString(UTF_8), maxContentBytes)) - .map(decodedResponse -> decodedResponse.responseBuilder() - .header("test_plugin", "yes") - .header("bytes_aggregated", decodedResponse.body().length()) - .body(decodedResponse.body()) - .build()); - } - -### Synchronously transform streaming request content - -In the following example, we will transform the HTTP response content to upper case letters. - -Because the transformation to upper case does not involve IO or blocking operations, it can be done synchronously with -Rx map() operator. First we create a new observable called toUpperCase which contains the necessary transformation on -the body observable. Then we create a new HTTP request object passing in the transformed observable as a new body. - -Notice that the lambda expression for the map() operator receives a ByteBuf buf as an argument. We must assume it is -a reference counted buffer, and to avoid leaks we must call buf.release() before returning a copy from the lambda expression. - - import com.hotels.styx.api.HttpRequest; - import com.hotels.styx.api.HttpResponse; - import io.netty.buffer.ByteBuf; - import rx.Observable; - import static com.google.common.base.Charsets.UTF_8; - import static io.netty.buffer.Unpooled.copiedBuffer; - - ... - - @Override - public Observable intercept(HttpRequest request, Chain chain) { - Observable toUpperCase = request.body() - .content() - .map(buf -> { - String transformed = buf.toString(UTF_8).toUpperCase(); - buf.release(); - return copiedBuffer(transformed, UTF_8); - }); - - return chain.proceed( - request.newBuilder() - .body(toUpperCase) - .build() - ); - } - -### [DRAFT] Replace the HTTP response content with a new body and discarding the existing one +public class RequestAggregationPlugin implements Plugin { + private static final int MAX_CONTENT_LENGTH = 100000; -In the following example the plugin replaces the entire HTTP response content with a custom response content. + @Override + public StyxObservable intercept(HttpRequest request, Chain chain) { + return request.toFullRequest(MAX_CONTENT_LENGTH) + .map(fullRequest -> fullRequest.newBuilder() + .body(new byte[0], true) + .build()) + .flatMap(fullRequest -> chain.proceed(fullRequest.toStreamingRequest())); + } +} +``` -In this scenario the plugin creates a new HttpResponse based on the existing one. However changing entirely the response -content can be a source of memory leaks if the current content is not programmatically released. This is because the -Observable content resides in the direct memory whereas the rest of the plugin will be in the heap. If the content will -not be manually released it will remain in the direct memory and eventually it will cause Styx to fail with a: +`maxContentBytes` is a safety valve that prevents Styx from accumulating too much content +and possibly running out of memory. Styx only accumulates up to `maxContentBytes` of content. +`toFullRequest` fails when the content stream exceeds this amount, and Styx emits a +`ContentOverflowException`, -java.lang.OutOfMemoryError: Direct buffer memory -In order to avoid memory leaks it is necessary to release the existing ByteBuf by using the doOnNext() operator which -creates a new Observable with a side-effect behavior. In this specific case the side-effect is a call to a function which -releases the reference count. Make sure to subscribe to this new Observable otherwise the function will not be executed. - - import com.hotels.styx.api.HttpRequest; - import com.hotels.styx.api.HttpResponse; - import com.hotels.styx.api.plugins.spi.Plugin; - import io.netty.util.ReferenceCountUtil; - import rx.Observable; - - ... - - @Override - public Observable intercept(HttpRequest request, Chain chain) { - Observable responseObservable = chain.proceed(request); - return responseObservable.map(response -> { - response.body().content().doOnNext(byteBuf -> ReferenceCountUtil.release(byteBuf)).subscribe(); - return response.newBuilder().body("Custom HTTP response content").build(); - }); - } +### Transformations on Streaming HTTP response content + +Streaming HTTP body content can be transformed both synchronously and asynchronously. +However there are some pitfalls you need to know: + + - Reference counting. Styx exposes the content stream as an observable of reference counted + Netty `ByteBuf` objects. Ensure the reference counts are correctly decremented when + buffers are transformed. + + - Continuity (or discontinuity) of Styx content observable. Each content tranformation with + `map` or `flatMap` is a composition of some source observable. So is each content transformation + linked to some source observable, an ultimate source being the Styx server core. + It is the consumer's responsibility to ensure this link never gets broken. That is, you + are not allowed to just substitute the content observable with another one, unless it composes + to the previous content source. + + +### Synchronously transforming streaming request content + +In this rather contrived example we will transform HTTP response content to +upper case letters. + +Streaming `HttpRequest` content is a byte buffer stream. The stream can be accessed +with a `request.body()` method and its data type is `StyxObservable`. + +Because `toUpperCase` is non-blocking transformation we can compose it to the original +byte stream using a `map` operator. + +The mapping function decodes the original byte buffer `buf` to an UTF-8 encoded +Java `String`, and copies its upper case representation into a newly created byte buffer. +The old byte buffer `buf` is discarded. + +Because `buf` is a Netty reference counted `ByteBuf`, we must take care to decrement its +reference count by calling `buf.release()`. + +```java +public class MyPlugin extends Plugin { + @Override + public StyxObservable intercept(HttpRequest request, Chain chain) { + StyxObservable toUpperCase = request.body() + .map(buf -> { + buf.release(); + return copiedBuffer(buf.toString(UTF_8).toUpperCase(), UTF_8); + }); + + return chain.proceed( + request.newBuilder() + .body(toUpperCase) + .build() + ); + } +} +``` + +### Asynchronously transforming streaming request content + +Asynchronous content stream transformation is very similar to the synchronous transformation. +The only difference is that asynchronous transformation must be composed with a `flatMap` +instead of `map`. Otherwise the same discussion apply. As always it is important to take care +of the reference counting. + +```java +public class AsyncRequestContentTransformation implements Plugin { + + @Override + public StyxObservable intercept(HttpRequest request, Chain chain) { + StyxObservable contentMapping = request.body() + .flatMap(buf -> { + String content = buf.toString(UTF_8); + buf.release(); + return sendToRemoteServer(content) + .map(value -> copiedBuffer(value, UTF_8)); + }); + + return chain.proceed( + request.newBuilder() + .body(contentMapping) + .build() + ); + } + + StyxObservable sendToRemoteServer(String buf) { + return StyxObservable.of("modified 3rd party content"); + } +} +``` + +### Synchronously transforming streaming response content + +```java +public class AsyncResponseContentStreamTransformation implements Plugin { + @Override + public StyxObservable intercept(HttpRequest request, Chain chain) { + return chain.proceed(request) + .map(response -> { + StyxObservable contentMapping = response.body() + .map(buf -> { + buf.release(); + return copiedBuffer(buf.toString(UTF_8).toUpperCase(), UTF_8); + }); + return response.newBuilder() + .body(contentMapping) + .build(); + }); + } +} +``` + +### Asynchronously transforming streaming response content + +```java +public class AsyncResponseContentStreamTransformation implements Plugin { + @Override + public StyxObservable intercept(HttpRequest request, Chain chain) { + return chain.proceed(request) + .map(response -> { + StyxObservable contentMapping = response.body() + .flatMap(buf -> { + String content = buf.toString(UTF_8); + buf.release(); + return sendToRemoteServer(content) + .map(value -> copiedBuffer(value, UTF_8)); + }); + return response.newBuilder() + .body(contentMapping) + .build(); + }); + } + + StyxObservable sendToRemoteServer(String buf) { + return StyxObservable.of("modified 3rd party content"); + } +} +``` \ No newline at end of file