
Avoid iterating over all instances when updating rate limiter resource counts #208

Merged
7 commits merged on Jun 15, 2023
3 changes: 3 additions & 0 deletions src/backend_model.cc
@@ -274,6 +274,9 @@ TritonModel::UpdateInstanceGroup(
this, backend_cmdline_config_map_, host_policy_map_, model_config);
caller_lock->lock();
if (!status.IsOk()) {
// Remove any pending instances if created.
bg_instances_.clear();
bg_passive_instances_.clear();
return status;
}

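The hunk above follows a stage-then-commit pattern: candidate instances are built into background containers and discarded if the update fails, so a failed update leaves the serving instances untouched. Below is a minimal sketch of that pattern; the names (`StagedUpdate`, `CreateInstances`) are hypothetical stand-ins, not the actual Triton types.

```cpp
#include <memory>
#include <utility>
#include <vector>

// Simplified stand-ins for the real instance and status types.
struct Instance {};
struct Status {
  bool ok = true;
  bool IsOk() const { return ok; }
};

class StagedUpdate {
 public:
  Status Apply() {
    // Build the new instances into background containers first.
    Status status = CreateInstances(&bg_instances_, &bg_passive_instances_);
    if (!status.IsOk()) {
      // Roll back: drop any partially created instances so the
      // currently serving instances remain unchanged.
      bg_instances_.clear();
      bg_passive_instances_.clear();
      return status;
    }
    // Commit: promote the background instances to the live set.
    instances_ = std::move(bg_instances_);
    passive_instances_ = std::move(bg_passive_instances_);
    return status;
  }

 private:
  Status CreateInstances(
      std::vector<std::unique_ptr<Instance>>* instances,
      std::vector<std::unique_ptr<Instance>>* passive_instances)
  {
    // Placeholder for the real instance construction logic.
    return Status{};
  }

  std::vector<std::unique_ptr<Instance>> instances_, passive_instances_;
  std::vector<std::unique_ptr<Instance>> bg_instances_, bg_passive_instances_;
};
```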
4 changes: 1 addition & 3 deletions src/backend_model_instance.cc
@@ -181,9 +181,7 @@ TritonModelInstance::~TritonModelInstance()
triton_backend_thread_->StopBackendThread();
}

LOG_STATUS_ERROR(
model_->Server()->GetRateLimiter()->UnregisterModelInstance(this),
"failed unregistering model instance");
model_->Server()->GetRateLimiter()->UnregisterModelInstance(this);

// Model finalization is optional...
if (model_->Backend()->ModelInstanceFiniFn() != nullptr) {
12 changes: 7 additions & 5 deletions src/model_lifecycle.cc
@@ -640,10 +640,12 @@ ModelLifeCycle::UpdateModelConfig(

std::unique_lock<std::mutex> model_info_lock(model_info->mtx_);

// Make sure the state reason is empty before attempting to update.
model_info->state_reason_.clear();

// Downcast 'Model' to 'TritonModel'.
TritonModel* model = (TritonModel*)model_info->model_.get();
if (model == nullptr) {
model_info->state_ = ModelReadyState::UNAVAILABLE;
model_info->state_reason_ =
"Unable to downcast '" + model_id.str() +
"' from 'Model' to 'TritonModel' during model update.";
@@ -654,8 +656,8 @@ ModelLifeCycle::UpdateModelConfig(
Status status =
model->UpdateInstanceGroup(new_model_config, &model_info_lock);
if (!status.IsOk()) {
model_info->state_ = ModelReadyState::UNAVAILABLE;
model_info->state_reason_ = status.AsString();
return;
}

// Write new config into 'model_info'.
@@ -682,10 +684,10 @@ ModelLifeCycle::OnLoadComplete(
std::lock_guard<std::mutex> model_info_lock(model_info->mtx_);

// Write failed to load reason into tracker, if any.
// A newly created model should be at state 'LOADING' unless failed to load.
// A updated model should be at state 'READY' unless failed to update.
// A newly created model is at 'LOADING' state unless it failed to load.
// An updated model has an empty state reason unless it failed to update.
if ((!is_update && model_info->state_ != ModelReadyState::LOADING) ||
(is_update && model_info->state_ != ModelReadyState::READY)) {
(is_update && !model_info->state_reason_.empty())) {
load_tracker->load_failed_ = true;
load_tracker->reason_ +=
("version " + std::to_string(version) + " is at " +
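Taken together, the two model_lifecycle.cc hunks change how a failed in-place update is detected: UpdateModelConfig clears state_reason_ before attempting the update and only fills it on failure, and OnLoadComplete treats a non-empty reason on an updated model as a failed update instead of checking for the READY state. A simplified sketch of that contract follows; the types and messages here are illustrative only, not the actual Triton classes.

```cpp
#include <iostream>
#include <string>

// Simplified stand-ins for the real model bookkeeping structures.
struct ModelInfo {
  std::string state_reason_;
};

struct LoadTracker {
  bool load_failed_ = false;
  std::string reason_;
};

// Attempt an update; on failure record the reason instead of
// flipping the ready state.
void UpdateModelConfig(ModelInfo* info, bool update_succeeds) {
  info->state_reason_.clear();  // start from a clean slate
  if (!update_succeeds) {
    info->state_reason_ = "instance group update failed";
  }
}

// After the update completes, a non-empty reason marks the failure.
void OnLoadComplete(const ModelInfo& info, LoadTracker* tracker) {
  if (!info.state_reason_.empty()) {
    tracker->load_failed_ = true;
    tracker->reason_ += info.state_reason_;
  }
}

int main() {
  ModelInfo info;
  LoadTracker tracker;
  UpdateModelConfig(&info, /*update_succeeds=*/false);
  OnLoadComplete(info, &tracker);
  std::cout << "failed: " << tracker.load_failed_ << " ("
            << tracker.reason_ << ")\n";
  return 0;
}
```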
167 changes: 106 additions & 61 deletions src/rate_limiter.cc
@@ -78,10 +78,12 @@ RateLimiter::RegisterModelInstance(
// Without this serialization instances of other models might fail
// to load because of the resource constraints in this instance.
std::lock_guard<std::mutex> lk(resource_manager_mtx_);
resource_manager_->AddModelInstance(pair_it.first->second.get());
const auto& status = resource_manager_->UpdateResourceLimits();
const auto& status =
resource_manager_->AddModelInstance(pair_it.first->second.get());
if (!status.IsOk()) {
resource_manager_->RemoveModelInstance(pair_it.first->second.get());
LOG_STATUS_ERROR(
resource_manager_->RemoveModelInstance(pair_it.first->second.get()),
"Cannot remove instance from resource manager");
return status;
}
}
@@ -92,7 +94,7 @@ RateLimiter::RegisterModelInstance(
return Status::Success;
}

Status
void
RateLimiter::UnregisterModelInstance(TritonModelInstance* triton_model_instance)
{
std::lock_guard<std::mutex> lk1(model_ctx_mtx_);
@@ -105,18 +107,14 @@ RateLimiter::UnregisterModelInstance(TritonModelInstance* triton_model_instance)
auto i_it = model_instances.find(triton_model_instance);
if (i_it != model_instances.end()) {
if (!ignore_resources_and_priority_) {
resource_manager_->RemoveModelInstance(i_it->second.get());
LOG_STATUS_ERROR(
resource_manager_->RemoveModelInstance(i_it->second.get()),
"Cannot remove instance from resource manager");
}
model_context.RemoveInstance(i_it->second.get());
model_instances.erase(i_it);
}

if (!ignore_resources_and_priority_) {
// The update can be skipped if there is only model instance count update,
// which new results will not be generated in such case. [FIXME: DLIS-4821]
RETURN_IF_ERROR(resource_manager_->UpdateResourceLimits());
}

{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
auto p_it = payload_queues_.find(model);
@@ -127,11 +125,9 @@ RateLimiter::UnregisterModelInstance(TritonModelInstance* triton_model_instance)
}
}
}

return Status::Success;
}

Status
void
RateLimiter::UnregisterModel(const TritonModel* model)
{
{
@@ -143,26 +139,22 @@ RateLimiter::UnregisterModel(const TritonModel* model)
model_context.RequestRemoval();
for (const auto& instance : model_instance_ctxs_[model]) {
if (!ignore_resources_and_priority_) {
resource_manager_->RemoveModelInstance(instance.second.get());
LOG_STATUS_ERROR(
resource_manager_->RemoveModelInstance(instance.second.get()),
"Cannot remove instance from resource manager");
}
}

model_instance_ctxs_.erase(model);
model_contexts_.erase(model);
}

if (!ignore_resources_and_priority_) {
RETURN_IF_ERROR(resource_manager_->UpdateResourceLimits());
}

{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
if (payload_queues_.find(model) != payload_queues_.end()) {
payload_queues_.erase(model);
}
}

return Status::Success;
}

bool
@@ -191,7 +183,9 @@ RateLimiter::EnqueuePayload(
{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
if (payload_queues_.find(model) == payload_queues_.end()) {
LOG_INFO << "Should not print this ";
return Status(
Status::Code::INTERNAL,
"Should not print this! Enqueuing payload with an unknown model.");
}
payload_queue = payload_queues_[model].get();
}
@@ -238,7 +232,9 @@ RateLimiter::DequeuePayload(
{
std::lock_guard<std::mutex> lk(payload_queues_mu_);
if (payload_queues_.find(instances[0]->Model()) == payload_queues_.end()) {
LOG_INFO << "Should not print this ";
LOG_ERROR << "Should not print this! Dequeuing payload with an unknown "
"instance.";
return;
}
payload_queue = payload_queues_[instances[0]->Model()].get();
}
@@ -321,13 +317,19 @@ RateLimiter::PayloadRelease(std::shared_ptr<Payload>& payload)
// available, so mark the instance as removing.
if (payload->GetOpType() == Payload::Operation::EXIT) {
std::lock_guard<std::mutex> lk(model_instance_ctx_mtx_);
auto& instances = model_instance_ctxs_[payload->GetInstance()->Model()];
auto it = instances.find(payload->GetInstance());
if (it == instances.end()) {
LOG_INFO << "Should not print this ";
auto it_model = model_instance_ctxs_.find(payload->GetInstance()->Model());
if (it_model == model_instance_ctxs_.end()) {
LOG_ERROR << "Should not print this! Releasing payload containing an "
"instance of an unknown model.";
return;
}
auto it_instance = it_model->second.find(payload->GetInstance());
if (it_instance == it_model->second.end()) {
LOG_ERROR << "Should not print this! Releasing payload containing an "
"unknown instance.";
return;
}
it->second->RequestRemoval(); // mark the instance as removing
it_instance->second->RequestRemoval(); // mark the instance as removing
}

payload->OnRelease();
@@ -733,13 +735,15 @@ RateLimiter::ModelInstanceContext::ScaledPriority()
}

void
RateLimiter::ModelInstanceContext::RequestRemoval() {
RateLimiter::ModelInstanceContext::RequestRemoval()
{
std::unique_lock<std::mutex> lk(state_mtx_);
removal_in_progress_ = true;
}

bool
RateLimiter::ModelInstanceContext::IsRemovalInProgress() {
RateLimiter::ModelInstanceContext::IsRemovalInProgress()
{
std::unique_lock<std::mutex> lk(state_mtx_);
return removal_in_progress_;
}
@@ -760,11 +764,12 @@ RateLimiter::ResourceManager::Create(
return Status::Success;
}

void
Status
RateLimiter::ResourceManager::AddModelInstance(
const ModelInstanceContext* instance)
{
std::lock_guard<std::mutex> lk(model_resources_mtx_);
// Add instance into model resources.
std::lock_guard<std::mutex> lk1(model_resources_mtx_);
auto pr = model_resources_.emplace(std::make_pair(instance, ResourceMap()));
for (const auto& resource : instance->GetRateLimiterConfig()->resources()) {
if (resource.global()) {
Expand All @@ -775,52 +780,92 @@ RateLimiter::ResourceManager::AddModelInstance(
resource.count();
}
}
// Increase max resource if needed.
std::lock_guard<std::mutex> lk2(max_resources_mtx_);
UpdateMaxResource(pr.first->second);
return ParseAndValidateResources();
}

Status
RateLimiter::ResourceManager::RemoveModelInstance(
const ModelInstanceContext* instance)
{
std::lock_guard<std::mutex> lk(model_resources_mtx_);
// Find instance from model resources.
std::lock_guard<std::mutex> lk1(model_resources_mtx_);
const auto& itr = model_resources_.find(instance);
if (itr == model_resources_.end()) {
return Status(
Status::Code::INTERNAL, "Can not find the instance to remove");
return Status(Status::Code::INTERNAL, "Cannot find the instance to remove");
}
// Check if max resources need to be updated.
bool update_needed = false;
std::lock_guard<std::mutex> lk2(max_resources_mtx_);
for (const auto& resource_device_map : itr->second) {
auto ditr = max_resources_.find(resource_device_map.first);
if (ditr != max_resources_.end()) {
for (const auto& resource : resource_device_map.second) {
auto ritr = ditr->second.find(resource.first);
if (ritr != ditr->second.end() && ritr->second <= resource.second) {
update_needed = true;
if (ritr->second < resource.second) {
LOG_ERROR << "Should not print this! Removing an instance with "
"resource above max resource.";
}
break;
}
}
}
if (update_needed) {
break;
}
}
// Remove instance from model resources.
model_resources_.erase(instance);
return Status::Success;
// Re-compute max resource if needed.
if (update_needed) {
ComputeResourceLimits();
}
return ParseAndValidateResources();
}

Status
RateLimiter::ResourceManager::UpdateResourceLimits()
void
RateLimiter::ResourceManager::ComputeResourceLimits()
{
std::lock_guard<std::mutex> lk1(model_resources_mtx_);
std::lock_guard<std::mutex> lk2(max_resources_mtx_);
// Obtain the maximum resource across all the instances and use it as the
// default available.
max_resources_.clear();
// Obtain the maximum resource across all the instances
// and use it as the default available.
for (const auto& instance_resources : model_resources_) {
for (const auto& resource_device_map : instance_resources.second) {
auto ditr = max_resources_.find(resource_device_map.first);
if (ditr == max_resources_.end()) {
ditr =
max_resources_
.emplace(resource_device_map.first, resource_device_map.second)
.first;
} else {
for (const auto& resource : resource_device_map.second) {
auto ritr = ditr->second.find(resource.first);
if (ritr == ditr->second.end()) {
ritr = ditr->second.emplace(resource.first, resource.second).first;
} else {
if (ritr->second < resource.second) {
ritr->second = resource.second;
}
UpdateMaxResource(instance_resources.second);
}
}

void
RateLimiter::ResourceManager::UpdateMaxResource(
const ResourceMap& instance_resource_map)
{
for (const auto& resource_device_map : instance_resource_map) {
auto ditr = max_resources_.find(resource_device_map.first);
if (ditr == max_resources_.end()) {
ditr = max_resources_
.emplace(resource_device_map.first, resource_device_map.second)
.first;
} else {
for (const auto& resource : resource_device_map.second) {
auto ritr = ditr->second.find(resource.first);
if (ritr == ditr->second.end()) {
ritr = ditr->second.emplace(resource.first, resource.second).first;
} else {
if (ritr->second < resource.second) {
ritr->second = resource.second;
}
}
}
}
}
}

Status
RateLimiter::ResourceManager::ParseAndValidateResources()
{
if (!explicit_max_resources_.empty()) {
RETURN_IF_ERROR(ParseAndValidateExplicitResources());
}
@@ -830,9 +875,9 @@ RateLimiter::ResourceManager::UpdateResourceLimits()
std::string resource_map_str{"\nMax Resource Map===>\n"};
for (const auto& ditr : max_resources_) {
if (!ditr.second.empty()) {
std::string device_str{(ditr.first == GLOBAL_RESOURCE_KEY)
? "GLOBAL"
: std::to_string(ditr.first)};
std::string device_str{
(ditr.first == GLOBAL_RESOURCE_KEY) ? "GLOBAL"
: std::to_string(ditr.first)};
resource_map_str += "\tDevice: " + device_str + "\n";
for (const auto& ritr : ditr.second) {
resource_map_str += "\t\tResource: " + ritr.first +
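The core of the rate_limiter.cc change is that the maximum available resource counts are now maintained incrementally instead of being recomputed from every registered instance: adding an instance only merges that instance's resource map into the maxima (UpdateMaxResource), and removing an instance triggers a full recomputation (ComputeResourceLimits) only when the removed instance actually held a maximal count for some resource. The following is a self-contained sketch of that bookkeeping under simplifying assumptions: resources are keyed by name only (the per-device dimension, locking, and explicit resource limits of the real ResourceManager are omitted), and the class and method names are illustrative, not Triton's.

```cpp
#include <cstddef>
#include <iostream>
#include <map>
#include <string>

using ResourceMap = std::map<std::string, size_t>;

class ResourceTracker {
 public:
  // Adding an instance only needs to raise maxima, never recompute.
  void Add(int instance_id, const ResourceMap& resources) {
    instances_[instance_id] = resources;
    UpdateMax(resources);
  }

  // Removing an instance recomputes the maxima only if the removed
  // instance held a maximal count for some resource.
  void Remove(int instance_id) {
    auto it = instances_.find(instance_id);
    if (it == instances_.end()) {
      return;
    }
    bool recompute_needed = false;
    for (const auto& [name, count] : it->second) {
      auto mit = max_resources_.find(name);
      if (mit != max_resources_.end() && mit->second <= count) {
        recompute_needed = true;
        break;
      }
    }
    instances_.erase(it);
    if (recompute_needed) {
      max_resources_.clear();
      for (const auto& entry : instances_) {
        UpdateMax(entry.second);
      }
    }
  }

  const ResourceMap& Max() const { return max_resources_; }

 private:
  // Element-wise max merge of one instance's resources into the maxima.
  void UpdateMax(const ResourceMap& resources) {
    for (const auto& [name, count] : resources) {
      auto [mit, inserted] = max_resources_.emplace(name, count);
      if (!inserted && mit->second < count) {
        mit->second = count;
      }
    }
  }

  std::map<int, ResourceMap> instances_;
  ResourceMap max_resources_;
};

int main() {
  ResourceTracker tracker;
  tracker.Add(0, {{"R1", 4}});
  tracker.Add(1, {{"R1", 10}});
  tracker.Remove(1);  // instance 1 held the max for R1, so recompute
  std::cout << "R1 max: " << tracker.Max().at("R1") << "\n";  // prints 4
  return 0;
}
```

This keeps registration proportional to a single instance's resource map and only pays the full scan when a maximal holder goes away, which mirrors the update_needed check in the RemoveModelInstance hunk above.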