Skip to content

Commit

Permalink
fix: gRPC segfault due to Low Request Cancellation Timeout (#7840)
Browse files Browse the repository at this point in the history
  • Loading branch information
yinggeh committed Dec 13, 2024
1 parent 59c1842 commit 3032c43
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 67 deletions.
1 change: 0 additions & 1 deletion qa/L0_grpc_state_cleanup/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ function check_state_release() {
num_state_new=`cat $log_file | grep "StateNew" | wc -l`

if [ $num_state_release -ne $num_state_new ]; then
cat $log_file
echo -e "\n***\n*** Test Failed: Mismatch detected, $num_state_new state(s) created, $num_state_release state(s) released. \n***" >> $log_file
return 1
fi
Expand Down
62 changes: 48 additions & 14 deletions qa/L0_request_cancellation/grpc_cancellation_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
Expand All @@ -27,6 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import os
import queue
import re
import time
Expand Down Expand Up @@ -169,28 +170,61 @@ def test_grpc_async_infer_cancellation_at_step_start(self):
with open(server_log_name, "r") as f:
server_log = f.read()

cancel_at_start_count = len(
re.findall(
r"Cancellation notification received for ModelInferHandler, rpc_ok=1, context \d+, \d+ step START",
server_log,
)
)
cur_new_req_handl_count = len(
re.findall("New request handler for ModelInferHandler", server_log)
)
self.assertEqual(
cancel_at_start_count,
2,
"Expected 2 cancellation at step START log entries, but got {}".format(
cancel_at_start_count
),
)
self.assertGreater(
cur_new_req_handl_count,
prev_new_req_handl_count,
"gRPC Cancellation on step START Test Failed: New request handler for ModelInferHandler was not created",
)

def test_grpc_async_infer_response_complete_during_cancellation(self):
# long test
self.test_duration_delta = 2
delay_notification_sec = (
int(os.getenv("TRITONSERVER_DELAY_GRPC_NOTIFICATION")) / 1000
)
delay_queue_cancellation_sec = (
int(os.getenv("TRITONSERVER_DELAY_GRPC_ENQUEUE")) / 1000
)
future = self._client.async_infer(
model_name=self._model_name,
inputs=self._inputs,
callback=self._callback,
outputs=self._outputs,
)
# ensure cancellation is received before InferResponseComplete and is processed after InferResponseComplete
time.sleep(self._model_delay - 2)
future.cancel()
time.sleep(
delay_notification_sec + delay_queue_cancellation_sec
) # ensure the cancellation is processed
self._assert_callback_cancelled()

def test_grpc_async_infer_cancellation_during_response_complete(self):
# long test
self.test_duration_delta = 2.5
delay_notification_sec = (
int(os.getenv("TRITONSERVER_DELAY_GRPC_NOTIFICATION")) / 1000
)
delay_response_completion_sec = (
int(os.getenv("TRITONSERVER_DELAY_RESPONSE_COMPLETION")) / 1000
)
future = self._client.async_infer(
model_name=self._model_name,
inputs=self._inputs,
callback=self._callback,
outputs=self._outputs,
)
# ensure the cancellation is received between InferResponseComplete checking cancellation and Finish
time.sleep(self._model_delay + 2)
future.cancel()
time.sleep(
delay_notification_sec + delay_response_completion_sec
) # ensure the cancellation is processed
self._assert_callback_cancelled()


if __name__ == "__main__":
unittest.main()
32 changes: 27 additions & 5 deletions qa/L0_request_cancellation/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mkdir -p models/model/1 && (cd models/model && \
echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt)

SERVER_LOG=server.log
LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG
LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG 2>&1
if [ $? -ne 0 ]; then
echo -e "\n***\n*** Unit Tests Failed\n***"
cat $SERVER_LOG
Expand All @@ -78,12 +78,23 @@ mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \
echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "10000" } }]' >> config.pbtxt)

for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc_async_infer" "test_aio_grpc_stream_infer" "test_grpc_async_infer_cancellation_at_step_start"; do

for TEST_CASE in "test_grpc_async_infer" \
"test_grpc_stream_infer" \
"test_aio_grpc_async_infer" \
"test_aio_grpc_stream_infer" \
"test_grpc_async_infer_cancellation_at_step_start" \
"test_grpc_async_infer_response_complete_during_cancellation" \
"test_grpc_async_infer_cancellation_during_response_complete"; do
TEST_LOG="./grpc_cancellation_test.$TEST_CASE.log"
SERVER_LOG="grpc_cancellation_test.$TEST_CASE.server.log"
if [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_at_step_start" ]; then
export TRITONSERVER_DELAY_GRPC_PROCESS=5000
elif [ "$TEST_CASE" == "test_grpc_async_infer_response_complete_during_cancellation" ]; then
export TRITONSERVER_DELAY_GRPC_NOTIFICATION=5000
export TRITONSERVER_DELAY_GRPC_ENQUEUE=5000
elif [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_during_response_complete" ]; then
export TRITONSERVER_DELAY_GRPC_NOTIFICATION=5000
export TRITONSERVER_DELAY_RESPONSE_COMPLETION=5000
fi

SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1"
Expand All @@ -101,11 +112,16 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc
cat $TEST_LOG
RET=1
fi
grep "Cancellation notification received for" $SERVER_LOG
if [ $? -ne 0 ]; then

count=$(grep -o "Cancellation notification received for" $SERVER_LOG | wc -l)
if [ $count == 0 ]; then
echo -e "\n***\n*** Cancellation not received by server on $TEST_CASE\n***"
cat $SERVER_LOG
RET=1
elif [ $count -ne 1 ]; then
echo -e "\n***\n*** Unexpected cancellation received by server on $TEST_CASE. Expected 1 but received $count.\n***"
cat $SERVER_LOG
RET=1
fi
set -e

Expand All @@ -114,6 +130,12 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc

if [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_at_step_start" ]; then
unset TRITONSERVER_DELAY_GRPC_PROCESS
elif [ "$TEST_CASE" == "test_grpc_async_infer_response_complete_during_cancellation" ]; then
unset TRITONSERVER_DELAY_GRPC_NOTIFICATION
unset TRITONSERVER_DELAY_GRPC_ENQUEUE
elif [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_during_response_complete" ]; then
unset TRITONSERVER_DELAY_GRPC_NOTIFICATION
unset TRITONSERVER_DELAY_RESPONSE_COMPLETION
fi
done

Expand Down
39 changes: 31 additions & 8 deletions src/grpc/infer_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ ModelInferHandler::StartNewRequest()
}

bool
ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
ModelInferHandler::Process(
InferHandler::State* state, bool rpc_ok, bool is_notification)
{
// There are multiple handlers registered in the gRPC service.
// Hence, we can have a case where a handler thread is
Expand All @@ -690,8 +691,8 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
// Will delay the Process execution by the specified time.
// This can be used to test the flow when a cancellation request
// is issued for a request that is still at the START step.
LOG_INFO << "Delaying the write of the response by "
<< state->delay_process_ms_ << " ms...";
LOG_INFO << "Delaying the Process execution by " << state->delay_process_ms_
<< " ms...";
std::this_thread::sleep_for(
std::chrono::milliseconds(state->delay_process_ms_));
}
Expand All @@ -711,11 +712,12 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
// because we will never leave this if body. Refer to PR 7325.
// This is a special case for ModelInferHandler, since we have 2 threads,
// and each of them can process cancellation. ModelStreamInfer has only 1
// thread, and cancellation at step START was not reproducible in a
// thread, and cancellation at step START was not reproducible in a
// single thread scenario.
StartNewRequest();
}
bool resume = state->context_->HandleCancellation(state, rpc_ok, Name());
bool resume = state->context_->HandleCancellation(
state, rpc_ok, Name(), is_notification);
return resume;
}

Expand Down Expand Up @@ -765,7 +767,7 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
std::make_pair("GRPC_SEND_START", TraceManager::CaptureTimestamp()));
#endif // TRITON_ENABLE_TRACING

state->step_ = COMPLETE;
state->step_ = Steps::COMPLETE;
state->context_->responder_->Finish(
inference::ModelInferResponse(), status, state);
}
Expand Down Expand Up @@ -1001,7 +1003,7 @@ ModelInferHandler::Execute(InferHandler::State* state)
}
#endif // TRITON_ENABLE_TRACING

state->step_ = COMPLETE;
state->step_ = Steps::COMPLETE;
state->context_->responder_->Finish(error_response, status, state);
}
}
Expand Down Expand Up @@ -1051,6 +1053,17 @@ ModelInferHandler::InferResponseComplete(
<< ", skipping response generation as grpc transaction was "
"cancelled... ";

if (state->delay_enqueue_ms_ != 0) {
// Will delay PutTaskBackToQueue by the specified time.
// This can be used to test the flow when a cancellation request
// is issued for the request during the InferResponseComplete
// callback, right before Process runs in the notification thread.
LOG_INFO << "Delaying PutTaskBackToQueue by " << state->delay_enqueue_ms_
<< " ms...";
std::this_thread::sleep_for(
std::chrono::milliseconds(state->delay_enqueue_ms_));
}

// Send state back to the queue so that state can be released
// in the next cycle.
state->context_->PutTaskBackToQueue(state);
Expand Down Expand Up @@ -1113,7 +1126,17 @@ ModelInferHandler::InferResponseComplete(
std::make_pair("GRPC_SEND_START", TraceManager::CaptureTimestamp()));
#endif // TRITON_ENABLE_TRACING

state->step_ = COMPLETE;
if (state->delay_response_completion_ms_ != 0) {
// Will delay the Process execution of the state at step COMPLETE by the
// specified time. This can be used to test the flow when a cancellation
// request is issued for a request that is in InferResponseComplete.
LOG_INFO << "Delaying InferResponseComplete by "
<< state->delay_response_completion_ms_ << " ms...";
std::this_thread::sleep_for(
std::chrono::milliseconds(state->delay_response_completion_ms_));
}

state->step_ = Steps::COMPLETE;
state->context_->responder_->Finish(*response, state->status_, state);
if (response_created) {
delete response;
Expand Down
Loading

0 comments on commit 3032c43

Please sign in to comment.