Skip to content

Commit

Permalink
Fixes on the gRPC frontend to handle AsyncNotifyWhenDone() API (#6345)
Browse files Browse the repository at this point in the history
* Fix segmentation fault in gRPC frontend

* Finalize all states upon completion

* Fixes all state cleanups

* Handle completed states when cancellation notification is received

* Add more documentation steps

* Retrieve dormant states to minimize the memory footprint for long streams

* Update src/grpc/grpc_utils.h

Co-authored-by: Ryan McCormick <rmccormick@nvidia.com>

* Use a boolean state instead of raw pointer

---------

Co-authored-by: Ryan McCormick <rmccormick@nvidia.com>
  • Loading branch information
tanmayv25 and rmccorm4 authored Oct 3, 2023
1 parent 788b19d commit aefb307
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 86 deletions.
42 changes: 42 additions & 0 deletions src/grpc/grpc_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,48 @@

namespace triton { namespace server { namespace grpc {

std::ostream&
operator<<(std::ostream& out, const Steps& step)
{
switch (step) {
case START:
out << "START";
break;
case COMPLETE:
out << "COMPLETE";
break;
case FINISH:
out << "FINISH";
break;
case ISSUED:
out << "ISSUED";
break;
case READ:
out << "READ";
break;
case WRITEREADY:
out << "WRITEREADY";
break;
case WRITTEN:
out << "WRITTEN";
break;
case WAITING_NOTIFICATION:
out << "WAITING_NOTIFICATION";
break;
case CANCELLATION_ISSUED:
out << "CANCELLATION_ISSUED";
break;
case CANCELLED:
out << "CANCELLED";
break;
case PARTIAL_COMPLETION:
out << "PARTIAL_COMPLETION";
break;
}

return out;
}

void
GrpcStatusUtil::Create(::grpc::Status* status, TRITONSERVER_Error* err)
{
Expand Down
41 changes: 41 additions & 0 deletions src/grpc/grpc_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,47 @@

namespace triton { namespace server { namespace grpc {

// The step of processing that the state is in. Every state must
// recognize START, COMPLETE and FINISH and the others are optional.
typedef enum {
// This marks the starting stage of the RPC
START,
// This marks that RPC is complete.
COMPLETE,
// This marks the stage where all the notifications from the gRPC
// completion queue is received and state can be safely released.
FINISH,
// This stage means that RPC has been issued to Triton for inference
// and is waiting for the server callbacks or cancellation to be
// invoked.
ISSUED,
// This stage means the request has been read from the network and
// can be sent to Triton for execution.
READ,
// This stage means that the response is ready to be written back to
// the network.
WRITEREADY,
// This stage means that response has been written completely to the
// network.
WRITTEN,
// This marks the special stage for the state object to differentiate
// the tag delivered from AsyncNotifyWhenDone() method.
WAITING_NOTIFICATION,
// This stage means that the cancellation for the RPC has been issued
// to the server.
CANCELLATION_ISSUED,
// This stage marks that the state has been successfully cancelled.
CANCELLED,
// This is intermediary stage where the state has been been partially
// completed by grpc responder Finish call or AsyncNotifyWhenDone()
// notification. The other next call will move the stage to fully
// complete.
PARTIAL_COMPLETION
} Steps;

// Debugging helper
std::ostream& operator<<(std::ostream& out, const Steps& step);

//
// GrpcStatusUtil
//
Expand Down
37 changes: 1 addition & 36 deletions src/grpc/infer_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,42 +37,6 @@ NextUniqueId()

namespace triton { namespace server { namespace grpc {

std::ostream&
operator<<(std::ostream& out, const Steps& step)
{
switch (step) {
case START:
out << "START";
break;
case COMPLETE:
out << "COMPLETE";
break;
case FINISH:
out << "FINISH";
break;
case ISSUED:
out << "ISSUED";
break;
case READ:
out << "READ";
break;
case WRITEREADY:
out << "WRITEREADY";
break;
case WRITTEN:
out << "WRITTEN";
break;
case CANCELLATION_ISSUED:
out << "CANCELLATION_ISSUED";
break;
case CANCELLED:
out << "CANCELLED";
break;
}

return out;
}

TRITONSERVER_Error*
OutputBufferAttributesHelper(
TRITONSERVER_ResponseAllocator* allocator, const char* tensor_name,
Expand Down Expand Up @@ -785,6 +749,7 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
#endif // TRITON_ENABLE_TRACING

state->step_ = Steps::FINISH;
} else if (state->step_ == Steps::FINISH) {
finished = true;
}

Expand Down
Loading

0 comments on commit aefb307

Please sign in to comment.