diff --git a/qa/L0_backend_python/decoupled/decoupled_test.py b/qa/L0_backend_python/decoupled/decoupled_test.py
index 21fa8b757e..f0ca870664 100755
--- a/qa/L0_backend_python/decoupled/decoupled_test.py
+++ b/qa/L0_backend_python/decoupled/decoupled_test.py
@@ -256,6 +256,39 @@ def test_decoupled_send_after_close_error(self):
             "The completed request size must be zero.",
         )
 
+    def test_decoupled_execute_cancel(self):
+        model_name = "execute_cancel"
+        log_path = "decoupled_server.log"
+        execute_delay = 4.0  # seconds
+        shape = [1, 1]
+
+        user_data = UserData()
+        with grpcclient.InferenceServerClient("localhost:8001") as client:
+            client.start_stream(callback=partial(callback, user_data))
+            input_data = np.array([[execute_delay]], dtype=np.float32)
+            inputs = [
+                grpcclient.InferInput(
+                    "EXECUTE_DELAY", shape, np_to_triton_dtype(input_data.dtype)
+                )
+            ]
+            inputs[0].set_data_from_numpy(input_data)
+            client.async_stream_infer(model_name, inputs)
+            time.sleep(2)  # model delay for decoupling request and response sender
+            time.sleep(2)  # ensure the request is executing
+            client.stop_stream(cancel_requests=True)
+            time.sleep(2)  # ensure the cancellation is delivered
+
+        self.assertFalse(user_data._completed_requests.empty())
+        while not user_data._completed_requests.empty():
+            data_item = user_data._completed_requests.get()
+            self.assertIsInstance(data_item, InferenceServerException)
+            self.assertEqual(data_item.status(), "StatusCode.CANCELLED")
+
+        with open(log_path, mode="r", encoding="utf-8", errors="strict") as f:
+            log_text = f.read()
+            self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
+            self.assertIn("[execute_cancel] Request cancelled at ", log_text)
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/qa/L0_backend_python/decoupled/test.sh b/qa/L0_backend_python/decoupled/test.sh
index b4fa4ffe75..07c8f5b4ee 100755
--- a/qa/L0_backend_python/decoupled/test.sh
+++ b/qa/L0_backend_python/decoupled/test.sh
@@ -27,7 +27,7 @@
 
 CLIENT_PY=./decoupled_test.py
 CLIENT_LOG="./decoupled_client.log"
-EXPECTED_NUM_TESTS="5"
+EXPECTED_NUM_TESTS="6"
 TEST_RESULT_FILE='test_results.txt'
 TRITON_DIR=${TRITON_DIR:="/opt/tritonserver"}
 SERVER=${TRITON_DIR}/bin/tritonserver
@@ -50,6 +50,11 @@ mkdir -p models/dlpack_add_sub/1/
 cp ../../python_models/dlpack_add_sub/model.py models/dlpack_add_sub/1/
 cp ../../python_models/dlpack_add_sub/config.pbtxt models/dlpack_add_sub/
 
+mkdir -p models/execute_cancel/1/
+cp ../../python_models/execute_cancel/model.py ./models/execute_cancel/1/
+cp ../../python_models/execute_cancel/config.pbtxt ./models/execute_cancel/
+echo "model_transaction_policy { decoupled: True }" >> ./models/execute_cancel/config.pbtxt
+
 git clone https://github.com/triton-inference-server/python_backend -b $PYTHON_BACKEND_REPO_TAG
 mkdir -p models/square_int32/1/
 cp python_backend/examples/decoupled/square_model.py models/square_int32/1/model.py
diff --git a/qa/L0_backend_python/lifecycle/lifecycle_test.py b/qa/L0_backend_python/lifecycle/lifecycle_test.py
index 9c3bf7efa9..82856bbd32 100755
--- a/qa/L0_backend_python/lifecycle/lifecycle_test.py
+++ b/qa/L0_backend_python/lifecycle/lifecycle_test.py
@@ -31,6 +31,7 @@
 sys.path.append("../../common")
 
 import queue
+import time
 import unittest
 from functools import partial
 
@@ -70,6 +71,7 @@ def test_error_code(self):
             ("UNAVAILABLE", "[StatusCode.UNAVAILABLE]"),
             ("UNSUPPORTED", "[StatusCode.UNIMPLEMENTED]"),
             ("ALREADY_EXISTS", "[StatusCode.ALREADY_EXISTS]"),
+            ("CANCELLED", "[StatusCode.CANCELLED]"),
             ("(default)", "[StatusCode.INTERNAL] unrecognized"),
         ]
         with self._shm_leak_detector.Probe() as shm_probe:
@@ -91,6 +93,42 @@ def test_error_code(self):
                         expected_grpc_error_start + " error code: " + error,
                     )
 
+    def test_execute_cancel(self):
+        model_name = "execute_cancel"
+        log_path = "lifecycle_server.log"
+        execute_delay = 4.0  # seconds
+        shape = [1, 1]
+        response = {"responded": False, "result": None, "error": None}
+
+        def callback(result, error):
+            response["responded"] = True
+            response["result"] = result
+            response["error"] = error
+
+        with self._shm_leak_detector.Probe() as shm_probe:
+            with grpcclient.InferenceServerClient("localhost:8001") as client:
+                input_data = np.array([[execute_delay]], dtype=np.float32)
+                inputs = [
+                    grpcclient.InferInput(
+                        "EXECUTE_DELAY", shape, np_to_triton_dtype(input_data.dtype)
+                    )
+                ]
+                inputs[0].set_data_from_numpy(input_data)
+                exec_future = client.async_infer(model_name, inputs, callback)
+                time.sleep(2)  # ensure the request is executing
+                self.assertFalse(response["responded"])
+                exec_future.cancel()
+                time.sleep(2)  # ensure the cancellation is delivered
+                self.assertTrue(response["responded"])
+
+        self.assertEqual(response["result"], None)
+        self.assertIsInstance(response["error"], InferenceServerException)
+        self.assertEqual(response["error"].status(), "StatusCode.CANCELLED")
+        with open(log_path, mode="r", encoding="utf-8", errors="strict") as f:
+            log_text = f.read()
+            self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
+            self.assertIn("[execute_cancel] Request cancelled at ", log_text)
+
     def test_batch_error(self):
         # The execute_error model returns an error for the first and third
         # request and successfully processes the second request. This is making
diff --git a/qa/L0_backend_python/lifecycle/test.sh b/qa/L0_backend_python/lifecycle/test.sh
index 2abf107813..eb7f868940 100755
--- a/qa/L0_backend_python/lifecycle/test.sh
+++ b/qa/L0_backend_python/lifecycle/test.sh
@@ -26,7 +26,7 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 CLIENT_LOG="./lifecycle_client.log"
-EXPECTED_NUM_TESTS="4"
+EXPECTED_NUM_TESTS="5"
 TEST_RESULT_FILE='test_results.txt'
 source ../common.sh
 source ../../common/util.sh
@@ -44,6 +44,10 @@ mkdir -p models/error_code/1/
 cp ../../python_models/error_code/model.py ./models/error_code/1/
 cp ../../python_models/error_code/config.pbtxt ./models/error_code/
 
+mkdir -p models/execute_cancel/1/
+cp ../../python_models/execute_cancel/model.py ./models/execute_cancel/1/
+cp ../../python_models/execute_cancel/config.pbtxt ./models/execute_cancel/
+
 mkdir -p models/execute_error/1/
 cp ../../python_models/execute_error/model.py ./models/execute_error/1/
 cp ../../python_models/execute_error/config.pbtxt ./models/execute_error/
diff --git a/qa/python_models/error_code/model.py b/qa/python_models/error_code/model.py
index 350457ca79..078a4afb73 100644
--- a/qa/python_models/error_code/model.py
+++ b/qa/python_models/error_code/model.py
@@ -37,6 +37,7 @@ def execute(self, requests):
             "UNAVAILABLE": pb_utils.TritonError.UNAVAILABLE,
             "UNSUPPORTED": pb_utils.TritonError.UNSUPPORTED,
             "ALREADY_EXISTS": pb_utils.TritonError.ALREADY_EXISTS,
+            "CANCELLED": pb_utils.TritonError.CANCELLED,
         }
 
         responses = []
diff --git a/qa/python_models/execute_cancel/config.pbtxt b/qa/python_models/execute_cancel/config.pbtxt
new file mode 100644
index 0000000000..df509863ad
--- /dev/null
+++ b/qa/python_models/execute_cancel/config.pbtxt
@@ -0,0 +1,47 @@
+# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+name: "execute_cancel"
+backend: "python"
+max_batch_size: 1
+
+input [
+  {
+    name: "EXECUTE_DELAY"
+    data_type: TYPE_FP32
+    dims: [ 1 ]
+  }
+]
+
+output [
+  {
+    name: "DUMMY_OUT"
+    data_type: TYPE_FP32
+    dims: [ 1 ]
+  }
+]
+
+instance_group [{ kind: KIND_CPU }]
diff --git a/qa/python_models/execute_cancel/model.py b/qa/python_models/execute_cancel/model.py
new file mode 100644
index 0000000000..ec7b96ec1a
--- /dev/null
+++ b/qa/python_models/execute_cancel/model.py
@@ -0,0 +1,108 @@
+# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import json
+import threading
+import time
+
+import triton_python_backend_utils as pb_utils
+
+
+class TritonPythonModel:
+    def initialize(self, args):
+        self._logger = pb_utils.Logger
+        self._model_config = json.loads(args["model_config"])
+        self._using_decoupled = pb_utils.using_decoupled_model_transaction_policy(
+            self._model_config
+        )
+
+    def execute(self, requests):
+        processed_requests = []
+        for request in requests:
+            delay_tensor = pb_utils.get_input_tensor_by_name(
+                request, "EXECUTE_DELAY"
+            ).as_numpy()
+            delay = delay_tensor[0][0]  # seconds
+            if self._using_decoupled:
+                processed_requests.append(
+                    {"response_sender": request.get_response_sender(), "delay": delay}
+                )
+            else:
+                processed_requests.append({"request": request, "delay": delay})
+        if self._using_decoupled:
+            return self._execute_decoupled(processed_requests)
+        return self._execute_processed_requests(processed_requests)
+
+    def _execute_processed_requests(self, processed_requests):
+        responses = []
+        for processed_request in processed_requests:
+            error = pb_utils.TritonError(message="not cancelled")
+            object_to_check_cancelled = None
+            if "response_sender" in processed_request:
+                object_to_check_cancelled = processed_request["response_sender"]
+            elif "request" in processed_request:
+                object_to_check_cancelled = processed_request["request"]
+            delay = processed_request["delay"]  # seconds
+            time_elapsed = 0.0  # seconds
+            while time_elapsed < delay:
+                time.sleep(1)
+                time_elapsed += 1.0
+                if object_to_check_cancelled.is_cancelled():
+                    self._logger.log_info(
+                        "[execute_cancel] Request cancelled at "
+                        + str(time_elapsed)
+                        + " s"
+                    )
+                    error = pb_utils.TritonError(
+                        message="cancelled", code=pb_utils.TritonError.CANCELLED
+                    )
+                    break
+                self._logger.log_info(
+                    "[execute_cancel] Request not cancelled at "
+                    + str(time_elapsed)
+                    + " s"
+                )
+            responses.append(pb_utils.InferenceResponse(error=error))
+        return responses
+
+    def _execute_decoupled(self, processed_requests):
+        def response_thread(execute_processed_requests, processed_requests):
+            time.sleep(2)  # execute after requests are released
+            responses = execute_processed_requests(processed_requests)
+            for i in range(len(responses)):  # len(responses) == len(processed_requests)
+                response_sender = processed_requests[i]["response_sender"]
+                response_sender.send(responses[i])
+                response_sender.send(
+                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
+                )
+
+        thread = threading.Thread(
+            target=response_thread,
+            args=(self._execute_processed_requests, processed_requests),
+        )
+        thread.daemon = True
+        thread.start()
+        return None
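
For context outside the patch, below is a minimal standalone sketch (not part of the diff) of how a client could drive the new execute_cancel model and observe the CANCELLED status, mirroring test_execute_cancel in lifecycle_test.py. It assumes a Triton server with the model already loaded and listening on localhost:8001; the timings and the response bookkeeping are illustrative only, and it reuses the same tritonclient calls the tests above rely on.

# Hypothetical standalone driver (not part of the QA suite) that cancels an
# in-flight request against the execute_cancel model.
import time

import numpy as np
from tritonclient.grpc import InferenceServerClient, InferInput
from tritonclient.utils import InferenceServerException, np_to_triton_dtype

response = {"error": None}


def callback(result, error):
    # Invoked by the gRPC client once the request completes or is cancelled.
    response["error"] = error


with InferenceServerClient("localhost:8001") as client:
    delay = np.array([[4.0]], dtype=np.float32)  # EXECUTE_DELAY in seconds
    inputs = [InferInput("EXECUTE_DELAY", [1, 1], np_to_triton_dtype(delay.dtype))]
    inputs[0].set_data_from_numpy(delay)

    future = client.async_infer("execute_cancel", inputs, callback)
    time.sleep(2)  # let the model reach its polling loop
    future.cancel()  # ask Triton to cancel the in-flight request
    time.sleep(2)  # give the cancellation time to propagate

if isinstance(response["error"], InferenceServerException):
    print(response["error"].status())  # expected: StatusCode.CANCELLED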