From 6c7280541a58148a0159f50c061a2351671051c9 Mon Sep 17 00:00:00 2001 From: tanmayv25 Date: Wed, 6 Sep 2023 19:52:55 -0700 Subject: [PATCH 1/7] Supporting client-side gRPC cancellation --- .../library/tritonclient/grpc/__init__.py | 2 +- .../library/tritonclient/grpc/_client.py | 61 +++++++++++++++---- .../tritonclient/grpc/_infer_stream.py | 35 ++++++----- .../library/tritonclient/grpc/_utils.py | 4 ++ .../library/tritonclient/grpc/aio/__init__.py | 51 ++++++++++++++-- 5 files changed, 119 insertions(+), 34 deletions(-) diff --git a/src/python/library/tritonclient/grpc/__init__.py b/src/python/library/tritonclient/grpc/__init__.py index 852d5f0d6..260d8147a 100755 --- a/src/python/library/tritonclient/grpc/__init__.py +++ b/src/python/library/tritonclient/grpc/__init__.py @@ -36,7 +36,7 @@ from ._infer_input import InferInput from ._infer_result import InferResult from ._requested_output import InferRequestedOutput - from ._utils import raise_error, raise_error_grpc + from ._utils import CancelledError, raise_error, raise_error_grpc except ModuleNotFoundError as error: raise RuntimeError( "The installation does not include grpc support. " diff --git a/src/python/library/tritonclient/grpc/_client.py b/src/python/library/tritonclient/grpc/_client.py index 1c1a63799..a8751b473 100755 --- a/src/python/library/tritonclient/grpc/_client.py +++ b/src/python/library/tritonclient/grpc/_client.py @@ -39,6 +39,7 @@ from ._infer_result import InferResult from ._infer_stream import _InferStream, _RequestIterator from ._utils import ( + CancelledError, _get_inference_request, _grpc_compression_type, get_error_grpc, @@ -1391,10 +1392,13 @@ def async_infer( callback : function Python function that is invoked once the request is completed. The function must reserve the last two arguments (result, error) - to hold InferResult and InferenceServerException objects - respectively which will be provided to the function when executing - the callback. The ownership of these objects will be given to the - user. The 'error' would be None for a successful inference. + to hold InferResult and InferenceServerException(or CancelledError) + objects respectively which will be provided to the function when + executing the callback. The ownership of these objects will be given + to the user. The 'error' would be None for a successful inference. + Note if the request is cancelled using the returned future object, + error provided to callback will be a CancelledError exception + object. model_version: str The version of the model to run inference. The default value is an empty string which means then the server will choose @@ -1451,6 +1455,26 @@ def async_infer( Optional custom parameters to be included in the inference request. + Returns + ------- + grpc.future + A representation of a computation in another control flow. + Computations represented by a Future may be yet to be begun, + may be ongoing, or may have already completed. + + This object can be used to cancel the inference request like + below: + ---------- + future = async_infer(...) + ret = future.cancel() + ---------- + See here for more details of future object: + https://grpc.github.io/grpc/python/grpc.html#grpc.Future + + The callback will be invoked with + (result=None, error=CancelledError) for the requests that + were successfully cancelled. + Raises ------ InferenceServerException @@ -1466,6 +1490,8 @@ def wrapped_callback(call_future): result = InferResult(response) except grpc.RpcError as rpc_error: error = get_error_grpc(rpc_error) + except grpc.FutureCancelledError: + error = CancelledError() callback(result=result, error=error) metadata = self._get_metadata(headers) @@ -1502,6 +1528,7 @@ def wrapped_callback(call_future): if request_id != "": verbose_message = verbose_message + " '{}'".format(request_id) print(verbose_message) + return self._call_future except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) @@ -1518,10 +1545,13 @@ def start_stream( Python function that is invoked upon receiving response from the underlying stream. The function must reserve the last two arguments (result, error) to hold InferResult and - InferenceServerException objects respectively which will be - provided to the function when executing the callback. The - ownership of these objects will be given to the user. The - 'error' would be None for a successful inference. + InferenceServerException(or CancelledError) objects respectively + which will be provided to the function when executing the callback. + The ownership of these objects will be given to the user. The 'error' + would be None for a successful inference. + Note if the stream is closed with cancel_requests set True, then + the error provided to callback will be a CancelledError object. + stream_timeout : float Optional stream timeout (in seconds). The stream will be closed once the specified timeout expires. @@ -1561,10 +1591,19 @@ def start_stream( except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def stop_stream(self): - """Stops a stream if one available.""" + def stop_stream(self, cancel_requests=False): + """Stops a stream if one available. + + Parameters + ---------- + cancel_requests : bool + If set True, then client cancels all the pending requests + and closes the stream. If set False, the call blocks till + all the pending requests on the stream are processed. + + """ if self._stream is not None: - self._stream.close() + self._stream.close(cancel_requests) self._stream = None def async_stream_infer( diff --git a/src/python/library/tritonclient/grpc/_infer_stream.py b/src/python/library/tritonclient/grpc/_infer_stream.py index fc9924067..692ff6450 100755 --- a/src/python/library/tritonclient/grpc/_infer_stream.py +++ b/src/python/library/tritonclient/grpc/_infer_stream.py @@ -33,7 +33,7 @@ from tritonclient.utils import * from ._infer_result import InferResult -from ._utils import get_error_grpc, raise_error +from ._utils import CancelledError, get_error_grpc, raise_error class _InferStream: @@ -57,18 +57,25 @@ def __init__(self, callback, verbose): self._verbose = verbose self._request_queue = queue.Queue() self._handler = None + self._cancelled = False self._active = True + self._response_iterator = None def __del__(self): self.close() - def close(self): + def close(self, cancel_requests=False): """Gracefully close underlying gRPC streams. Note that this call blocks till response of all currently enqueued requests are not received. """ + if cancel_requests and self._response_iterator: + self._response_iterator.cancel() + self._cancelled = True + if self._handler is not None: - self._request_queue.put(None) + if not self._cancelled: + self._request_queue.put(None) if self._handler.is_alive(): self._handler.join() if self._verbose: @@ -85,12 +92,11 @@ def _init_handler(self, response_iterator): The iterator over the gRPC response stream. """ + self._response_iterator = response_iterator if self._handler is not None: raise_error("Attempted to initialize already initialized InferStream") # Create a new thread to handle the gRPC response stream - self._handler = threading.Thread( - target=self._process_response, args=(response_iterator,) - ) + self._handler = threading.Thread(target=self._process_response) self._handler.start() if self._verbose: print("stream started...") @@ -129,19 +135,13 @@ def _get_request(self): request = self._request_queue.get() return request - def _process_response(self, responses): + def _process_response(self): """Worker thread function to iterate through the response stream and executes the provided callbacks. - Parameters - ---------- - responses : iterator - The iterator to the response from the server for the - requests in the stream. - """ try: - for response in responses: + for response in self._response_iterator: if self._verbose: print(response) result = error = None @@ -155,8 +155,11 @@ def _process_response(self, responses): # can still be used. The stream won't be closed here as the thread # executing this function is managed by stream and may cause # circular wait - self._active = responses.is_active() - error = get_error_grpc(rpc_error) + self._active = self._response_iterator.is_active() + if rpc_error.cancelled: + error = CancelledError() + else: + error = get_error_grpc(rpc_error) self._callback(result=None, error=error) diff --git a/src/python/library/tritonclient/grpc/_utils.py b/src/python/library/tritonclient/grpc/_utils.py index 4496a1981..2f75323b9 100755 --- a/src/python/library/tritonclient/grpc/_utils.py +++ b/src/python/library/tritonclient/grpc/_utils.py @@ -31,6 +31,10 @@ from tritonclient.utils import * +class CancelledError(Exception): + """Indicates that the issued operation was cancelled.""" + + def get_error_grpc(rpc_error): """Convert a gRPC error to an InferenceServerException. diff --git a/src/python/library/tritonclient/grpc/aio/__init__.py b/src/python/library/tritonclient/grpc/aio/__init__.py index fc5eaccdb..60ad4c127 100755 --- a/src/python/library/tritonclient/grpc/aio/__init__.py +++ b/src/python/library/tritonclient/grpc/aio/__init__.py @@ -586,8 +586,28 @@ async def infer( headers=None, compression_algorithm=None, parameters=None, + get_call_obj=False, ): - """Refer to tritonclient.grpc.InferenceServerClient""" + """Refer to tritonclient.grpc.InferenceServerClient + The additional parameters for this functions are + described below: + + Parameters + ---------- + get_call_obj : bool + If set True, then this function will yield + grpc.aio.call object first bfore the + InferResult. + This object can be used to issue request + cancellation if required. This can be attained + by following: + ------- + call = await client.infer(..., get_call_obj=True) + call.cancel() + ------- + + + """ metadata = self._get_metadata(headers) @@ -609,18 +629,20 @@ async def infer( ) if self._verbose: print("infer, metadata {}\n{}".format(metadata, request)) - try: - response = await self._client_stub.ModelInfer( + call = self._client_stub.ModelInfer( request=request, metadata=metadata, timeout=client_timeout, compression=_grpc_compression_type(compression_algorithm), ) + if get_call_obj: + yield call + response = await call if self._verbose: print(response) result = InferResult(response) - return result + yield result except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) @@ -630,6 +652,7 @@ async def stream_infer( stream_timeout=None, headers=None, compression_algorithm=None, + get_call_obj=False, ): """Runs an asynchronous inference over gRPC bi-directional streaming API. @@ -650,11 +673,23 @@ async def stream_infer( Optional grpc compression algorithm to be used on client side. Currently supports "deflate", "gzip" and None. By default, no compression is used. + get_call_obj : bool + If set True, then the async_generator will first generate + grpc.aio.call object and then generate rest of the results. + The call object can be used to cancel the execution of the + ongoing stream and exit. This can be done like below: + ------- + async_generator = await client.infer(..., get_call_obj=True) + streaming_call = await response_iterator.__next__() + streaming_call.cancel() + ------- Returns ------- async_generator Yield tuple holding (InferResult, InferenceServerException) objects. + If get_call_obj is set True, then it yields the streaming_call + object before yielding the tuples. Raises ------ @@ -709,13 +744,17 @@ async def _request_iterator(inputs_iterator): ) try: - response_iterator = self._client_stub.ModelStreamInfer( + streaming_call = self._client_stub.ModelStreamInfer( _request_iterator(inputs_iterator), metadata=metadata, timeout=stream_timeout, compression=_grpc_compression_type(compression_algorithm), ) - async for response in response_iterator: + + if get_call_obj: + yield streaming_call + + async for response in streaming_call: if self._verbose: print(response) result = error = None From 8047f9efef4fd2e361310359c55a4b737f639443 Mon Sep 17 00:00:00 2001 From: tanmayv25 Date: Tue, 12 Sep 2023 17:44:42 -0700 Subject: [PATCH 2/7] Address review comments --- .../library/tritonclient/grpc/__init__.py | 2 +- .../library/tritonclient/grpc/_client.py | 18 +++++------------- .../tritonclient/grpc/_infer_stream.py | 12 +++++++----- .../library/tritonclient/grpc/_utils.py | 16 ++++++++++++---- .../library/tritonclient/grpc/aio/__init__.py | 19 +++++++++++++------ 5 files changed, 38 insertions(+), 29 deletions(-) diff --git a/src/python/library/tritonclient/grpc/__init__.py b/src/python/library/tritonclient/grpc/__init__.py index 260d8147a..852d5f0d6 100755 --- a/src/python/library/tritonclient/grpc/__init__.py +++ b/src/python/library/tritonclient/grpc/__init__.py @@ -36,7 +36,7 @@ from ._infer_input import InferInput from ._infer_result import InferResult from ._requested_output import InferRequestedOutput - from ._utils import CancelledError, raise_error, raise_error_grpc + from ._utils import raise_error, raise_error_grpc except ModuleNotFoundError as error: raise RuntimeError( "The installation does not include grpc support. " diff --git a/src/python/library/tritonclient/grpc/_client.py b/src/python/library/tritonclient/grpc/_client.py index a8751b473..5ff501cb0 100755 --- a/src/python/library/tritonclient/grpc/_client.py +++ b/src/python/library/tritonclient/grpc/_client.py @@ -39,9 +39,9 @@ from ._infer_result import InferResult from ._infer_stream import _InferStream, _RequestIterator from ._utils import ( - CancelledError, _get_inference_request, _grpc_compression_type, + get_cancelled_error, get_error_grpc, raise_error, raise_error_grpc, @@ -1392,13 +1392,10 @@ def async_infer( callback : function Python function that is invoked once the request is completed. The function must reserve the last two arguments (result, error) - to hold InferResult and InferenceServerException(or CancelledError) + to hold InferResult and InferenceServerException objects respectively which will be provided to the function when executing the callback. The ownership of these objects will be given to the user. The 'error' would be None for a successful inference. - Note if the request is cancelled using the returned future object, - error provided to callback will be a CancelledError exception - object. model_version: str The version of the model to run inference. The default value is an empty string which means then the server will choose @@ -1471,9 +1468,6 @@ def async_infer( See here for more details of future object: https://grpc.github.io/grpc/python/grpc.html#grpc.Future - The callback will be invoked with - (result=None, error=CancelledError) for the requests that - were successfully cancelled. Raises ------ @@ -1490,8 +1484,8 @@ def wrapped_callback(call_future): result = InferResult(response) except grpc.RpcError as rpc_error: error = get_error_grpc(rpc_error) - except grpc.FutureCancelledError: - error = CancelledError() + except grpc.FutureCancelledError as err: + error = get_cancelled_error() callback(result=result, error=error) metadata = self._get_metadata(headers) @@ -1545,12 +1539,10 @@ def start_stream( Python function that is invoked upon receiving response from the underlying stream. The function must reserve the last two arguments (result, error) to hold InferResult and - InferenceServerException(or CancelledError) objects respectively + InferenceServerException objects respectively which will be provided to the function when executing the callback. The ownership of these objects will be given to the user. The 'error' would be None for a successful inference. - Note if the stream is closed with cancel_requests set True, then - the error provided to callback will be a CancelledError object. stream_timeout : float Optional stream timeout (in seconds). The stream will be closed diff --git a/src/python/library/tritonclient/grpc/_infer_stream.py b/src/python/library/tritonclient/grpc/_infer_stream.py index 692ff6450..22910c2e8 100755 --- a/src/python/library/tritonclient/grpc/_infer_stream.py +++ b/src/python/library/tritonclient/grpc/_infer_stream.py @@ -33,7 +33,7 @@ from tritonclient.utils import * from ._infer_result import InferResult -from ._utils import CancelledError, get_error_grpc, raise_error +from ._utils import get_cancelled_error, get_error_grpc, raise_error class _InferStream: @@ -65,9 +65,11 @@ def __del__(self): self.close() def close(self, cancel_requests=False): - """Gracefully close underlying gRPC streams. Note that this call - blocks till response of all currently enqueued requests are not - received. + """Gracefully close underlying gRPC streams. + If cancel_requests is set True, then client cancels all the + pending requests and closes the stream. If set False, the + call blocks till all the pending requests on the stream are + processed. """ if cancel_requests and self._response_iterator: self._response_iterator.cancel() @@ -157,7 +159,7 @@ def _process_response(self): # circular wait self._active = self._response_iterator.is_active() if rpc_error.cancelled: - error = CancelledError() + error = get_cancelled_error() else: error = get_error_grpc(rpc_error) self._callback(result=None, error=error) diff --git a/src/python/library/tritonclient/grpc/_utils.py b/src/python/library/tritonclient/grpc/_utils.py index 2f75323b9..c1d22af69 100755 --- a/src/python/library/tritonclient/grpc/_utils.py +++ b/src/python/library/tritonclient/grpc/_utils.py @@ -31,10 +31,6 @@ from tritonclient.utils import * -class CancelledError(Exception): - """Indicates that the issued operation was cancelled.""" - - def get_error_grpc(rpc_error): """Convert a gRPC error to an InferenceServerException. @@ -54,6 +50,18 @@ def get_error_grpc(rpc_error): ) +def get_cancelled_error(): + """Get InferenceServerException object for a locally cancelled RPC. + + Returns + ------- + InferenceServerException + """ + return InferenceServerException( + msg="Locally cancelled by application!", status="StatusCode.CANCELLED" + ) + + def raise_error_grpc(rpc_error): """Raise an InferenceServerException from a gRPC error. diff --git a/src/python/library/tritonclient/grpc/aio/__init__.py b/src/python/library/tritonclient/grpc/aio/__init__.py index 60ad4c127..0c19eadff 100755 --- a/src/python/library/tritonclient/grpc/aio/__init__.py +++ b/src/python/library/tritonclient/grpc/aio/__init__.py @@ -602,10 +602,17 @@ async def infer( cancellation if required. This can be attained by following: ------- - call = await client.infer(..., get_call_obj=True) - call.cancel() + generator = client.infer(..., get_call_obj=True) + grpc_call = await anext(generator) + grpc_call.cancel() ------- + Returns + ------- + async_generator + If get_call_obj is set True, then it generates the + streaming_call object before generating the inference + results. """ @@ -679,8 +686,8 @@ async def stream_infer( The call object can be used to cancel the execution of the ongoing stream and exit. This can be done like below: ------- - async_generator = await client.infer(..., get_call_obj=True) - streaming_call = await response_iterator.__next__() + async_generator = client.stream_infer(..., get_call_obj=True) + streaming_call = await anext(response_iterator) streaming_call.cancel() ------- @@ -688,8 +695,8 @@ async def stream_infer( ------- async_generator Yield tuple holding (InferResult, InferenceServerException) objects. - If get_call_obj is set True, then it yields the streaming_call - object before yielding the tuples. + If get_call_obj is set True, then it first generates streaming_call + object associated with the call before generating these tuples. Raises ------ From dcc9e2730ec906f45f14d545b1e3a6daa653892a Mon Sep 17 00:00:00 2001 From: tanmayv25 Date: Wed, 13 Sep 2023 16:21:01 -0700 Subject: [PATCH 3/7] Wrap future to expose only cancel --- .../library/tritonclient/grpc/_client.py | 26 +++++++++++++++---- .../tritonclient/grpc/_infer_stream.py | 2 +- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/python/library/tritonclient/grpc/_client.py b/src/python/library/tritonclient/grpc/_client.py index 5ff501cb0..fa7547a1e 100755 --- a/src/python/library/tritonclient/grpc/_client.py +++ b/src/python/library/tritonclient/grpc/_client.py @@ -98,6 +98,24 @@ def __init__( self.http2_max_pings_without_data = http2_max_pings_without_data +class CallContext(): + """ This is a wrapper over grpc future call which can be used to + issue cancellation on an ongoing RPC call. + + Parameters + ---------- + grpc_future : gRPC.Future + The future tracking gRPC call. + """ + def __init__(self, grpc_future): + self._grpc_future = grpc_future + + def cancel(self): + """Issues cancellation on the underlying request. + """ + self._grpc_future.cancel() + + class InferenceServerClient(InferenceServerClientBase): """An InferenceServerClient object is used to perform any kind of communication with the InferenceServer using gRPC protocol. Most @@ -1454,7 +1472,7 @@ def async_infer( Returns ------- - grpc.future + CallContext A representation of a computation in another control flow. Computations represented by a Future may be yet to be begun, may be ongoing, or may have already completed. @@ -1465,8 +1483,6 @@ def async_infer( future = async_infer(...) ret = future.cancel() ---------- - See here for more details of future object: - https://grpc.github.io/grpc/python/grpc.html#grpc.Future Raises @@ -1516,13 +1532,13 @@ def wrapped_callback(call_future): timeout=client_timeout, compression=_grpc_compression_type(compression_algorithm), ) - self._call_future.add_done_callback(wrapped_callback) if self._verbose: verbose_message = "Sent request" if request_id != "": verbose_message = verbose_message + " '{}'".format(request_id) print(verbose_message) - return self._call_future + self._call_future.add_done_callback(wrapped_callback) + return CallContext(self._call_future) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) diff --git a/src/python/library/tritonclient/grpc/_infer_stream.py b/src/python/library/tritonclient/grpc/_infer_stream.py index 22910c2e8..f11d8e5ff 100755 --- a/src/python/library/tritonclient/grpc/_infer_stream.py +++ b/src/python/library/tritonclient/grpc/_infer_stream.py @@ -62,7 +62,7 @@ def __init__(self, callback, verbose): self._response_iterator = None def __del__(self): - self.close() + self.close(cancel_requests=True) def close(self, cancel_requests=False): """Gracefully close underlying gRPC streams. From 4b3774a235b730e1499d4aa90c005aa998c32273 Mon Sep 17 00:00:00 2001 From: Tanmay Verma Date: Wed, 13 Sep 2023 12:47:32 -0700 Subject: [PATCH 4/7] Update src/python/library/tritonclient/grpc/_client.py Co-authored-by: Iman Tabrizian --- src/python/library/tritonclient/grpc/_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/library/tritonclient/grpc/_client.py b/src/python/library/tritonclient/grpc/_client.py index fa7547a1e..2b87bd2ff 100755 --- a/src/python/library/tritonclient/grpc/_client.py +++ b/src/python/library/tritonclient/grpc/_client.py @@ -1475,7 +1475,7 @@ def async_infer( CallContext A representation of a computation in another control flow. Computations represented by a Future may be yet to be begun, - may be ongoing, or may have already completed. + ongoing, or have already completed. This object can be used to cancel the inference request like below: From e568b72a5e0ee6495562527c1769145336466645 Mon Sep 17 00:00:00 2001 From: tanmayv25 Date: Mon, 18 Sep 2023 15:59:52 -0700 Subject: [PATCH 5/7] Reverting changes in asyncio library --- .../library/tritonclient/grpc/aio/__init__.py | 58 ++----------------- 1 file changed, 6 insertions(+), 52 deletions(-) diff --git a/src/python/library/tritonclient/grpc/aio/__init__.py b/src/python/library/tritonclient/grpc/aio/__init__.py index 0c19eadff..fc5eaccdb 100755 --- a/src/python/library/tritonclient/grpc/aio/__init__.py +++ b/src/python/library/tritonclient/grpc/aio/__init__.py @@ -586,35 +586,8 @@ async def infer( headers=None, compression_algorithm=None, parameters=None, - get_call_obj=False, ): - """Refer to tritonclient.grpc.InferenceServerClient - The additional parameters for this functions are - described below: - - Parameters - ---------- - get_call_obj : bool - If set True, then this function will yield - grpc.aio.call object first bfore the - InferResult. - This object can be used to issue request - cancellation if required. This can be attained - by following: - ------- - generator = client.infer(..., get_call_obj=True) - grpc_call = await anext(generator) - grpc_call.cancel() - ------- - - Returns - ------- - async_generator - If get_call_obj is set True, then it generates the - streaming_call object before generating the inference - results. - - """ + """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -636,20 +609,18 @@ async def infer( ) if self._verbose: print("infer, metadata {}\n{}".format(metadata, request)) + try: - call = self._client_stub.ModelInfer( + response = await self._client_stub.ModelInfer( request=request, metadata=metadata, timeout=client_timeout, compression=_grpc_compression_type(compression_algorithm), ) - if get_call_obj: - yield call - response = await call if self._verbose: print(response) result = InferResult(response) - yield result + return result except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) @@ -659,7 +630,6 @@ async def stream_infer( stream_timeout=None, headers=None, compression_algorithm=None, - get_call_obj=False, ): """Runs an asynchronous inference over gRPC bi-directional streaming API. @@ -680,23 +650,11 @@ async def stream_infer( Optional grpc compression algorithm to be used on client side. Currently supports "deflate", "gzip" and None. By default, no compression is used. - get_call_obj : bool - If set True, then the async_generator will first generate - grpc.aio.call object and then generate rest of the results. - The call object can be used to cancel the execution of the - ongoing stream and exit. This can be done like below: - ------- - async_generator = client.stream_infer(..., get_call_obj=True) - streaming_call = await anext(response_iterator) - streaming_call.cancel() - ------- Returns ------- async_generator Yield tuple holding (InferResult, InferenceServerException) objects. - If get_call_obj is set True, then it first generates streaming_call - object associated with the call before generating these tuples. Raises ------ @@ -751,17 +709,13 @@ async def _request_iterator(inputs_iterator): ) try: - streaming_call = self._client_stub.ModelStreamInfer( + response_iterator = self._client_stub.ModelStreamInfer( _request_iterator(inputs_iterator), metadata=metadata, timeout=stream_timeout, compression=_grpc_compression_type(compression_algorithm), ) - - if get_call_obj: - yield streaming_call - - async for response in streaming_call: + async for response in response_iterator: if self._verbose: print(response) result = error = None From 6a18b6cded3052c4e6b55508aeb87dcc4951bf52 Mon Sep 17 00:00:00 2001 From: tanmayv25 Date: Mon, 18 Sep 2023 16:20:51 -0700 Subject: [PATCH 6/7] Format --- src/python/library/tritonclient/grpc/_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/python/library/tritonclient/grpc/_client.py b/src/python/library/tritonclient/grpc/_client.py index 2b87bd2ff..0a134ebc9 100755 --- a/src/python/library/tritonclient/grpc/_client.py +++ b/src/python/library/tritonclient/grpc/_client.py @@ -98,8 +98,8 @@ def __init__( self.http2_max_pings_without_data = http2_max_pings_without_data -class CallContext(): - """ This is a wrapper over grpc future call which can be used to +class CallContext: + """This is a wrapper over grpc future call which can be used to issue cancellation on an ongoing RPC call. Parameters @@ -107,12 +107,12 @@ class CallContext(): grpc_future : gRPC.Future The future tracking gRPC call. """ + def __init__(self, grpc_future): self._grpc_future = grpc_future def cancel(self): - """Issues cancellation on the underlying request. - """ + """Issues cancellation on the underlying request.""" self._grpc_future.cancel() From fbd5035b00e59dff77a191ac0168a8608b5e0afc Mon Sep 17 00:00:00 2001 From: tanmayv25 Date: Mon, 18 Sep 2023 17:14:32 -0700 Subject: [PATCH 7/7] Make class variable real private --- src/python/library/tritonclient/grpc/_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python/library/tritonclient/grpc/_client.py b/src/python/library/tritonclient/grpc/_client.py index 0a134ebc9..c4f56521f 100755 --- a/src/python/library/tritonclient/grpc/_client.py +++ b/src/python/library/tritonclient/grpc/_client.py @@ -109,11 +109,11 @@ class CallContext: """ def __init__(self, grpc_future): - self._grpc_future = grpc_future + self.__grpc_future = grpc_future def cancel(self): """Issues cancellation on the underlying request.""" - self._grpc_future.cancel() + self.__grpc_future.cancel() class InferenceServerClient(InferenceServerClientBase):