Skip to content

Commit

Permalink
Expose public creation methods for GRPCAsyncRequestStream and `GRPC…
Browse files Browse the repository at this point in the history
…AsyncResponseStreamWriter` (grpc#1485)

Motivation:

It is highly desirable to be able to write tests against the generated method of a service. Currently, this is close to impossible since both the `GRPCAsyncRequestStream` and the `GRPCAsyncResponseStreamWriter` don't expose a public init so users cannot drive and observe the functions.

Modification:

Adds new public methods to drive and observe the request stream and the response writer.

Result

We can now test functions which use the request stream and response stream writer.
  • Loading branch information
FranzBusch authored and WendellXY committed Aug 24, 2023
1 parent fb0a898 commit 6ad7c0b
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 18 deletions.
120 changes: 107 additions & 13 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,140 @@
*/

#if compiler(>=5.6)

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// A type for the stream of request messages send to a gRPC server method.
///
/// To enable testability this type provides a static ``GRPCAsyncRequestStream/makeTestingRequestStream()``
/// method which allows you to create a stream that you can drive.
///
/// - Note: This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
/// A source used for driving a ``GRPCAsyncRequestStream`` during tests.
public struct Source {
@usableFromInline
internal let continuation: AsyncThrowingStream<Element, Error>.Continuation

@inlinable
init(continuation: AsyncThrowingStream<Element, Error>.Continuation) {
self.continuation = continuation
}

/// Yields the element to the request stream.
///
/// - Parameter element: The element to yield to the request stream.
@inlinable
public func yield(_ element: Element) {
self.continuation.yield(element)
}

/// Finished the request stream.
@inlinable
public func finish() {
self.continuation.finish()
}

/// Finished the request stream.
///
/// - Parameter error: An optional `Error` to finish the request stream with.
@inlinable
public func finish(throwing error: Error?) {
self.continuation.finish(throwing: error)
}
}

/// Simple struct for the return type of ``GRPCAsyncRequestStream/makeTestingRequestStream()``.
///
/// This struct contains two properties:
/// 1. The ``stream`` which is the actual ``GRPCAsyncRequestStream`` and should be passed to the method under testing.
/// 2. The ``source`` which can be used to drive the stream.
public struct TestingStream {
/// The actual stream.
public let stream: GRPCAsyncRequestStream<Element>
/// The source used to drive the stream.
public let source: Source

@inlinable
init(stream: GRPCAsyncRequestStream<Element>, source: Source) {
self.stream = stream
self.source = source
}
}

@usableFromInline
internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>
enum Backing: Sendable {
case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>)
case asyncStream(AsyncThrowingStream<Element, Error>)
}

@usableFromInline
internal let _stream: _WrappedStream
internal let backing: Backing

@inlinable
internal init(_ sequence: PassthroughMessageSequence<Element, Error>) {
self.backing = .passthroughMessageSequence(sequence)
}

@inlinable
internal init(_ stream: _WrappedStream) {
self._stream = stream
internal init(_ stream: AsyncThrowingStream<Element, Error>) {
self.backing = .asyncStream(stream)
}

/// Creates a new testing stream.
///
/// This is useful for writing unit tests for your gRPC method implementations since it allows you to drive the stream passed
/// to your method.
///
/// - Returns: A new ``TestingStream`` containing the actual ``GRPCAsyncRequestStream`` and a ``Source``.
@inlinable
public static func makeTestingRequestStream() -> TestingStream {
var continuation: AsyncThrowingStream<Element, Error>.Continuation!
let stream = AsyncThrowingStream<Element, Error> { continuation = $0 }
let source = Source(continuation: continuation)
let requestStream = Self(stream)
return TestingStream(stream: requestStream, source: source)
}

@inlinable
public func makeAsyncIterator() -> Iterator {
Self.AsyncIterator(self._stream)
switch self.backing {
case let .passthroughMessageSequence(sequence):
return Self.AsyncIterator(.passthroughMessageSequence(sequence.makeAsyncIterator()))
case let .asyncStream(stream):
return Self.AsyncIterator(.asyncStream(stream.makeAsyncIterator()))
}
}

public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
internal var iterator: _WrappedStream.AsyncIterator
enum BackingIterator {
case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>.Iterator)
case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
}

@usableFromInline
internal var iterator: BackingIterator

@usableFromInline
internal init(_ stream: _WrappedStream) {
self.iterator = stream.makeAsyncIterator()
internal init(_ iterator: BackingIterator) {
self.iterator = iterator
}

@inlinable
public mutating func next() async throws -> Element? {
try await self.iterator.next()
switch self.iterator {
case let .passthroughMessageSequence(iterator):
return try await iterator.next()
case var .asyncStream(iterator):
let element = try await iterator.next()
self.iterator = .asyncStream(iterator)
return element
}
}
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncRequestStream: Sendable where Element: Sendable {}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncRequestStream.Iterator: Sendable where Element: Sendable {}

#endif
115 changes: 112 additions & 3 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,137 @@
#if compiler(>=5.6)

/// Writer for server-streaming RPC handlers to provide responses.
///
/// To enable testability this type provides a static ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``
/// method which allows you to create a stream that you can drive.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
/// An `AsyncSequence` backing a ``GRPCAsyncResponseStreamWriter`` for testing purposes.
///
/// - Important: This `AsyncSequence` is never finishing.
public struct ResponseStream: AsyncSequence {
public typealias Element = (Response, Compression)

@usableFromInline
internal let stream: AsyncStream<(Response, Compression)>

@usableFromInline
internal let continuation: AsyncStream<(Response, Compression)>.Continuation

@inlinable
init(
stream: AsyncStream<(Response, Compression)>,
continuation: AsyncStream<(Response, Compression)>.Continuation
) {
self.stream = stream
self.continuation = continuation
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(iterator: self.stream.makeAsyncIterator())
}

/// Finishes the response stream.
///
/// This is useful in tests to finish the stream after the async method finished and allows you to collect all written responses.
public func finish() {
self.continuation.finish()
}

public struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline
internal var iterator: AsyncStream<(Response, Compression)>.AsyncIterator

@inlinable
init(iterator: AsyncStream<(Response, Compression)>.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async -> Element? {
await self.iterator.next()
}
}
}

/// Simple struct for the return type of ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``.
///
/// This struct contains two properties:
/// 1. The ``writer`` which is the actual ``GRPCAsyncResponseStreamWriter`` and should be passed to the method under testing.
/// 2. The ``stream`` which can be used to observe the written responses.
public struct TestingStreamWriter {
/// The actual writer.
public let writer: GRPCAsyncResponseStreamWriter<Response>
/// The written responses in a stream.
///
/// - Important: This `AsyncSequence` is never finishing.
public let stream: ResponseStream

@inlinable
init(writer: GRPCAsyncResponseStreamWriter<Response>, stream: ResponseStream) {
self.writer = writer
self.stream = stream
}
}

@usableFromInline
enum Backing: Sendable {
case asyncWriter(AsyncWriter<Delegate>)
case closure(@Sendable ((Response, Compression)) async -> Void)
}

@usableFromInline
internal typealias Element = (Response, Compression)

@usableFromInline
internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>

@usableFromInline
internal let asyncWriter: AsyncWriter<Delegate>
internal let backing: Backing

@inlinable
internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
self.asyncWriter = asyncWriter
self.backing = .asyncWriter(asyncWriter)
}

@inlinable
internal init(onWrite: @escaping @Sendable ((Response, Compression)) async -> Void) {
self.backing = .closure(onWrite)
}

@inlinable
public func send(
_ response: Response,
compression: Compression = .deferToCallDefault
) async throws {
try await self.asyncWriter.write((response, compression))
switch self.backing {
case let .asyncWriter(writer):
try await writer.write((response, compression))

case let .closure(closure):
await closure((response, compression))
}
}

/// Creates a new `GRPCAsyncResponseStreamWriter` backed by a ``ResponseStream``.
/// This is mostly useful for testing purposes where one wants to observe the written responses.
///
/// - Note: For most tests it is useful to call ``ResponseStream/finish()`` after the async method under testing
/// resumed. This allows you to easily collect all written responses.
@inlinable
public static func makeTestingResponseStreamWriter() -> TestingStreamWriter {
var continuation: AsyncStream<(Response, Compression)>.Continuation!
let asyncStream = AsyncStream<(Response, Compression)> { cont in
continuation = cont
}
let writer = Self.init { [continuation] in
continuation!.yield($0)
}
let responseStream = ResponseStream(
stream: asyncStream,
continuation: continuation
)

return TestingStreamWriter(writer: writer, stream: responseStream)
}
}

Expand Down
3 changes: 1 addition & 2 deletions Sources/GRPC/Interceptor/ClientTransportFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ internal struct FakeClientTransportFactory<Request, Response> {
) where RequestSerializer.Input == Request,
RequestDeserializer.Output == Request,
ResponseSerializer.Input == Response,
ResponseDeserializer.Output == Response
{
ResponseDeserializer.Output == Response {
self.fakeResponseStream = fakeResponseStream
self.requestSerializer = AnySerializer(wrapping: requestSerializer)
self.responseDeserializer = AnyDeserializer(wrapping: responseDeserializer)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.6)

import GRPC
import XCTest

@available(macOS 12, iOS 13, tvOS 13, watchOS 6, *)
final class GRPCAsyncRequestStreamTests: XCTestCase {
func testRecorder() async throws {
let testingStream = GRPCAsyncRequestStream<Int>.makeTestingRequestStream()

testingStream.source.yield(1)
testingStream.source.finish(throwing: nil)

let results = try await testingStream.stream.collect()

XCTAssertEqual(results, [1])
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.6)

import GRPC
import XCTest

@available(macOS 12, iOS 13, tvOS 13, watchOS 6, *)
final class GRPCAsyncResponseStreamWriterTests: XCTestCase {
func testRecorder() async throws {
let responseStreamWriter = GRPCAsyncResponseStreamWriter<Int>.makeTestingResponseStreamWriter()

try await responseStreamWriter.writer.send(1, compression: .disabled)
responseStreamWriter.stream.finish()

let results = try await responseStreamWriter.stream.collect()
XCTAssertEqual(results[0].0, 1)
XCTAssertEqual(results[0].1, .disabled)
}
}
#endif

0 comments on commit 6ad7c0b

Please sign in to comment.