Avoid rate limiter resource count update from iterating over all instances (#208)

* Combine resource update into instance add and remove

* Minor fix and return error on instance remove failure

* Enhance "should not print this" messages

* Fix bug

* Revert specific queue erase removal

* Fix inconsistent model info state after failed update

* Fix rate limiter issue after instance failed to add
kthui authored Jun 15, 2023
1 parent 1c61aff commit be341bd
Showing 5 changed files with 123 additions and 77 deletions.
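
At a high level, this change folds the resource-limit bookkeeping into AddModelInstance() and RemoveModelInstance(), so the rate limiter no longer re-walks every registered instance on each update; a full recomputation happens only when a removed instance held (or tied) a tracked maximum. The following is a minimal sketch of that pattern, not part of the diff: it uses simplified, illustrative types and omits the Triton locking and Status plumbing.

#include <algorithm>
#include <cstdint>
#include <map>
#include <string>

// Simplified stand-in for a per-instance resource map: resource name -> count.
using ResourceMap = std::map<std::string, std::uint64_t>;

class ResourceTracker {
 public:
  // Adding an instance only raises the running maxima; no full pass needed.
  void Add(int instance_id, const ResourceMap& resources)
  {
    instances_[instance_id] = resources;
    for (const auto& [name, count] : resources) {
      auto& max_count = max_[name];  // default-constructs to 0 if absent
      max_count = std::max(max_count, count);
    }
  }

  // Removing an instance recomputes the maxima only when the departing
  // instance was holding (or tied for) a current maximum.
  void Remove(int instance_id)
  {
    auto it = instances_.find(instance_id);
    if (it == instances_.end()) {
      return;  // unknown instance; nothing to do
    }
    bool recompute = false;
    for (const auto& [name, count] : it->second) {
      auto mit = max_.find(name);
      if (mit != max_.end() && mit->second <= count) {
        recompute = true;
        break;
      }
    }
    instances_.erase(it);
    if (recompute) {
      max_.clear();
      for (const auto& entry : instances_) {
        for (const auto& [name, count] : entry.second) {
          auto& max_count = max_[name];
          max_count = std::max(max_count, count);
        }
      }
    }
  }

 private:
  std::map<int, ResourceMap> instances_;  // instance id -> its resource counts
  ResourceMap max_;                       // running maximum per resource
};

In the actual diff below, the equivalent pieces are ResourceManager::AddModelInstance() calling UpdateMaxResource() with the new instance's map, and ResourceManager::RemoveModelInstance() falling back to ComputeResourceLimits() only when the removed instance's count matched a tracked maximum.
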
3 changes: 3 additions & 0 deletions src/backend_model.cc
@@ -274,6 +274,9 @@ TritonModel::UpdateInstanceGroup(
this, backend_cmdline_config_map_, host_policy_map_, model_config);
caller_lock->lock();
if (!status.IsOk()) {
// Remove any pending instances if created.
bg_instances_.clear();
bg_passive_instances_.clear();
return status;
}

4 changes: 1 addition & 3 deletions src/backend_model_instance.cc
@@ -181,9 +181,7 @@ TritonModelInstance::~TritonModelInstance()
triton_backend_thread_->StopBackendThread();
}

LOG_STATUS_ERROR(
model_->Server()->GetRateLimiter()->UnregisterModelInstance(this),
"failed unregistering model instance");
model_->Server()->GetRateLimiter()->UnregisterModelInstance(this);

// Model finalization is optional...
if (model_->Backend()->ModelInstanceFiniFn() != nullptr) {
12 changes: 7 additions & 5 deletions src/model_lifecycle.cc
@@ -640,10 +640,12 @@ ModelLifeCycle::UpdateModelConfig(

std::unique_lock<std::mutex> model_info_lock(model_info->mtx_);

// Make sure the state reason is empty before attempting to update.
model_info->state_reason_.clear();

// Downcast 'Model' to 'TritonModel'.
TritonModel* model = (TritonModel*)model_info->model_.get();
if (model == nullptr) {
model_info->state_ = ModelReadyState::UNAVAILABLE;
model_info->state_reason_ =
"Unable to downcast '" + model_id.str() +
"' from 'Model' to 'TritonModel' during model update.";
@@ -654,8 +656,8 @@
Status status =
model->UpdateInstanceGroup(new_model_config, &model_info_lock);
if (!status.IsOk()) {
model_info->state_ = ModelReadyState::UNAVAILABLE;
model_info->state_reason_ = status.AsString();
return;
}

// Write new config into 'model_info'.
@@ -682,10 +684,10 @@ ModelLifeCycle::OnLoadComplete(
std::lock_guard<std::mutex> model_info_lock(model_info->mtx_);

// Write failed to load reason into tracker, if any.
// A newly created model should be at state 'LOADING' unless failed to load.
// A updated model should be at state 'READY' unless failed to update.
// A newly created model is at 'LOADING' state unless it failed to load.
// An updated model has an empty state reason unless it failed to update.
if ((!is_update && model_info->state_ != ModelReadyState::LOADING) ||
(is_update && model_info->state_ != ModelReadyState::READY)) {
(is_update && !model_info->state_reason_.empty())) {
load_tracker->load_failed_ = true;
load_tracker->reason_ +=
("version " + std::to_string(version) + " is at " +
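
The model_lifecycle.cc changes above shift failure detection for in-place updates onto the state reason: UpdateModelConfig() clears state_reason_ before attempting the update and records the reason on failure, and OnLoadComplete() treats a non-empty reason as a failed update rather than comparing against the READY state. A minimal sketch of that convention, not part of the diff, using an illustrative stand-in type rather than the Triton classes:

#include <iostream>
#include <string>

// Simplified stand-in for the per-model bookkeeping touched by the update path.
struct ModelInfoSketch {
  std::string state_reason_;  // empty means "no failure recorded"
};

// Clear any stale reason, attempt the update, and record the reason on failure
// (the ready state itself is left alone).
void TryUpdate(ModelInfoSketch& info, bool succeed)
{
  info.state_reason_.clear();
  if (!succeed) {
    info.state_reason_ = "instance group update failed";
  }
}

// The completion callback treats a non-empty reason as a failed update.
bool UpdateFailed(const ModelInfoSketch& info)
{
  return !info.state_reason_.empty();
}

int main()
{
  ModelInfoSketch info;
  TryUpdate(info, /*succeed=*/false);
  std::cout << (UpdateFailed(info) ? "update failed" : "update ok") << std::endl;
  return 0;
}
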
167 changes: 106 additions & 61 deletions src/rate_limiter.cc
@@ -78,10 +78,12 @@ RateLimiter::RegisterModelInstance(
// Without this serialization instances of other models might fail
// to load because of the resource constraints in this instance.
std::lock_guard<std::mutex> lk(resource_manager_mtx_);
resource_manager_->AddModelInstance(pair_it.first->second.get());
const auto& status = resource_manager_->UpdateResourceLimits();
const auto& status =
resource_manager_->AddModelInstance(pair_it.first->second.get());
if (!status.IsOk()) {
resource_manager_->RemoveModelInstance(pair_it.first->second.get());
LOG_STATUS_ERROR(
resource_manager_->RemoveModelInstance(pair_it.first->second.get()),
"Cannot remove instance from resource manager");
return status;
}
}
@@ -92,7 +94,7 @@ RateLimiter::UnregisterModelInstance(TritonModelInstance* triton_model_instance)
return Status::Success;
}

Status
void
RateLimiter::UnregisterModelInstance(TritonModelInstance* triton_model_instance)
{
std::lock_guard<std::mutex> lk1(model_ctx_mtx_);
@@ -105,18 +107,14 @@ RateLimiter::UnregisterModelInstance(TritonModelInstance* triton_model_instance)
auto i_it = model_instances.find(triton_model_instance);
if (i_it != model_instances.end()) {
if (!ignore_resources_and_priority_) {
resource_manager_->RemoveModelInstance(i_it->second.get());
LOG_STATUS_ERROR(
resource_manager_->RemoveModelInstance(i_it->second.get()),
"Cannot remove instance from resource manager");
}
model_context.RemoveInstance(i_it->second.get());
model_instances.erase(i_it);
}

if (!ignore_resources_and_priority_) {
// The update can be skipped if there is only model instance count update,
// which new results will not be generated in such case. [FIXME: DLIS-4821]
RETURN_IF_ERROR(resource_manager_->UpdateResourceLimits());
}

{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
auto p_it = payload_queues_.find(model);
Expand All @@ -127,11 +125,9 @@ RateLimiter::UnregisterModelInstance(TritonModelInstance* triton_model_instance)
}
}
}

return Status::Success;
}

Status
void
RateLimiter::UnregisterModel(const TritonModel* model)
{
{
@@ -143,26 +139,22 @@ RateLimiter::UnregisterModel(const TritonModel* model)
model_context.RequestRemoval();
for (const auto& instance : model_instance_ctxs_[model]) {
if (!ignore_resources_and_priority_) {
resource_manager_->RemoveModelInstance(instance.second.get());
LOG_STATUS_ERROR(
resource_manager_->RemoveModelInstance(instance.second.get()),
"Cannot remove instance from resource manager");
}
}

model_instance_ctxs_.erase(model);
model_contexts_.erase(model);
}

if (!ignore_resources_and_priority_) {
RETURN_IF_ERROR(resource_manager_->UpdateResourceLimits());
}

{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
if (payload_queues_.find(model) != payload_queues_.end()) {
payload_queues_.erase(model);
}
}

return Status::Success;
}

bool
@@ -191,7 +183,9 @@ RateLimiter::EnqueuePayload(
{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
if (payload_queues_.find(model) == payload_queues_.end()) {
LOG_INFO << "Should not print this ";
return Status(
Status::Code::INTERNAL,
"Should not print this! Enqueuing payload with an unknown model.");
}
payload_queue = payload_queues_[model].get();
}
@@ -238,7 +232,9 @@ RateLimiter::DequeuePayload(
{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
if (payload_queues_.find(instances[0]->Model()) == payload_queues_.end()) {
LOG_INFO << "Should not print this ";
LOG_ERROR << "Should not print this! Dequeuing payload with an unknown "
"instance.";
return;
}
payload_queue = payload_queues_[instances[0]->Model()].get();
}
@@ -321,13 +317,19 @@ RateLimiter::PayloadRelease(std::shared_ptr<Payload>& payload)
// available, so mark the instance as removing.
if (payload->GetOpType() == Payload::Operation::EXIT) {
std::lock_guard<std::mutex> lk(model_instance_ctx_mtx_);
auto& instances = model_instance_ctxs_[payload->GetInstance()->Model()];
auto it = instances.find(payload->GetInstance());
if (it == instances.end()) {
LOG_INFO << "Should not print this ";
auto it_model = model_instance_ctxs_.find(payload->GetInstance()->Model());
if (it_model == model_instance_ctxs_.end()) {
LOG_ERROR << "Should not print this! Releasing payload containing an "
"instance of an unknown model.";
return;
}
auto it_instance = it_model->second.find(payload->GetInstance());
if (it_instance == it_model->second.end()) {
LOG_ERROR << "Should not print this! Releasing payload containing an "
"unknown instance.";
return;
}
it->second->RequestRemoval(); // mark the instance as removing
it_instance->second->RequestRemoval(); // mark the instance as removing
}

payload->OnRelease();
@@ -733,13 +735,15 @@ RateLimiter::ModelInstanceContext::ScaledPriority()
}

void
RateLimiter::ModelInstanceContext::RequestRemoval() {
RateLimiter::ModelInstanceContext::RequestRemoval()
{
std::unique_lock<std::mutex> lk(state_mtx_);
removal_in_progress_ = true;
}

bool
RateLimiter::ModelInstanceContext::IsRemovalInProgress() {
RateLimiter::ModelInstanceContext::IsRemovalInProgress()
{
std::unique_lock<std::mutex> lk(state_mtx_);
return removal_in_progress_;
}
@@ -760,11 +764,12 @@ RateLimiter::ResourceManager::Create(
return Status::Success;
}

void
Status
RateLimiter::ResourceManager::AddModelInstance(
const ModelInstanceContext* instance)
{
std::lock_guard<std::mutex> lk(model_resources_mtx_);
// Add instance into model resources.
std::lock_guard<std::mutex> lk1(model_resources_mtx_);
auto pr = model_resources_.emplace(std::make_pair(instance, ResourceMap()));
for (const auto& resource : instance->GetRateLimiterConfig()->resources()) {
if (resource.global()) {
@@ -775,52 +780,92 @@ RateLimiter::ResourceManager::AddModelInstance(
resource.count();
}
}
// Increase max resource if needed.
std::lock_guard<std::mutex> lk2(max_resources_mtx_);
UpdateMaxResource(pr.first->second);
return ParseAndValidateResources();
}

Status
RateLimiter::ResourceManager::RemoveModelInstance(
const ModelInstanceContext* instance)
{
std::lock_guard<std::mutex> lk(model_resources_mtx_);
// Find instance from model resources.
std::lock_guard<std::mutex> lk1(model_resources_mtx_);
const auto& itr = model_resources_.find(instance);
if (itr == model_resources_.end()) {
return Status(
Status::Code::INTERNAL, "Can not find the instance to remove");
return Status(Status::Code::INTERNAL, "Cannot find the instance to remove");
}
// Check if max resources need to be updated.
bool update_needed = false;
std::lock_guard<std::mutex> lk2(max_resources_mtx_);
for (const auto& resource_device_map : itr->second) {
auto ditr = max_resources_.find(resource_device_map.first);
if (ditr != max_resources_.end()) {
for (const auto& resource : resource_device_map.second) {
auto ritr = ditr->second.find(resource.first);
if (ritr != ditr->second.end() && ritr->second <= resource.second) {
update_needed = true;
if (ritr->second < resource.second) {
LOG_ERROR << "Should not print this! Removing an instance with "
"resource above max resource.";
}
break;
}
}
}
if (update_needed) {
break;
}
}
// Remove instance from model resources.
model_resources_.erase(instance);
return Status::Success;
// Re-compute max resource if needed.
if (update_needed) {
ComputeResourceLimits();
}
return ParseAndValidateResources();
}

Status
RateLimiter::ResourceManager::UpdateResourceLimits()
void
RateLimiter::ResourceManager::ComputeResourceLimits()
{
std::lock_guard<std::mutex> lk1(model_resources_mtx_);
std::lock_guard<std::mutex> lk2(max_resources_mtx_);
// Obtain the maximum resource across all the instances and use it as the
// default available.
max_resources_.clear();
// Obtain the maximum resource across all the instances
// and use it as the default available.
for (const auto& instance_resources : model_resources_) {
for (const auto& resource_device_map : instance_resources.second) {
auto ditr = max_resources_.find(resource_device_map.first);
if (ditr == max_resources_.end()) {
ditr =
max_resources_
.emplace(resource_device_map.first, resource_device_map.second)
.first;
} else {
for (const auto& resource : resource_device_map.second) {
auto ritr = ditr->second.find(resource.first);
if (ritr == ditr->second.end()) {
ritr = ditr->second.emplace(resource.first, resource.second).first;
} else {
if (ritr->second < resource.second) {
ritr->second = resource.second;
}
UpdateMaxResource(instance_resources.second);
}
}

void
RateLimiter::ResourceManager::UpdateMaxResource(
const ResourceMap& instance_resource_map)
{
for (const auto& resource_device_map : instance_resource_map) {
auto ditr = max_resources_.find(resource_device_map.first);
if (ditr == max_resources_.end()) {
ditr = max_resources_
.emplace(resource_device_map.first, resource_device_map.second)
.first;
} else {
for (const auto& resource : resource_device_map.second) {
auto ritr = ditr->second.find(resource.first);
if (ritr == ditr->second.end()) {
ritr = ditr->second.emplace(resource.first, resource.second).first;
} else {
if (ritr->second < resource.second) {
ritr->second = resource.second;
}
}
}
}
}
}

Status
RateLimiter::ResourceManager::ParseAndValidateResources()
{
if (!explicit_max_resources_.empty()) {
RETURN_IF_ERROR(ParseAndValidateExplicitResources());
}
@@ -830,9 +875,9 @@ RateLimiter::ResourceManager::UpdateResourceLimits()
std::string resource_map_str{"\nMax Resource Map===>\n"};
for (const auto& ditr : max_resources_) {
if (!ditr.second.empty()) {
std::string device_str{(ditr.first == GLOBAL_RESOURCE_KEY)
? "GLOBAL"
: std::to_string(ditr.first)};
std::string device_str{
(ditr.first == GLOBAL_RESOURCE_KEY) ? "GLOBAL"
: std::to_string(ditr.first)};
resource_map_str += "\tDevice: " + device_str + "\n";
for (const auto& ritr : ditr.second) {
resource_map_str += "\t\tResource: " + ritr.first +
