Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gRPC segfault due to Low Request Cancellation Timeout #7840

Merged
merged 8 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion qa/L0_grpc_state_cleanup/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ function check_state_release() {
num_state_new=`cat $log_file | grep "StateNew" | wc -l`

if [ $num_state_release -ne $num_state_new ]; then
cat $log_file
echo -e "\n***\n*** Test Failed: Mismatch detected, $num_state_new state(s) created, $num_state_release state(s) released. \n***" >> $log_file
return 1
fi
Expand Down
62 changes: 48 additions & 14 deletions qa/L0_request_cancellation/grpc_cancellation_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
Expand All @@ -27,6 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import os
import queue
import re
import time
Expand Down Expand Up @@ -169,28 +170,61 @@ def test_grpc_async_infer_cancellation_at_step_start(self):
with open(server_log_name, "r") as f:
server_log = f.read()

cancel_at_start_count = len(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the counting for "Cancellation notification received" to test.sh.
Expect one "Cancellation notification received" in log per cancellation after the change to grpc infer_handler.
cc @oandreeva-nv

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please briefly explain why this move is needed?

Copy link
Contributor Author

@yinggeh yinggeh Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the change, the message "Cancellation notification received" is logged once, which is consistent with other tests. There is already an existing check for this message in test.sh, but it does not check the number of occurrences. So I just modified the check in test.sh to also check the number of occurrences.

if [ $count == 0 ]; then
echo -e "\n***\n*** Cancellation not received by server on $TEST_CASE\n***"
cat $SERVER_LOG
RET=1
elif [ $count -ne 1 ]; then
echo -e "\n***\n*** Unexpected cancellation received by server on $TEST_CASE. Expected 1 but received $count.\n***"
cat $SERVER_LOG
RET=1
fi

re.findall(
r"Cancellation notification received for ModelInferHandler, rpc_ok=1, context \d+, \d+ step START",
server_log,
)
)
cur_new_req_handl_count = len(
re.findall("New request handler for ModelInferHandler", server_log)
)
self.assertEqual(
cancel_at_start_count,
2,
"Expected 2 cancellation at step START log entries, but got {}".format(
cancel_at_start_count
),
)
self.assertGreater(
cur_new_req_handl_count,
prev_new_req_handl_count,
"gRPC Cancellation on step START Test Failed: New request handler for ModelInferHandler was not created",
)

def test_grpc_async_infer_response_complete_during_cancellation(self):
    """Cancel an in-flight async request shortly before the model delay elapses.

    The request is cancelled 2 seconds before ``self._model_delay`` expires,
    so the cancellation is received before InferResponseComplete and is
    processed after it. The test then waits for the server-side delays
    configured through ``TRITONSERVER_DELAY_GRPC_NOTIFICATION`` and
    ``TRITONSERVER_DELAY_GRPC_ENQUEUE`` (both read as milliseconds) before
    asserting that the client callback reported a cancellation.

    Raises:
        KeyError: if either delay environment variable is not set.
        ValueError: if either delay environment variable is not an integer.
    """
    # long test
    self.test_duration_delta = 2

    # Index os.environ directly: a missing variable then fails with a
    # KeyError naming the variable instead of an opaque
    # "int() argument must be ... not 'NoneType'" TypeError.
    delay_notification_sec = (
        int(os.environ["TRITONSERVER_DELAY_GRPC_NOTIFICATION"]) / 1000
    )
    delay_queue_cancellation_sec = (
        int(os.environ["TRITONSERVER_DELAY_GRPC_ENQUEUE"]) / 1000
    )

    future = self._client.async_infer(
        model_name=self._model_name,
        inputs=self._inputs,
        callback=self._callback,
        outputs=self._outputs,
    )
    # ensure cancellation is received before InferResponseComplete and is
    # processed after InferResponseComplete
    time.sleep(self._model_delay - 2)
    future.cancel()
    # ensure the cancellation is processed
    time.sleep(delay_notification_sec + delay_queue_cancellation_sec)
    self._assert_callback_cancelled()

def test_grpc_async_infer_cancellation_during_response_complete(self):
    """Cancel an async request shortly after the model delay has elapsed.

    The request is cancelled 2 seconds after ``self._model_delay`` expires,
    so the cancellation is received between InferResponseComplete checking
    for cancellation and Finish. The test then waits for the server-side
    delays configured through ``TRITONSERVER_DELAY_GRPC_NOTIFICATION`` and
    ``TRITONSERVER_DELAY_RESPONSE_COMPLETION`` (both read as milliseconds)
    before asserting that the client callback reported a cancellation.

    Raises:
        KeyError: if either delay environment variable is not set.
        ValueError: if either delay environment variable is not an integer.
    """
    # long test
    self.test_duration_delta = 2.5

    # Index os.environ directly: a missing variable then fails with a
    # KeyError naming the variable instead of an opaque
    # "int() argument must be ... not 'NoneType'" TypeError.
    delay_notification_sec = (
        int(os.environ["TRITONSERVER_DELAY_GRPC_NOTIFICATION"]) / 1000
    )
    delay_response_completion_sec = (
        int(os.environ["TRITONSERVER_DELAY_RESPONSE_COMPLETION"]) / 1000
    )

    future = self._client.async_infer(
        model_name=self._model_name,
        inputs=self._inputs,
        callback=self._callback,
        outputs=self._outputs,
    )
    # ensure the cancellation is received between InferResponseComplete
    # checking cancellation and Finish
    time.sleep(self._model_delay + 2)
    future.cancel()
    # ensure the cancellation is processed
    time.sleep(delay_notification_sec + delay_response_completion_sec)
    self._assert_callback_cancelled()


if __name__ == "__main__":
unittest.main()
32 changes: 27 additions & 5 deletions qa/L0_request_cancellation/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mkdir -p models/model/1 && (cd models/model && \
echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt)

SERVER_LOG=server.log
LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG
LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG 2>&1
if [ $? -ne 0 ]; then
echo -e "\n***\n*** Unit Tests Failed\n***"
cat $SERVER_LOG
Expand All @@ -78,12 +78,23 @@ mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \
echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "10000" } }]' >> config.pbtxt)

for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc_async_infer" "test_aio_grpc_stream_infer" "test_grpc_async_infer_cancellation_at_step_start"; do

for TEST_CASE in "test_grpc_async_infer" \
"test_grpc_stream_infer" \
"test_aio_grpc_async_infer" \
"test_aio_grpc_stream_infer" \
"test_grpc_async_infer_cancellation_at_step_start" \
"test_grpc_async_infer_response_complete_during_cancellation" \
"test_grpc_async_infer_cancellation_during_response_complete"; do
TEST_LOG="./grpc_cancellation_test.$TEST_CASE.log"
SERVER_LOG="grpc_cancellation_test.$TEST_CASE.server.log"
if [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_at_step_start" ]; then
export TRITONSERVER_DELAY_GRPC_PROCESS=5000
elif [ "$TEST_CASE" == "test_grpc_async_infer_response_complete_during_cancellation" ]; then
export TRITONSERVER_DELAY_GRPC_NOTIFICATION=5000
export TRITONSERVER_DELAY_GRPC_ENQUEUE=5000
elif [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_during_response_complete" ]; then
export TRITONSERVER_DELAY_GRPC_NOTIFICATION=5000
export TRITONSERVER_DELAY_RESPONSE_COMPLETION=5000
fi

SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1"
Expand All @@ -101,11 +112,16 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc
cat $TEST_LOG
RET=1
fi
grep "Cancellation notification received for" $SERVER_LOG
if [ $? -ne 0 ]; then

count=$(grep -o "Cancellation notification received for" $SERVER_LOG | wc -l)
if [ $count == 0 ]; then
echo -e "\n***\n*** Cancellation not received by server on $TEST_CASE\n***"
cat $SERVER_LOG
RET=1
elif [ $count -ne 1 ]; then
echo -e "\n***\n*** Unexpected cancellation received by server on $TEST_CASE. Expected 1 but received $count.\n***"
cat $SERVER_LOG
RET=1
fi
set -e

Expand All @@ -114,6 +130,12 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc

if [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_at_step_start" ]; then
unset TRITONSERVER_DELAY_GRPC_PROCESS
elif [ "$TEST_CASE" == "test_grpc_async_infer_response_complete_during_cancellation" ]; then
unset TRITONSERVER_DELAY_GRPC_NOTIFICATION
unset TRITONSERVER_DELAY_GRPC_ENQUEUE
elif [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_during_response_complete" ]; then
unset TRITONSERVER_DELAY_GRPC_NOTIFICATION
unset TRITONSERVER_DELAY_RESPONSE_COMPLETION
fi
done

Expand Down
39 changes: 31 additions & 8 deletions src/grpc/infer_handler.cc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the PR do?

Handles multiple corner cases under Low Request Cancellation Timeout.

Please go into more detail in the PR description on what the corner cases were, and how they were addressed by this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can give it a try... They are not easy to describe, though the two new test cases should explain them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perplexity might help to clarify your thoughts =)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description updated.

Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ ModelInferHandler::StartNewRequest()
}

bool
ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
ModelInferHandler::Process(
InferHandler::State* state, bool rpc_ok, bool is_notification)
{
// There are multiple handlers registered in the gRPC service.
// Hence, there we can have a case where a handler thread is
Expand All @@ -690,8 +691,8 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
// Will delay the Process execution by the specified time.
// This can be used to test the flow when cancellation request
// issued for the request, which is still at START step.
LOG_INFO << "Delaying the write of the response by "
<< state->delay_process_ms_ << " ms...";
LOG_INFO << "Delaying the Process execution by " << state->delay_process_ms_
<< " ms...";
std::this_thread::sleep_for(
std::chrono::milliseconds(state->delay_process_ms_));
}
Expand All @@ -711,11 +712,12 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
// because we will never leave this if body. Refer to PR 7325.
// This is a special case for ModelInferHandler, since we have 2 threads,
// and each of them can process cancellation. ModelStreamInfer has only 1
// thread, and cancellation at step START was not reproducible in a
// thread, and cancellation at step START was not reproducible in a
// single thread scenario.
StartNewRequest();
}
bool resume = state->context_->HandleCancellation(state, rpc_ok, Name());
bool resume = state->context_->HandleCancellation(
state, rpc_ok, Name(), is_notification);
return resume;
}

Expand Down Expand Up @@ -765,7 +767,7 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
std::make_pair("GRPC_SEND_START", TraceManager::CaptureTimestamp()));
#endif // TRITON_ENABLE_TRACING

state->step_ = COMPLETE;
state->step_ = Steps::COMPLETE;
state->context_->responder_->Finish(
inference::ModelInferResponse(), status, state);
}
Expand Down Expand Up @@ -1001,7 +1003,7 @@ ModelInferHandler::Execute(InferHandler::State* state)
}
#endif // TRITON_ENABLE_TRACING

state->step_ = COMPLETE;
state->step_ = Steps::COMPLETE;
state->context_->responder_->Finish(error_response, status, state);
}
}
Expand Down Expand Up @@ -1051,6 +1053,17 @@ ModelInferHandler::InferResponseComplete(
<< ", skipping response generation as grpc transaction was "
"cancelled... ";

if (state->delay_enqueue_ms_ != 0) {
// Will delay PutTaskBackToQueue by the specified time.
// This can be used to test the flow when cancellation request
// issued for the request during InferResponseComplete
// callback right before Process in the notification thread.
LOG_INFO << "Delaying PutTaskBackToQueue by " << state->delay_enqueue_ms_
<< " ms...";
std::this_thread::sleep_for(
std::chrono::milliseconds(state->delay_enqueue_ms_));
}

// Send state back to the queue so that state can be released
// in the next cycle.
state->context_->PutTaskBackToQueue(state);
Expand Down Expand Up @@ -1113,7 +1126,17 @@ ModelInferHandler::InferResponseComplete(
std::make_pair("GRPC_SEND_START", TraceManager::CaptureTimestamp()));
#endif // TRITON_ENABLE_TRACING

state->step_ = COMPLETE;
if (state->delay_response_completion_ms_ != 0) {
// Will delay the Process execution of state at step COMPLETE by the
// specified time. This can be used to test the flow when cancellation
// request issued for the request, which is at InferResponseComplete.
LOG_INFO << "Delaying InferResponseComplete by "
<< state->delay_response_completion_ms_ << " ms...";
std::this_thread::sleep_for(
std::chrono::milliseconds(state->delay_response_completion_ms_));
}

state->step_ = Steps::COMPLETE;
state->context_->responder_->Finish(*response, state->status_, state);
if (response_created) {
delete response;
Expand Down
Loading
Loading