Skip to content

Commit 183d0e9

Browse files
Jozott00MiletaA
authored andcommitted
grpc: Add gzip Compression Support (Kotlin#527)
1 parent 8a437fb commit 183d0e9

File tree

18 files changed

+2636
-0
lines changed

18 files changed

+2636
-0
lines changed
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.grpc.client
6+
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.rpc.grpc.GrpcMetadata
9+
import kotlinx.rpc.grpc.Status
10+
import kotlinx.rpc.grpc.client.GrpcCallOptions
11+
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
12+
13+
/**
14+
* The scope of a single outgoing gRPC client call observed by a [ClientInterceptor].
15+
*
16+
* An interceptor receives this scope instance for every call and can:
17+
* - Inspect the RPC [method] being invoked.
18+
* - Read or populate [requestHeaders] before the request is sent.
19+
* - Read [callOptions] that affect transport-level behavior.
20+
* - Register callbacks with [onHeaders] and [onClose] to observe response metadata and final status.
21+
* - Cancel the call early via [cancel].
22+
* - Continue the call by calling [proceed] with a (possibly transformed) request [Flow].
23+
* - Transform the response by modifying the returned [Flow].
24+
*
25+
* ```kt
26+
* val interceptor = object : ClientInterceptor {
27+
* override fun <Request, Response> ClientCallScope<Request, Response>.intercept(
28+
* request: Flow<Request>
29+
* ): Flow<Response> {
30+
* // Example: add a header before proceeding
31+
* requestHeaders[MyKeys.Authorization] = token
32+
*
33+
* // Example: modify call options
34+
* callOptions.timeout = 5.seconds
35+
*
36+
* // Example: observe response metadata
37+
* onHeaders { headers -> /* inspect headers */ }
38+
* onClose { status, trailers -> /* log status/trailers */ }
39+
*
40+
* // IMPORTANT: proceed forwards the call to the next interceptor/transport.
41+
* // If you do not call proceed, no request will be sent and the call is short-circuited.
42+
* return proceed(request)
43+
* }
44+
* }
45+
* ```
46+
*
47+
* @param Request the request message type of the RPC.
48+
* @param Response the response message type of the RPC.
49+
*/
50+
public interface ClientCallScope<Request, Response> {
51+
/** Descriptor of the RPC method (name, marshalling, type) being invoked. */
52+
public val method: MethodDescriptor<Request, Response>
53+
54+
/**
55+
* Outgoing request headers for this call.
56+
*
57+
* Interceptors may read and mutate this metadata
58+
* before calling [proceed] so the headers are sent to the server. Headers added after
59+
* the call has already been proceeded may not be reflected on the wire.
60+
*/
61+
public val requestHeaders: GrpcMetadata
62+
63+
/**
64+
* Transport/engine options used for this call (deadlines, compression, etc.).
65+
* Modifying this object is only possible before the call is proceeded.
66+
*/
67+
public val callOptions: GrpcCallOptions
68+
69+
/**
70+
* Register a callback invoked when the initial response headers are received.
71+
* Typical gRPC semantics guarantee headers are delivered at most once per call
72+
* and before the first message is received.
73+
*/
74+
public fun onHeaders(block: (responseHeaders: GrpcMetadata) -> Unit)
75+
76+
/**
77+
* Register a callback invoked when the call completes, successfully or not.
78+
* The final `status` and trailing `responseTrailers` are provided.
79+
*/
80+
public fun onClose(block: (status: Status, responseTrailers: GrpcMetadata) -> Unit)
81+
82+
/**
83+
* Cancel the call locally, providing a human-readable [message] and an optional [cause].
84+
* This method won't return and abort all further processing.
85+
*
86+
* We made cancel throw a [kotlinx.rpc.grpc.StatusException] instead of returning, so control flow is explicit and
87+
* race conditions between interceptors and the transport layer are avoided.
88+
*/
89+
public fun cancel(message: String, cause: Throwable? = null): Nothing
90+
91+
/**
92+
* Continue the invocation by forwarding it to the next interceptor or to the underlying transport.
93+
*
94+
* This function is the heart of an interceptor:
95+
* - It must be called to actually perform the RPC. If you never call [proceed], the request is not sent
96+
* and the call is effectively short-circuited by the interceptor.
97+
* - You may transform the [request] flow before passing it to [proceed] (e.g., logging, retry orchestration,
98+
* compression, metrics). The returned [Flow] yields response messages and can also be transformed
99+
* before being returned to the caller.
100+
* - Call [proceed] at most once per intercepted call. Calling it multiple times or after cancellation
101+
* is not supported.
102+
*/
103+
public fun proceed(request: Flow<Request>): Flow<Response>
104+
}
105+
106+
/**
107+
* Client-side interceptor for gRPC calls.
108+
*
109+
* Implementations can observe and modify client calls in a structured way. The primary entry point is the
110+
* [intercept] extension function on [ClientCallScope], which receives the inbound request [Flow] and must
111+
* call [ClientCallScope.proceed] to forward the call.
112+
*
113+
* Common use-cases include:
114+
* - Adding authentication or custom headers.
115+
* - Implementing logging/metrics.
116+
* - Observing headers/trailers and final status.
117+
* - Transforming request/response flows (e.g., mapping, buffering, throttling).
118+
*/
119+
public interface ClientInterceptor {
120+
/**
121+
* Intercept a client call.
122+
*
123+
* You can:
124+
* - Inspect [ClientCallScope.method] and [ClientCallScope.callOptions].
125+
* - Read or populate [ClientCallScope.requestHeaders].
126+
* - Register [ClientCallScope.onHeaders] and [ClientCallScope.onClose] callbacks.
127+
* - Transform the [request] flow or wrap the resulting response flow.
128+
*
129+
* IMPORTANT: [ClientCallScope.proceed] must eventually be called to actually execute the RPC and obtain
130+
* the response [Flow]. If [ClientCallScope.proceed] is omitted, the call will not reach the server.
131+
*/
132+
public fun <Request, Response> ClientCallScope<Request, Response>.intercept(
133+
request: Flow<Request>,
134+
): Flow<Response>
135+
136+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.grpc.client
6+
7+
import kotlinx.rpc.grpc.GrpcCompression
8+
import kotlin.time.Duration
9+
10+
/**
11+
* The collection of runtime options for a new gRPC call.
12+
*
13+
* This class allows configuring per-call behavior such as timeouts.
14+
*/
15+
public class GrpcCallOptions {
16+
/**
17+
* The maximum duration to wait for the RPC to complete.
18+
*
19+
* If set, the RPC will be canceled (with `DEADLINE_EXCEEDED`)
20+
* if it does not complete within the specified duration.
21+
* The timeout is measured from the moment the call is initiated.
22+
* If `null`, no timeout is applied, and the call may run indefinitely.
23+
*
24+
* The default value is `null`.
25+
*
26+
* @see kotlin.time.Duration
27+
*/
28+
public var timeout: Duration? = null
29+
30+
/**
31+
* The compression algorithm to use for encoding outgoing messages in this call.
32+
*
33+
* When set to a value other than [GrpcCompression.None], the client will compress request messages
34+
* using the specified algorithm before sending them to the server. The chosen compression algorithm
35+
* is communicated to the server via the `grpc-encoding` header.
36+
*
37+
* ## Default Behavior
38+
* Defaults to [GrpcCompression.None], meaning no compression is applied to messages.
39+
*
40+
* ## Server Compatibility
41+
* **Important**: It is the caller's responsibility to ensure the server supports the chosen
42+
* compression algorithm. There is no automatic negotiation performed. If the server does not
43+
* support the requested compression, the call will fail.
44+
*
45+
* ## Available Algorithms
46+
* - [GrpcCompression.None]: No compression (identity encoding) - **default**
47+
* - [GrpcCompression.Gzip]: GZIP compression, widely supported
48+
*
49+
* @see GrpcCompression
50+
*/
51+
public var compression: GrpcCompression = GrpcCompression.None
52+
}

0 commit comments

Comments
 (0)