diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index d5cc7f8..cbc29a5 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -2,6 +2,6 @@ These folks have contributed code and time to grpc-jersey. -- xorlev +- xorlev (Google LLC) - sypticus - gfecher \ No newline at end of file diff --git a/LICENSE b/LICENSE index 12414a7..35f02be 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2017 Michael Rose and contributors. +Copyright 2017 Michael Rose, Google LLC, and contributors. Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/README.md b/README.md index e08ebc5..e948ce3 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ used by the [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway) proje * [Example Usage](#example-usage) * [Operation modes](#operation-modes) * [HTTP and gRPC](#http-and-grpc) + * [Working with HTTP headers](#working-with-http-headers) * [Streaming RPCs](#streaming-rpcs) * [Error handling](#error-handling) * [Error Translation](#error-translation) @@ -264,6 +265,87 @@ data: {"request":{"s":"hello","uint3":0,"uint6":"0","int3":2,"int6":"0","bytearr data: {"request":{"s":"hello","uint3":0,"uint6":"0","int3":2,"int6":"0","bytearray":"","boolean":false,"f":0.0,"d":0.0,"enu":"FIRST","rep":[],"repStr":[]}} ``` +## Working with HTTP headers + +_NOTE:_ This only works for uses using the "proxy" configuration. Direct invocation mode does not support HTTP header +manipulation. + +grpc-jersey allows you to use and manipulate HTTP headers from within your RPC handler. To do so, you'll need to install +a server interceptor into your RPC stack. If you're using the recommended "dual-stack" configuration, you can modify it +like so: + +```java + // Service stack. This is where you define your interceptors. + ServerServiceDefinition serviceStack = ServerInterceptors.intercept( + GrpcJerseyPlatformInterceptors.intercept(new EchoTestService()), + new GrpcLoggingInterceptor(), + new MyAuthenticationInterceptor() // your interceptor stack. + ); +``` + +Preferably, you'll want to use the helper provided by `GrpcJerseyPlatformInterceptors`. Future platform interceptors +will be added here automatically, allowing your code to remain the same and take advantage of new functionality. +However, you can also use just the HttpHeaderInterceptor directly should you desire: + +```java + // Service stack. This is where you define your interceptors. + ServerServiceDefinition serviceStack = ServerInterceptors.intercept( + new EchoTestService(), + HttpHeaderInterceptors.serverInterceptor(), + new GrpcLoggingInterceptor(), + new MyAuthenticationInterceptor() // your interceptor stack. + ); +``` + +The client interceptor is automatically attached to your stubs by code generation after grpc-jersey 0.3.0. + +To read & manipulate HTTP headers, below is an example right from the `EchoTestService` in this project: + +```java + @Override + public void testMethod3(TestRequest request, StreamObserver responseObserver) { + for (Map.Entry header : HttpHeaderContext.requestHeaders().entries()) { + if (header.getKey().startsWith("grpc-jersey")) { + HttpHeaderContext.setResponseHeader(header.getKey(), header.getValue()); + } + } + + responseObserver.onNext(TestResponse.newBuilder().setRequest(request).build()); + responseObserver.onCompleted(); + } +``` + +`HttpHeaderContext` is your interface into the HTTP headers. You can see request headers with +`HttpHeaderContext.requestHeaders()` and set response headers with +`HttpHeaderContext.setResponseHeader(headerName, headerValue)` or +`HttpHeaderContext.addResponseHeader(headerName, headerValue)`, the former setting a single value (or list of values), +clearing existing ones, and the latter adding values. You can use `HttpHeaderContext.clearResponseHeader(headerName)` +or `HttpHeaderContext.clearResponseHeaders()` to remove header state. **Note:** this API is considered beta and may +change in the future. + +While `HttpHeaderContext` is gRPC Context-aware and request headers can be safely accessed from background threads +executed with an attached context, manipulating response headers should only be done from a single thread as no effort +is put into synchronizing the state. + +### Streaming + +Streaming RPCs will apply any headers set before the first message is sent or before the stream is closed if no messages +are sent. + +### Errors + +Headers are optionally applied by the GrpcJerseyErrorHandler on error (for unary RPCs). The default (provided) +implementation will honor headers set by the RPC handler on all error responses. + +Additionally, as per the caveats with streaming RPCs in general, any additional headers added to an in-progress +stream will be ignored, as headers can only be sent once in HTTP/1.x's common implementations, only headers present +before the first message (or close/error) will be applied. + +### Headers in the main gRPC Metadata (deprecated) + +HTTP request headers are read into the main gRPC Metadata when using the "proxy" mode by default, however this is +considered deprecated behavior. Utilizing the new `HttpHeaderContext` is the supported method. + ## Error handling grpc-jersey will translate errors raised inside your RPC handler. However, there is some nuance with regards to using @@ -402,6 +484,13 @@ instances. ## Releases +0.3.0 + - First-class HTTP header support. HTTP request headers are read into and attached to the gRPC Context. Likewise, + response headers can be controlled from within your RPC handler. See + [Working with HTTP headers](#working-with-http-headers). [#23](https://github.com/Xorlev/grpc-jersey/pull/23) + - **Breaking change:** API of GrpcJerseyErrorHandler has changed. If you haven't implemented a custom error handler, + this doesn't affect you. If so, please migrate your handler to the new API. + 0.2.0 - Server-to-client RPC streaming support. [#14](https://github.com/Xorlev/grpc-jersey/pull/14) - `ALREADY_EXISTS` gRPC error code now maps to `409 Conflict`. @@ -443,8 +532,9 @@ Short-term roadmap: - [X] Server streaming - [ ] Client streaming - [ ] BiDi streaming (true bidi streaming is impossible without websockets) -- [ ] Direct control of HTTP headers -- [ ] Deadline handling +- [x] Direct control of HTTP headers +- [ ] Out of the box CORS support +- [ ] Better deadline handling Long-term roadmap: - Potentially replace Jersey resources with servlet filter. This would make streaming easier. diff --git a/build.gradle b/build.gradle index eb61a56..291613c 100644 --- a/build.gradle +++ b/build.gradle @@ -16,6 +16,7 @@ plugins { ext { grpcVersion = "1.8.0" + jerseyVersion = "2.25.1" lombokVersion = "1.16.18" protobufVersion = "3.5.0" } @@ -92,8 +93,9 @@ project(":jersey-rpc-support") { compile "com.google.protobuf:protobuf-java-util:${protobufVersion}" compile "io.grpc:grpc-protobuf:${grpcVersion}" compile "io.grpc:grpc-stub:${grpcVersion}" + compile "javax.servlet:javax.servlet-api:3.1.0" compile "javax.ws.rs:javax.ws.rs-api:2.0.1" - provided "org.glassfish.jersey.core:jersey-server:2.25" + provided "org.glassfish.jersey.core:jersey-server:${jerseyVersion}" } protobuf { @@ -137,7 +139,7 @@ project(":protoc-gen-jersey") { compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.5" compile "com.fasterxml.jackson.core:jackson-databind:2.8.5" - testCompile 'org.glassfish.jersey.core:jersey-common:2.24.1' + testCompile "org.glassfish.jersey.core:jersey-common:${jerseyVersion}" } protobuf { @@ -263,9 +265,7 @@ project(":integration-test-base") { testCompile('io.dropwizard:dropwizard-testing:1.0.5') { exclude group: 'org.eclipse.jetty' } - testCompile 'org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-jetty:2.23.2' - - + testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:${jerseyVersion}" } protobuf { @@ -310,7 +310,7 @@ project(":integration-test-serverstub") { exclude group: 'org.eclipse.jetty' } testCompile group: 'io.dropwizard', name: 'dropwizard-jetty', version: '0.9.3' - testCompile 'org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-jetty:2.25.1' + testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:${jerseyVersion}" } protobuf { @@ -366,9 +366,7 @@ project(":integration-test-proxy") { exclude group: 'org.eclipse.jetty' } testCompile group: 'io.dropwizard', name: 'dropwizard-jetty', version: '0.9.3' - testCompile 'org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-jetty:2.25.1' - - + testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:${jerseyVersion}" } protobuf { diff --git a/gradle.properties b/gradle.properties index 43f1c9f..76601fa 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.2.1-SNAPSHOT +version=0.3.0-SNAPSHOT diff --git a/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/EchoTestService.java b/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/EchoTestService.java index 77bc5e3..033249a 100644 --- a/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/EchoTestService.java +++ b/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/EchoTestService.java @@ -3,13 +3,17 @@ import com.fullcontact.rpc.TestRequest; import com.fullcontact.rpc.TestResponse; import com.fullcontact.rpc.TestServiceGrpc; +import com.google.common.collect.ImmutableMultimap; import com.google.protobuf.util.Durations; import com.google.rpc.DebugInfo; import com.google.rpc.RetryInfo; +import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.util.Map; + /** * gRPC service that echos the request into the response * @@ -30,6 +34,12 @@ public void testMethod2(TestRequest request, StreamObserver respon @Override public void testMethod3(TestRequest request, StreamObserver responseObserver) { + for (Map.Entry header : HttpHeaderContext.requestHeaders().entries()) { + if (header.getKey().startsWith("grpc-jersey")) { + HttpHeaderContext.setResponseHeader(header.getKey(), header.getValue()); + } + } + responseObserver.onNext(TestResponse.newBuilder().setRequest(request).build()); responseObserver.onCompleted(); } @@ -54,6 +64,8 @@ public void testMethod6(TestRequest request, StreamObserver respon @Override public void streamMethod1(TestRequest request, StreamObserver responseObserver) { + HttpHeaderContext.addResponseHeader("X-Stream-Test", "Hello, World!"); + for(int i = 0; i < request.getInt3(); i++) { responseObserver.onNext(TestResponse.newBuilder().setRequest(request).build()); diff --git a/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/IntegrationBase.java b/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/IntegrationBase.java index 69e3f05..a695c6f 100644 --- a/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/IntegrationBase.java +++ b/integration-test-base/src/test/java/com/fullcontact/rpc/jersey/IntegrationBase.java @@ -30,6 +30,8 @@ public abstract class IntegrationBase { public abstract ResourceTestRule resources(); + public abstract boolean supportsHttpHeaders(); + @Test public void testBasicGet() throws Exception { // /users/{s}/{uint3}/{nt.f1} @@ -168,16 +170,20 @@ public void testPost__nestedBinding() throws Exception { @Test public void testAdvancedGet() throws Exception { // /users/{s=hello/**}/x/{uint3}/{nt.f1}/*/**/test - String responseJson = resources().getJerseyTest() + Response httpResponse = resources().getJerseyTest() .target("/users/hello/string1/test/x/1234/abcd/foo/bar/baz/test") .queryParam("d", 1234.5678) .queryParam("enu", "SECOND") .queryParam("uint3", "5678") // ensure path param has precedence .queryParam("x", "y") .request() + .header("grpc-jersey-Test", "Header") + .header("grpc-jersey-TestList", "1") + .header("grpc-jersey-TestList", "2") .buildGet() - .invoke(String.class); + .invoke(); + String responseJson = httpResponse.readEntity(String.class); TestResponse.Builder responseFromJson = TestResponse.newBuilder(); JsonFormat.parser().merge(responseJson, responseFromJson); TestResponse response = responseFromJson.build(); @@ -187,6 +193,15 @@ public void testAdvancedGet() throws Exception { assertThat(response.getRequest().getD()).isEqualTo(1234.5678); assertThat(response.getRequest().getEnu()).isEqualTo(TestEnum.SECOND); assertThat(response.getRequest().getNt().getF1()).isEqualTo("abcd"); + + if (supportsHttpHeaders()) { + assertThat(httpResponse.getStringHeaders().getFirst("grpc-jersey-Test")).isEqualTo("Header"); + + // According to http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2, multiple header values can be + // combined into a comma-separated list, but it doesn't say anything about parsing them back out again. + // grpc-jersey chooses to maintain Jersey's behavior and not impose its own header parsing code on top. + assertThat(httpResponse.getStringHeaders().get("grpc-jersey-TestList")).containsExactly("1,2"); + } } @Test @@ -217,7 +232,7 @@ public void testAdvancedGet__defaultEnumInResponse() throws Exception { public void testAdvancedGetFromYaml() throws Exception { // /yaml_users/{s=hello/**}/x/{uint3}/{nt.f1}/*/**/test String responseJson = resources().getJerseyTest() - .target("/yaml_users/hello/string1/test/x/1234/abcd/foo/bar/baz/test") + .target("/yaml_users/hello/string1/test/x/1234/testAdvancedGetFromYaml/foo/bar/baz/test") .queryParam("d", 1234.5678) .queryParam("enu", "SECOND") .queryParam("uint3", "5678") // ensure path param has precedence @@ -234,14 +249,14 @@ public void testAdvancedGetFromYaml() throws Exception { assertThat(response.getRequest().getUint3()).isEqualTo(1234); assertThat(response.getRequest().getD()).isEqualTo(1234.5678); assertThat(response.getRequest().getEnu()).isEqualTo(TestEnum.SECOND); - assertThat(response.getRequest().getNt().getF1()).isEqualTo("abcd"); + assertThat(response.getRequest().getNt().getF1()).isEqualTo("testAdvancedGetFromYaml"); } @Test public void testBasicGetFromYaml() throws Exception { // /yaml_users/{s}/{uint3}/{nt.f1} String responseJson = resources().getJerseyTest() - .target("/yaml_users/string1/1234/abcd") + .target("/yaml_users/string1/1234/testBasicGetFromYaml") .request() .buildGet() .invoke(String.class); @@ -252,7 +267,7 @@ public void testBasicGetFromYaml() throws Exception { assertThat(response.getRequest().getS()).isEqualTo("string1"); assertThat(response.getRequest().getUint3()).isEqualTo(1234); - assertThat(response.getRequest().getNt().getF1()).isEqualTo("abcd"); + assertThat(response.getRequest().getNt().getF1()).isEqualTo("testBasicGetFromYaml"); assertThat(false); } @@ -339,9 +354,157 @@ public void testStreamGet() throws Exception { assertThat(count).isEqualTo(10); } + @Test + public void testStreamGet_noMessages_returnsHeaders() throws Exception { + if (!supportsHttpHeaders()) { + return; + } + + Response response = resources().getJerseyTest() + .target("/stream/hello") + .queryParam("d", 1234.5678) + .queryParam("enu", "SECOND") + .queryParam("int3", "0") + .queryParam("x", "y") + .queryParam("nt.f1", "abcd") + .request() + .buildGet() + .invoke(); + + assertThat(response.getStatus()).isEqualTo(200); + + BufferedReader reader = new BufferedReader(new InputStreamReader(response.readEntity(InputStream.class))); + + int count = 0; + long now = System.currentTimeMillis(); + while(true) { + String json = reader.readLine(); + + if(Strings.isNullOrEmpty(json)) + break; + + TestResponse.Builder responseFromJson = TestResponse.newBuilder(); + JsonFormat.parser().merge(json, responseFromJson); + TestResponse r = responseFromJson.build(); + + assertThat(r.getRequest().getS()).isEqualTo("hello"); + assertThat(r.getRequest().getInt3()).isEqualTo(10); + assertThat(r.getRequest().getD()).isEqualTo(1234.5678); + assertThat(r.getRequest().getEnu()).isEqualTo(TestEnum.SECOND); + assertThat(r.getRequest().getNt().getF1()).isEqualTo("abcd"); + + count++; + + long after = System.currentTimeMillis(); + long duration = after - now; + + // This might be flaky, but we want to ensure that we're actually streaming + assertThat(duration).isLessThan(1000/2); + now = after; + } + + assertThat(count).isEqualTo(0); + + assertThat(response.getHeaderString("X-Stream-Test")).isEqualTo("Hello, World!"); + } + + + @Test + public void testStreamGet_withMessages_returnsHeaders() throws Exception { + if (!supportsHttpHeaders()) { + return; + } + + Response response = resources().getJerseyTest() + .target("/stream/hello") + .queryParam("d", 1234.5678) + .queryParam("enu", "SECOND") + .queryParam("int3", "10") + .queryParam("x", "y") + .queryParam("nt.f1", "abcd") + .request() + .buildGet() + .invoke(); + + assertThat(response.getStatus()).isEqualTo(200); + + BufferedReader reader = new BufferedReader(new InputStreamReader(response.readEntity(InputStream.class))); + + int count = 0; + long now = System.currentTimeMillis(); + while(true) { + String json = reader.readLine(); + + if(Strings.isNullOrEmpty(json)) + break; + + TestResponse.Builder responseFromJson = TestResponse.newBuilder(); + JsonFormat.parser().merge(json, responseFromJson); + TestResponse r = responseFromJson.build(); + + assertThat(r.getRequest().getS()).isEqualTo("hello"); + assertThat(r.getRequest().getInt3()).isEqualTo(10); + assertThat(r.getRequest().getD()).isEqualTo(1234.5678); + assertThat(r.getRequest().getEnu()).isEqualTo(TestEnum.SECOND); + assertThat(r.getRequest().getNt().getF1()).isEqualTo("abcd"); + + count++; + + long after = System.currentTimeMillis(); + long duration = after - now; + + // This might be flaky, but we want to ensure that we're actually streaming + assertThat(duration).isLessThan(1000/2); + now = after; + } + + assertThat(count).isEqualTo(10); + + assertThat(response.getHeaderString("X-Stream-Test")).isEqualTo("Hello, World!"); + } + + + @Test + public void testStreamGet_immediateError_returnsHeaders() throws Exception { + if (!supportsHttpHeaders()) { + return; + } + + Response response = resources().getJerseyTest() + .target("/stream/explode") + .queryParam("d", 1234.5678) + .queryParam("enu", "SECOND") + .queryParam("int3", "0") + .queryParam("x", "y") + .queryParam("nt.f1", "abcd") + .request() + .buildGet() + .invoke(); + + assertThat(response.getStatus()).isEqualTo(200); + + BufferedReader reader = new BufferedReader(new InputStreamReader(response.readEntity(InputStream.class))); + + String json = reader.readLine(); + Status.Builder statusBuilder = Status.newBuilder(); + JsonFormat.parser().merge(json, statusBuilder); + + // As expected, Status loses "cause" and "details" after transmission. + // Normally, details would be set, but JsonFormat doesn't support serializing Any. + Status expected = Status + .newBuilder() + .setCode(2) + .setMessage("HTTP 500 (gRPC: UNKNOWN)") + .build(); + + assertThat(statusBuilder.build()).isEqualTo(expected); + + assertThat(response.getHeaderString("X-Stream-Test")).isEqualTo("Hello, World!"); + } + @Test public void testStreamGetStatusError() throws Exception { - InputStream response = resources().getJerseyTest() + Response response = resources().getJerseyTest() .target("/stream/grpc_data_loss") .queryParam("d", 1234.5678) .queryParam("enu", "SECOND") @@ -350,9 +513,9 @@ public void testStreamGetStatusError() throws Exception { .queryParam("nt.f1", "abcd") .request() .buildGet() - .invoke(InputStream.class); + .invoke(); - BufferedReader reader = new BufferedReader(new InputStreamReader(response)); + BufferedReader reader = new BufferedReader(new InputStreamReader(response.readEntity(InputStream.class))); // int3 controls "successful" messages. Next request will throw. for (int i = 0; i < 10; i++) { @@ -381,6 +544,10 @@ public void testStreamGetStatusError() throws Exception { .build(); assertThat(statusBuilder.build()).isEqualTo(expected); + + if (supportsHttpHeaders()) { + assertThat(response.getHeaderString("X-Stream-Test")).isEqualTo("Hello, World!"); + } } @Test diff --git a/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegration.java b/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegration.java index 4650d8a..fff3912 100644 --- a/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegration.java +++ b/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegration.java @@ -2,12 +2,11 @@ import com.fullcontact.rpc.TestServiceGrpc; import com.fullcontact.rpc.TestServiceGrpcJerseyResource; - import io.dropwizard.testing.junit.ResourceTestRule; import io.grpc.Server; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import org.glassfish.jersey.test.jetty.JettyTestContainerFactory; +import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; import org.junit.ClassRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -22,14 +21,13 @@ @RunWith(JUnit4.class) public class ProxyIntegration extends IntegrationBase { private static Server server = InProcessServerBuilder.forName("TestService") - .addService(new EchoTestService()) - .build(); + .addService(GrpcJerseyPlatformInterceptors.intercept(new EchoTestService())) + .build(); static { try { server.start(); - } - catch(IOException e) { + } catch (IOException e) { e.printStackTrace(); } } @@ -43,13 +41,18 @@ public class ProxyIntegration extends IntegrationBase { @ClassRule public static final ResourceTestRule resources = - ResourceTestRule.builder() - .addResource(new TestServiceGrpcJerseyResource(stub)) - .setTestContainerFactory(new JettyTestContainerFactory()) - .build(); + ResourceTestRule.builder() + .addResource(new TestServiceGrpcJerseyResource(stub)) + .setTestContainerFactory(new GrizzlyWebTestContainerFactory()) + .build(); @Override public ResourceTestRule resources() { return resources; } + + @Override + public boolean supportsHttpHeaders() { + return true; + } } diff --git a/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegrationApp.java b/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegrationApp.java index cbb3b02..def94ef 100644 --- a/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegrationApp.java +++ b/integration-test-proxy/src/test/java/com/fullcontact/rpc/jersey/ProxyIntegrationApp.java @@ -2,7 +2,6 @@ import com.fullcontact.rpc.TestServiceGrpc; import com.fullcontact.rpc.TestServiceGrpcJerseyResource; - import io.dropwizard.Application; import io.dropwizard.Configuration; import io.dropwizard.setup.Environment; @@ -23,7 +22,8 @@ public static void main(String[] args) throws Exception { @Override public void run(Configuration configuration, Environment environment) throws Exception { Server server = InProcessServerBuilder.forName("TestService") - .addService(new EchoTestService()) + .addService(GrpcJerseyPlatformInterceptors + .intercept(new EchoTestService())) .build(); server.start(); @@ -34,6 +34,8 @@ public void run(Configuration configuration, Environment environment) throws Exc .directExecutor() .build()); + + environment.jersey().register(new TestServiceGrpcJerseyResource(stub)); } } diff --git a/integration-test-serverstub/src/test/java/com/fullcontact/rpc/jersey/ServerStubIntegration.java b/integration-test-serverstub/src/test/java/com/fullcontact/rpc/jersey/ServerStubIntegration.java index fbce71f..e33c9ba 100644 --- a/integration-test-serverstub/src/test/java/com/fullcontact/rpc/jersey/ServerStubIntegration.java +++ b/integration-test-serverstub/src/test/java/com/fullcontact/rpc/jersey/ServerStubIntegration.java @@ -3,7 +3,7 @@ import com.fullcontact.rpc.TestServiceGrpcJerseyResource; import io.dropwizard.testing.junit.ResourceTestRule; -import org.glassfish.jersey.test.jetty.JettyTestContainerFactory; +import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; import org.junit.ClassRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -19,11 +19,16 @@ public class ServerStubIntegration extends IntegrationBase { public static final ResourceTestRule resources = ResourceTestRule.builder() .addResource(new TestServiceGrpcJerseyResource(new EchoTestService())) - .setTestContainerFactory(new JettyTestContainerFactory()) + .setTestContainerFactory(new GrizzlyWebTestContainerFactory()) .build(); @Override public ResourceTestRule resources() { return resources; } + + @Override + public boolean supportsHttpHeaders() { + return false; + } } diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/ErrorHandler.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/ErrorHandler.java index 0d0650f..20e126b 100644 --- a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/ErrorHandler.java +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/ErrorHandler.java @@ -1,8 +1,9 @@ package com.fullcontact.rpc.jersey; -import javax.ws.rs.container.AsyncResponse; +import com.google.common.collect.ImmutableMultimap; import java.io.IOException; import java.util.Optional; +import javax.ws.rs.core.Response; /** * Registry for (currently) JVM-global gRPC error handlers. This allows users to override the error handling @@ -15,8 +16,8 @@ public final class ErrorHandler { private ErrorHandler() {} - static void handleUnaryError(Throwable t, AsyncResponse response) { - errorHandler.handleUnaryError(t, response); + static Optional handleUnaryError(Throwable t, ImmutableMultimap responseHeaders) { + return errorHandler.handleUnaryError(t, responseHeaders); } static Optional handleStreamingError(Throwable t) throws IOException { diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/GrpcJerseyErrorHandler.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/GrpcJerseyErrorHandler.java index d101e84..680b76d 100644 --- a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/GrpcJerseyErrorHandler.java +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/GrpcJerseyErrorHandler.java @@ -1,24 +1,32 @@ package com.fullcontact.rpc.jersey; +import com.google.common.annotations.Beta; +import com.google.common.collect.ImmutableMultimap; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Status; - -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.core.Response; import java.io.IOException; +import java.util.Date; +import java.util.Map; import java.util.Optional; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; /** * Pluggable error handler used by the {@link JerseyUnaryObserver} and {@link JerseyStreamingObserver}. */ +@Beta public interface GrpcJerseyErrorHandler { /** * Handles an exception raised in a unary (request/response) RPC handler. * + * It is up to each implementation as to whether they honor the responseHeaders set by the RPC handler. + * * @param t throwable raised - * @param response JAX-RS AsyncResponse, can call cancel() or resume() with a string payload or {@link Response}. + * @param responseHeaders headers set by the RPC handler + * @return response JAX-RS Response. Returning {@link Optional#empty()} will call {@link AsyncResponse#cancel()}. */ - void handleUnaryError(Throwable t, AsyncResponse response); + Optional handleUnaryError(Throwable t, ImmutableMultimap responseHeaders); /** * Handles an exception raised in a server streaming RPC handler. As HTTP/1.1 practically doesn't support trailers, @@ -32,14 +40,25 @@ public interface GrpcJerseyErrorHandler { Optional handleStreamingError(Throwable t) throws IOException; class Default implements GrpcJerseyErrorHandler { - @Override - public void handleUnaryError(Throwable t, AsyncResponse asyncResponse) { - if(t instanceof InvalidProtocolBufferException) { - asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).entity(t.getMessage()).build()); + public Optional handleUnaryError(Throwable t, ImmutableMultimap responseHeaders) { + Response response; + if (t instanceof InvalidProtocolBufferException) { + response = Response.status(Response.Status.BAD_REQUEST).entity(t.getMessage()).build(); } else { - asyncResponse.resume(GrpcErrorUtil.createJerseyResponse(t)); + response = GrpcErrorUtil.createJerseyResponse(t); } + + if (!responseHeaders.isEmpty()) { + ResponseBuilder responseBuilder = Response.fromResponse(response); + for (Map.Entry entry : responseHeaders.entries()) { + responseBuilder.header(entry.getKey(), entry.getValue()); + } + + response = responseBuilder.build(); + } + + return Optional.of(response); } @Override diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/GrpcJerseyPlatformInterceptors.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/GrpcJerseyPlatformInterceptors.java new file mode 100644 index 0000000..3056863 --- /dev/null +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/GrpcJerseyPlatformInterceptors.java @@ -0,0 +1,22 @@ +package com.fullcontact.rpc.jersey; + +import io.grpc.*; + +/** + * Common set of interceptors and mutations done to stubs + channels. Users who opt into using this utility class to + * register common interceptors will automatically have new platform features when available. + * + * If your service depends on HTTP headers in the Context for further interceptors, make sure to apply this set of + * interceptors before your own. + */ +public class GrpcJerseyPlatformInterceptors { + private GrpcJerseyPlatformInterceptors() {} // Do not instantiate. + + public static ServerServiceDefinition intercept(BindableService bindableService) { + return intercept(bindableService.bindService()); + } + + public static ServerServiceDefinition intercept(ServerServiceDefinition serverServiceDefinition) { + return ServerInterceptors.intercept(serverServiceDefinition, HttpHeaderInterceptors.serverInterceptor()); + } +} diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/HttpHeaderContext.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/HttpHeaderContext.java new file mode 100644 index 0000000..b3c9c04 --- /dev/null +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/HttpHeaderContext.java @@ -0,0 +1,118 @@ +package com.fullcontact.rpc.jersey; + +import com.google.common.annotations.Beta; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import io.grpc.Context; +import java.util.Collection; +import java.util.Map; + +/** + * Utilities for retrieving and manipulating HTTP headers from a gRPC context. + * + *

Works by attaching an interceptor which bundles headers as a side-channel to the RPC and unbundles headers from + * the RPC responses side-channel. A server interceptor handles muxing and demuxing the headers into the Context. + * + * While `HttpHeaderContext` is gRPC {@link Context} aware and request headers can be safely accessed from background + * threads executed with an attached context, manipulating response headers should only be done from a single thread as + * no effort is put into synchronizing the state. + */ +@Beta +public class HttpHeaderContext { + static final Context.Key> REQUEST_HEADERS = Context + .keyWithDefault("grpc-jersey-request-headers", HashMultimap.create()); + static final Context.Key> RESPONSE_HEADERS = Context + .keyWithDefault("grpc-jersey-response-headers", HashMultimap.create()); + + private HttpHeaderContext() {} // Do not instantiate. + + /** + * Adds a header to the final list of output headers. Does not clear headers with existing name. + * + * Not thread-safe. + */ + public static void addResponseHeader(String name, String value) { + RESPONSE_HEADERS.get().put(name, value); + } + + /** + * Adds a header to the final list of output headers. Clear headers with existing name before adding. + * + * Not thread-safe. + */ + public static void setResponseHeader(String name, String value) { + clearResponseHeader(name); + addResponseHeader(name, value); + } + + /** + * Adds a header to the final list of output headers. Clear headers with existing name before adding. + * + * Not thread-safe. + */ + public static void setResponseHeader(String name, Collection value) { + clearResponseHeader(name); + RESPONSE_HEADERS.get().putAll(name, value); + } + + /** + * Removes a header from the set of response headers. + * + * Not thread-safe. + */ + public static void clearResponseHeader(String name) { + RESPONSE_HEADERS.get().removeAll(name); + } + + /** + * Removes all in-progress response headers. + * + * Not thread-safe. + */ + public static void clearResponseHeaders() { + RESPONSE_HEADERS.get().clear(); + } + + /** + * Returns an immutable copy of the request headers, if any. + */ + public static ImmutableMultimap requestHeaders() { + return ImmutableMultimap.copyOf(REQUEST_HEADERS.get()); + } + + /** + * Returns a immutable copy of the request headers, taking the first value of each header if there are multiple. + */ + public static ImmutableMap requestHeadersFirstValue() { + return firstValueFromEachKey(REQUEST_HEADERS.get()); + } + + /** + * Returns a immutable copy of the response headers. + */ + public static ImmutableMultimap responseHeaders() { + return ImmutableMultimap.copyOf(RESPONSE_HEADERS.get()); + } + + /** + * Returns a immutable copy of the response headers, taking the first value of each header if there are multiple. + */ + public static ImmutableMap responseHeadersFirstValue() { + return firstValueFromEachKey(RESPONSE_HEADERS.get()); + } + + private static ImmutableMap firstValueFromEachKey(Multimap multimap) { + ImmutableMap.Builder map = ImmutableMap.builder(); + + for (Map.Entry> entry : multimap.asMap().entrySet()) { + if (!entry.getValue().isEmpty()) { + map.put(entry.getKey(), Iterables.getFirst(entry.getValue(), null)); + } + } + + return map.build(); + } +} diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/HttpHeaderInterceptors.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/HttpHeaderInterceptors.java new file mode 100644 index 0000000..96c4e0e --- /dev/null +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/HttpHeaderInterceptors.java @@ -0,0 +1,173 @@ +package com.fullcontact.rpc.jersey; + +import com.fullcontact.rpc.Header; +import com.fullcontact.rpc.Headers; +import com.google.common.collect.*; +import io.grpc.*; +import io.grpc.protobuf.ProtoUtils; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; +import java.util.List; +import java.util.Map; + +import static com.fullcontact.rpc.jersey.HttpHeaderContext.REQUEST_HEADERS; +import static com.fullcontact.rpc.jersey.HttpHeaderContext.RESPONSE_HEADERS; + +public class HttpHeaderInterceptors { + private static final Metadata.Key HEADERS_KEY = ProtoUtils.keyForProto(Headers.getDefaultInstance()); + + private HttpHeaderInterceptors() {} // Do not instantiate. + + /** + * Returns the server interceptor necessary to make {@link HttpHeaderContext} work. It is recommended you use {@link + * GrpcJerseyPlatformInterceptors#intercept} if possible. + */ + public static HttpHeaderServerInterceptor serverInterceptor() { + return HttpHeaderServerInterceptor.INSTANCE; + } + + /** + * Returns the client interceptor used to extract the HTTP headers from the RPC sidechannel. Public for use in + * generated code, should not be used by the end user. + */ + public static HttpHeaderClientInterceptor clientInterceptor(HttpHeaders httpHeaders) { + return new HttpHeaderClientInterceptor(httpHeaders); + } + + private static Headers headersFromMultimap(Multimap headers) { + Headers.Builder builder = Headers.newBuilder(); + for (Map.Entry header : headers.entries()) { + builder.addHeader(Header.newBuilder() + .setName(header.getKey()) + .setValue(header.getValue()) + .build()); + } + + return builder.build(); + } + + private static ImmutableMultimap toMultimapFromHeaders(Headers headers) { + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + for (Header header : headers.getHeaderList()) { + builder.put(header.getName(), header.getValue()); + } + + return builder.build(); + } + + private static ImmutableMultimap toMultimapFromJerseyHeaders(HttpHeaders headers) { + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + for (Map.Entry> header : headers.getRequestHeaders().entrySet()) { + builder.putAll(header.getKey(), header.getValue()); + } + + return builder.build(); + } + + public static class HttpHeaderClientInterceptor implements ClientInterceptor { + private final ImmutableMultimap httpRequestHeaders; + + private ImmutableMultimap httpResponseHeaders = ImmutableMultimap.of(); + private boolean receivedHeaders = false; + + HttpHeaderClientInterceptor(HttpHeaders httpRequestHeaders) { + this.httpRequestHeaders = toMultimapFromJerseyHeaders(httpRequestHeaders); + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + // Bundle known request headers into RPC side-channel. + headers.put(HEADERS_KEY, headersFromMultimap(httpRequestHeaders)); + + delegate().start( + new ForwardingClientCallListener.SimpleForwardingClientCallListener( + responseListener) { + @Override + public void onHeaders(Metadata headers) { + processMetadata(headers); + super.onHeaders(headers); + } + + @Override + public void onClose(Status status, Metadata trailers) { + processMetadata(trailers); + super.onClose(status, trailers); + } + + private void processMetadata(Metadata metadata) { + if (receivedHeaders) { + return; + } + + // Set response headers if present on RPC. + if (metadata.containsKey(HEADERS_KEY)) { + receivedHeaders = true; + httpResponseHeaders = toMultimapFromHeaders(metadata.get(HEADERS_KEY)); + } + } + }, headers); + } + }; + } + + ImmutableMultimap getHttpResponseHeaders() { + return httpResponseHeaders; + } + + Response.ResponseBuilder withResponseHeaders(Response.ResponseBuilder builder) { + if (!httpResponseHeaders.isEmpty()) { + for (Map.Entry header : httpResponseHeaders.entries()) { + builder.header(header.getKey(), header.getValue()); + } + } + + return builder; + } + } + + private static class HttpHeaderServerInterceptor implements ServerInterceptor { + private static final HttpHeaderServerInterceptor INSTANCE = new HttpHeaderServerInterceptor(); + + private HttpHeaderServerInterceptor() {} + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + Context context = Context.current(); + // Set request headers if present on RPC. + if (headers.containsKey(HEADERS_KEY)) { + context = context.withValue(REQUEST_HEADERS, toMultimapFromHeaders(headers.get(HEADERS_KEY))); + } + + ForwardingServerCall.SimpleForwardingServerCall simpleForwardingServerCall = + new ForwardingServerCall.SimpleForwardingServerCall(call) { + private boolean sentHeaders = false; + + @Override + public void sendHeaders(Metadata headers) { + headers.put(HEADERS_KEY, headersFromMultimap(RESPONSE_HEADERS.get())); + sentHeaders = true; + super.sendHeaders(headers); + } + + @Override + public void close(Status status, Metadata trailers) { + if (!sentHeaders) { + trailers.put(HEADERS_KEY, headersFromMultimap(RESPONSE_HEADERS.get())); + } + + super.close(status, trailers); + } + }; + + return Contexts.interceptCall(context, simpleForwardingServerCall, headers, next); + } + } +} diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyStreamingObserver.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyStreamingObserver.java index b7240f7..8da4588 100644 --- a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyStreamingObserver.java +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyStreamingObserver.java @@ -1,69 +1,101 @@ package com.fullcontact.rpc.jersey; +import com.fullcontact.rpc.jersey.HttpHeaderInterceptors.HttpHeaderClientInterceptor; import com.google.common.collect.ImmutableList; import com.google.protobuf.Message; import io.grpc.stub.StreamObserver; -import org.glassfish.jersey.server.ChunkedOutput; +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Variant; +import java.io.EOFException; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Optional; /** - * gRPC StreamObserver which publishes to a Jersey ChunkedOutput. Used for server-side streaming of messages. + * gRPC StreamObserver which publishes JSON-formatted messages from a gRPC server stream. Uses underlying servlet. + * {@link AsyncContext}. * * @author Michael Rose (xorlev) */ public class JerseyStreamingObserver implements StreamObserver { public static final List VARIANT_LIST = ImmutableList.of( - new Variant(MediaType.APPLICATION_JSON_TYPE, (String) null, null), - new Variant(new MediaType("text", "event-stream"), (String) null, null) + new Variant(MediaType.APPLICATION_JSON_TYPE, (String) null, null), + new Variant(new MediaType("text", "event-stream"), (String) null, null) ); - private final ChunkedOutput output; + private final AsyncContext asyncContext; + private final HttpHeaderClientInterceptor httpHeaderClientInterceptor; + private final HttpServletResponse httpServletResponse; + private final ServletOutputStream outputStream; private final boolean sse; + private volatile boolean first = true; private volatile boolean closed = false; // Reusable buffer used in the context of a single streaming request, starts at 128 bytes. private StringBuilder buffer = new StringBuilder(128); - public JerseyStreamingObserver(ChunkedOutput output, boolean sse) { - this.output = output; + public JerseyStreamingObserver( + HttpHeaderClientInterceptor httpHeaderClientInterceptor, + HttpServletRequest httpServletRequest, + HttpServletResponse httpServletResponse, + boolean sse) + throws IOException { + this.asyncContext = httpServletRequest.getAsyncContext(); + this.httpHeaderClientInterceptor = httpHeaderClientInterceptor; + this.httpServletResponse = httpServletResponse; + this.outputStream = asyncContext.getResponse().getOutputStream(); this.sse = sse; } @Override public void onNext(V value) { - if(closed) + if (closed) { throw new IllegalStateException("JerseyStreamingObserver has already been closed"); + } + + addHeadersIfNotSent(); try { write(JsonHandler.streamPrinter().print(value)); - } - catch(IOException e) { + } catch (IOException e) { onError(e); } } @Override public void onError(Throwable t) { - closed = true; + if (t instanceof EOFException) { + closed = true; + // The client went away, there's not much we can do. + return; + } + try { + // Send headers if we haven't sent anything yet. + addHeadersIfNotSent(); + // As we lack supported trailers in standard HTTP, we'll have to make do with emitting an error to the // primary stream Optional errorPayload = ErrorHandler.handleStreamingError(t); - if(errorPayload.isPresent()) { + if (errorPayload.isPresent()) { write(errorPayload.get()); } - output.close(); - } - catch(IOException e) { + + closed = true; + outputStream.close(); + asyncContext.complete(); + } catch (IOException e) { // Something really broke, try closing the connection. try { - output.close(); + outputStream.close(); + asyncContext.complete(); } catch (IOException e1) { // Ignored if we already have. } @@ -72,31 +104,47 @@ public void onError(Throwable t) { @Override public void onCompleted() { + addHeadersIfNotSent(); + try { closed = true; - output.close(); - } - catch(IOException e) { + outputStream.flush(); + outputStream.close(); + asyncContext.complete(); + } catch (IOException e) { onError(e); } } + private void addHeadersIfNotSent() { + if (!first || closed) { + return; + } else { + first = false; + } + + for (Map.Entry header : httpHeaderClientInterceptor.getHttpResponseHeaders().entries()) { + httpServletResponse.addHeader(header.getKey(), header.getValue()); + } + } + private void write(String value) throws IOException { - if(value.isEmpty()) { + if (value.isEmpty()) { return; } - if(sse) { + if (sse) { buffer.append("data: "); } buffer.append(value).append('\n'); - if(sse) { + if (sse) { buffer.append('\n'); } - output.write(buffer.toString()); + outputStream.print(buffer.toString()); + outputStream.flush(); // Reset buffer position to 0. At this point, the buffer will have a capacity of the max size(value) passed // through so far. In the majority of cases, other messages will be of similar (or larger) size, diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyUnaryObserver.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyUnaryObserver.java index 6c8356e..d1a791f 100644 --- a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyUnaryObserver.java +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/JerseyUnaryObserver.java @@ -1,32 +1,42 @@ package com.fullcontact.rpc.jersey; +import com.fullcontact.rpc.jersey.HttpHeaderInterceptors.HttpHeaderClientInterceptor; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import io.grpc.stub.StreamObserver; +import java.util.Optional; import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; /** - * gRPC StreamObserver which publishes to a Jersey AsyncResponse. Used for unary (singular request/response) - * semantics. + * gRPC StreamObserver which publishes to a Jersey AsyncResponse. Used for unary (singular request/response) semantics. */ public class JerseyUnaryObserver implements StreamObserver { private final AsyncResponse asyncResponse; + private final HttpHeaderClientInterceptor httpHeaderClientInterceptor; private volatile boolean closed = false; - public JerseyUnaryObserver(AsyncResponse asyncResponse) { + public JerseyUnaryObserver(AsyncResponse asyncResponse, HttpHeaderClientInterceptor httpHeaderClientInterceptor) { this.asyncResponse = asyncResponse; + this.httpHeaderClientInterceptor = httpHeaderClientInterceptor; } @Override public void onNext(V value) { - if(closed) + if (closed) { throw new IllegalStateException("JerseyUnaryObserver has already been closed"); - try { - asyncResponse.resume(JsonHandler.unaryPrinter().print(value)); } - catch(InvalidProtocolBufferException e) { + try { + Response response = httpHeaderClientInterceptor + .withResponseHeaders(Response.ok()) + .entity(JsonHandler.unaryPrinter().print(value)) + .build(); + asyncResponse.resume(response); + closed = true; + } catch (InvalidProtocolBufferException e) { onError(e); } } @@ -34,7 +44,13 @@ public void onNext(V value) { @Override public void onError(Throwable t) { closed = true; - ErrorHandler.handleUnaryError(t, asyncResponse); + Optional response = ErrorHandler + .handleUnaryError(t, httpHeaderClientInterceptor.getHttpResponseHeaders()); + if (response.isPresent()) { + asyncResponse.resume(response.get()); + } else { + asyncResponse.cancel(); + } } @Override diff --git a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/RequestParser.java b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/RequestParser.java index 35cc950..003760e 100644 --- a/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/RequestParser.java +++ b/jersey-rpc-support/src/main/java/com/fullcontact/rpc/jersey/RequestParser.java @@ -59,8 +59,10 @@ public static > T parseHeaders(HttpHeaders headers, T public static Metadata parseHeaders(HttpHeaders headers){ Metadata newHeaders = new Metadata(); - headers.getRequestHeaders().forEach((k, v) -> - newHeaders.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v.get(0)) + + headers.getRequestHeaders().forEach((k, v) -> { + newHeaders.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v.get(0)); + } ); return newHeaders; diff --git a/jersey-rpc-support/src/main/proto/grpcjersey/headers.proto b/jersey-rpc-support/src/main/proto/grpcjersey/headers.proto new file mode 100644 index 0000000..468b24b --- /dev/null +++ b/jersey-rpc-support/src/main/proto/grpcjersey/headers.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package grpcjersey; + +option java_multiple_files = true; +option java_package = "com.fullcontact.rpc"; + +message Headers { + // List of headers, allows duplicated key names. + repeated Header header = 1; +} + +message Header { + string name = 1; + string value = 2; +} \ No newline at end of file diff --git a/protoc-gen-jersey/src/main/java/com/fullcontact/rpc/jersey/CodeGenerator.java b/protoc-gen-jersey/src/main/java/com/fullcontact/rpc/jersey/CodeGenerator.java index 9b0d976..f365c64 100644 --- a/protoc-gen-jersey/src/main/java/com/fullcontact/rpc/jersey/CodeGenerator.java +++ b/protoc-gen-jersey/src/main/java/com/fullcontact/rpc/jersey/CodeGenerator.java @@ -174,7 +174,7 @@ ResourceToGenerate buildResourceSpec( .className(className) .grpcStub(grpcImplClass) .methods(methods.build()) - .parseHeaders(isProxy) + .isProxy(isProxy) .fileName(fileName) .build(); } @@ -315,7 +315,7 @@ static class ResourceToGenerate { String className; String grpcStub; // fully-qualified class name; List methods; - boolean parseHeaders; + boolean isProxy; String fileName; String grpcJerseyVersion() { diff --git a/protoc-gen-jersey/src/main/resources/resource.tmpl.java b/protoc-gen-jersey/src/main/resources/resource.tmpl.java index c165caf..81757a0 100644 --- a/protoc-gen-jersey/src/main/resources/resource.tmpl.java +++ b/protoc-gen-jersey/src/main/resources/resource.tmpl.java @@ -1,5 +1,6 @@ package {{javaPackage}}; +import com.fullcontact.rpc.jersey.HttpHeaderInterceptors; import com.fullcontact.rpc.jersey.JerseyUnaryObserver; import com.fullcontact.rpc.jersey.JerseyStreamingObserver; import com.fullcontact.rpc.jersey.RequestParser; @@ -7,9 +8,12 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; -import org.glassfish.jersey.server.ChunkedOutput; +import java.io.OutputStream; import java.io.IOException; +import javax.servlet.AsyncContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.*; import javax.ws.rs.core.*; import javax.ws.rs.container.AsyncResponse; @@ -37,29 +41,27 @@ public class {{className}} { {{#pathParams}} @PathParam("{{name}}") String {{nameSanitized}}, {{/pathParams}} - @Context UriInfo uriInfo - {{#parseHeaders}} - ,@Context HttpHeaders headers - {{/parseHeaders}} + @Context UriInfo uriInfo, + @Context HttpHeaders headers {{#bodyFieldPath}} ,String body {{/bodyFieldPath}} ,@Suspended final AsyncResponse asyncResponse) throws IOException { - JerseyUnaryObserver<{{responseType}}> observer = new JerseyUnaryObserver<>(asyncResponse); + HttpHeaderInterceptors.HttpHeaderClientInterceptor interceptor = + HttpHeaderInterceptors.clientInterceptor(headers); + JerseyUnaryObserver<{{responseType}}> observer = new JerseyUnaryObserver<>(asyncResponse, interceptor); {{requestType}}.Builder r = {{requestType}}.newBuilder(); - {{#parseHeaders}} - // Shadowed to prevent building up headers - {{grpcStub}} stub; - {{/parseHeaders}} + {{grpcStub}} stub = this.stub; try { - {{#parseHeaders}} - stub = RequestParser.parseHeaders(headers, this.stub); - {{/parseHeaders}} + {{#isProxy}} + stub = RequestParser.parseHeaders(headers, stub); + stub = stub.withInterceptors(interceptor); + {{/isProxy}} {{#bodyFieldPath}} - RequestParser.handleBody("{{bodyFieldPath}}",r,body); + RequestParser.handleBody("{{bodyFieldPath}}", r, body); {{/bodyFieldPath}} {{^bodyFieldPath}} - RequestParser.parseQueryParams(uriInfo,r); + RequestParser.parseQueryParams(uriInfo, r); {{/bodyFieldPath}} {{#pathParams}} RequestParser.setFieldSafely(r, "{{name}}", {{nameSanitized}}); @@ -76,47 +78,47 @@ public class {{className}} { @{{method}} @Path("{{path}}") @Produces({"application/json; charset=utf-8", "text/event-stream; charset=utf-8"}) - public ChunkedOutput {{methodName}}_{{method}}_{{methodIndex}}( + public void {{methodName}}_{{method}}_{{methodIndex}}( {{#pathParams}} @PathParam("{{name}}") String {{nameSanitized}}, {{/pathParams}} - @Context UriInfo uriInfo - {{#parseHeaders}} - ,@Context HttpHeaders headers - {{/parseHeaders}} - ,@Context Request context + @Suspended final AsyncResponse asyncResponse, + @Context HttpServletRequest servletRequest, + @Context HttpServletResponse servletResponse, + @Context UriInfo uriInfo, + @Context HttpHeaders headers, + @Context Request context {{#bodyFieldPath}} ,String body{{/bodyFieldPath}}) throws IOException { - final ChunkedOutput output = new ChunkedOutput(String.class); Variant variant = context.selectVariant(VARIANT_LIST); boolean sse = "text/event-stream".equals(variant.getMediaType().toString()); - JerseyStreamingObserver<{{responseType}}> observer = new JerseyStreamingObserver<>(output, sse); + + HttpHeaderInterceptors.HttpHeaderClientInterceptor interceptor = + HttpHeaderInterceptors.clientInterceptor(headers); + JerseyStreamingObserver<{{responseType}}> observer = + new JerseyStreamingObserver<>(interceptor, servletRequest, servletResponse, sse); {{requestType}}.Builder r = {{requestType}}.newBuilder(); - {{#parseHeaders}} - // Shadowed to prevent building up headers - {{grpcStub}} stub; - {{/parseHeaders}} + {{grpcStub}} stub = this.stub; try { - {{#parseHeaders}} - stub = RequestParser.parseHeaders(headers, this.stub); - {{/parseHeaders}} + {{#isProxy}} + stub = RequestParser.parseHeaders(headers, stub); + stub = stub.withInterceptors(interceptor); + {{/isProxy}} {{#bodyFieldPath}} - RequestParser.handleBody("{{bodyFieldPath}}",r,body); + RequestParser.handleBody("{{bodyFieldPath}}", r, body); {{/bodyFieldPath}} {{^bodyFieldPath}} - RequestParser.parseQueryParams(uriInfo,r); + RequestParser.parseQueryParams(uriInfo, r); {{/bodyFieldPath}} {{#pathParams}} RequestParser.setFieldSafely(r, "{{name}}", {{nameSanitized}}); {{/pathParams}} } catch(Exception e) { observer.onError(e); - return output; + return; } stub.{{methodNameLower}}(r.build(), observer); - - return output; } {{/streamMethods}} } diff --git a/protos/test.proto b/protos/test.proto index 9e0212a..f8781c9 100644 --- a/protos/test.proto +++ b/protos/test.proto @@ -65,4 +65,4 @@ message TestRequest { } message TestResponse { TestRequest request = 1; -} +} \ No newline at end of file