Skip to content

Commit

Permalink
Add FakeStreamUnaryCall to ease mocking
Browse files Browse the repository at this point in the history
  • Loading branch information
lidizheng committed Apr 22, 2020
1 parent ab99a45 commit 96f8b17
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions google/api_core/grpc_helpers_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,24 @@ def __init__(self, response=object()):
def __await__(self):
response = yield from self._future.__await__()
return response


class FakeStreamUnaryCall(_WrappedStreamUnaryCall):
"""Fake implementation for stream-unary RPCs.
It is a dummy object for response message. Supply the intended response
upon the initialization, and the coroutine will return the exact response
message.
"""

def __init__(self, response=object()):
self.response = response
self._future = asyncio.get_event_loop().create_future()
self._future.set_result(self.response)

def __await__(self):
response = yield from self._future.__await__()
return response

async def wait_for_connection(self):
pass

0 comments on commit 96f8b17

Please sign in to comment.