Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flexible Scheduling with or_slot [WIP] #1296

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions resource/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ set(RESOURCE_HEADERS
schema/ephemeral.hpp
traversers/dfu.hpp
traversers/dfu_impl.hpp
traversers/dfu_flexible.hpp
traversers/dfu_traverser_policy_factory.hpp
policies/base/dfu_match_cb.hpp
policies/base/matcher.hpp
readers/resource_namespace_remapper.hpp
Expand Down Expand Up @@ -62,6 +64,8 @@ add_library(resource STATIC
schema/ephemeral.cpp
traversers/dfu.cpp
traversers/dfu_impl.cpp
traversers/dfu_flexible.cpp
traversers/dfu_traverser_policy_factory.cpp
traversers/dfu_impl_update.cpp
policies/base/dfu_match_cb.cpp
policies/base/matcher.cpp
Expand Down
1 change: 1 addition & 0 deletions resource/schema/data_std.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ resource_type_t gpu_rt{"gpu"};
resource_type_t node_rt{"node"};
resource_type_t rack_rt{"rack"};
resource_type_t slot_rt{"slot"};
resource_type_t or_slot_rt{"or_slot"};

} // namespace resource_model
} // namespace Flux
Expand Down
1 change: 1 addition & 0 deletions resource/schema/data_std.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ constexpr uint64_t resource_type_id{1};
struct resource_type_tag {};
using resource_type_t = intern::interned_string<intern::dense_storage<resource_type_tag, uint16_t>>;
extern resource_type_t slot_rt;
extern resource_type_t or_slot_rt;
extern resource_type_t cluster_rt;
extern resource_type_t rack_rt;
extern resource_type_t node_rt;
Expand Down
324 changes: 324 additions & 0 deletions resource/traversers/dfu_flexible.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
/*****************************************************************************\
* Copyright 2024 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, LICENSE)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\*****************************************************************************/

extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif
}

#include "resource/traversers/dfu_flexible.hpp"

using namespace Flux::Jobspec;
using namespace Flux::resource_model;
using namespace Flux::resource_model::detail;

/*
 * Classify the sibling jobspec resources of one level against vertex u.
 *
 * \param u                graph vertex being visited.
 * \param resources        sibling jobspec resources at the current level.
 * \param slot_resource    [out] set to a slot child of the sibling whose type
 *                         equals the type of u (the last such child wins).
 * \param nslots           [out] set from calc_effective_max of whichever slot
 *                         was seen last (child slot or sibling slot).
 * \param match_resource   [out] set to the sibling whose type equals the type
 *                         of u.
 * \param slot_resources   [out] set to the whole sibling vector as soon as a
 *                         slot-typed sibling is seen.
 * \return 0 on success; -1 when a type-matching sibling is combined with
 *         another type-matching sibling or with a slot sibling.
 *
 * Limitations of DFU traverser: jobspec must not have same type at same
 * level, except for or_slot siblings. Please read utilities/README.md.
 */
int dfu_flexible_t::match (vtx_t u,
                           const std::vector<Resource> &resources,
                           const Resource **slot_resource,
                           unsigned int *nslots,
                           const Resource **match_resource,
                           const std::vector<Resource> **slot_resources)
{
    bool type_hit = false;     // a sibling with the same type as u was seen
    bool or_slot_hit = false;  // a slot-typed sibling was seen

    for (const Resource &candidate : resources) {
        if ((*m_graph)[u].type == candidate.type) {
            // A second sibling of this type, or a mix with slot siblings,
            // is not supported.
            if (type_hit || or_slot_hit)
                return -1;
            *match_resource = &candidate;
            for (const Resource &child : candidate.with) {
                if (child.type == slot_rt) {
                    *slot_resource = &child;
                    *nslots = m_match->calc_effective_max (child);
                }
            }
            type_hit = true;
            continue;
        }
        // NOTE(review): this branch compares against slot_rt although the
        // surrounding comments and prime_jobspec talk about or_slot
        // (or_slot_rt) -- confirm which type is intended here.
        if (candidate.type == slot_rt) {
            // Several slot siblings are allowed, but not next to a
            // type-matching sibling that was already seen.
            if (type_hit)
                return -1;
            *slot_resources = &resources;
            // This value is not well defined. In this state, nslots is
            // determined by the last listed or_slot sibling in the jobspec.
            *nslots = m_match->calc_effective_max (candidate);
            or_slot_hit = true;
        }
    }
    return 0;
}

/*
 * Decide how vertex u relates to the sibling jobspec resources and return the
 * jobspec level to use for the next recursion.
 *
 * \param u          graph vertex being visited.
 * \param resources  sibling jobspec resources at the current level.
 * \param pristine   [in,out] true until the first match is made; cleared on
 *                   a slot or resource match.
 * \param nslots     [out] forwarded to match (), which sets it when a slot
 *                   is found.
 * \param spec       [out] kind of match that was detected.
 * \return the `with` list of the matched resource on RESOURCE_MATCH,
 *         otherwise the incoming sibling vector.
 */
const std::vector<Resource> &dfu_flexible_t::test (vtx_t u,
                                                   const std::vector<Resource> &resources,
                                                   bool &pristine,
                                                   unsigned int &nslots,
                                                   match_kind_t &spec)
{
    /* Note on the purpose of pristine: we differentiate two similar but
     * distinct cases with this parameter.
     * Jobspec is allowed to omit the prefix so you can have a spec like
     *    socket[1]->core[2] which will match
     *        cluster[1]->node[1]->socket[2]->core[22].
     * For this case, when you visit the "node" resource vertex, the next
     * Jobspec resource that should be used at the next recursion should
     * be socket[1]. And we enable this if pristine is true.
     * But then once the first match is made, any mismatch afterwards
     * should result in a match failure. For example,
     *    socket[1]->core[2] must fail to match
     *        cluster[1]->socket[1]->numanode[1]->core[22].
     * pristine is used to detect this case.
     */
    const std::vector<Resource> *ret = &resources;
    const Resource *slot_resources = nullptr;
    const Resource *match_resources = nullptr;
    const std::vector<Resource> *slot_or_resources = nullptr;

    if (match (u, resources, &slot_resources, &nslots, &match_resources, &slot_or_resources) < 0) {
        m_err_msg += __FUNCTION__;
        m_err_msg += ": siblings in jobspec request same resource type ";
        m_err_msg += ": " + (*m_graph)[u].type + ".\n";
        spec = match_kind_t::NONE_MATCH;
        return *ret;
    }

    if (slot_or_resources) {
        // set default spec in case no match is found
        spec = pristine ? match_kind_t::PRISTINE_NONE_MATCH : match_kind_t::NONE_MATCH;

        // Iterate by const reference: only the address of each sibling is
        // needed, so there is no reason to copy the Resource (and its
        // nested `with` tree) on every iteration.
        for (const Resource &alternative : *slot_or_resources) {
            if (slot_match (u, &alternative)) {
                spec = match_kind_t::SLOT_MATCH;
                pristine = false;
                ret = slot_or_resources;
            }
        }
    } else if (match_resources) {
        spec = match_kind_t::RESOURCE_MATCH;
        pristine = false;
        ret = &(match_resources->with);
    } else {
        spec = pristine ? match_kind_t::PRISTINE_NONE_MATCH : match_kind_t::NONE_MATCH;
    }

    return *ret;
}

/*
 * Track the smallest count seen for a pruning resource type.
 *
 * If `type` is a pruning type for `subsystem`, store `counts` under `type`
 * in `lowest` when there is no entry yet or when the stored value is larger.
 *
 * \param subsystem  subsystem used for the pruning-type query.
 * \param type       resource type whose minimum is tracked.
 * \param counts     candidate count.
 * \param lowest     [in,out] unordered map from resource type to the lowest
 *                   count recorded so far.
 * \return 0 if `type` is a pruning type (map possibly updated), -1 otherwise.
 */
int dfu_flexible_t::min_if (subsystem_t subsystem,
                            resource_type_t type,
                            unsigned int counts,
                            std::unordered_map<resource_type_t, int64_t> &lowest)
{
    if (!m_match->is_pruning_type (subsystem, type))
        return -1;

    auto entry = lowest.find (type);
    if (entry == lowest.end ())
        lowest[type] = counts;
    else if (entry->second > counts)
        entry->second = counts;
    return 0;
}

/*
 * Walk the jobspec resource tree and fold per-type count requirements up
 * into the parent's aggregate map.
 *
 * \param resources  jobspec resources at the current level; each one's
 *                   user_data is filled by the recursive call on its `with`
 *                   children.
 * \param to_parent  [in,out] aggregate map of the enclosing level.
 */
void dfu_flexible_t::prime_jobspec (std::vector<Resource> &resources,
                                    std::unordered_map<resource_type_t, int64_t> &to_parent)
{
    subsystem_t subsystem = m_match->dom_subsystem ();

    for (auto &resource : resources) {
        // A resource requested as exclusive in the jobspec is added to the
        // matcher's exclusive resource set. This ensures that the full
        // resource set (which includes shadow resources) is emitted.
        if (resource.exclusive == Jobspec::tristate_t::TRUE)
            m_match->add_exclusive_resource_type (resource.type);

        // Use the minimum requirement because the search must not be pruned
        // as long as a subtree satisfies the minimum requirement.
        accum_if (subsystem, resource.type, resource.count.min, to_parent);

        // Children first: this populates resource.user_data.
        prime_jobspec (resource.with, resource.user_data);

        // Or slots should use a minimum of values rather than an
        // accumulation, otherwise possible matches may be filtered out.
        const bool take_minimum = (resource.type == or_slot_rt);
        for (auto &aggregate : resource.user_data) {
            if (take_minimum) {
                min_if (subsystem,
                        aggregate.first,
                        resource.count.min * aggregate.second,
                        to_parent);
            } else {
                accum_if (subsystem,
                          aggregate.first,
                          resource.count.min * aggregate.second,
                          to_parent);
            }
        }
    }
}

std::tuple<std::map<resource_type_t, int>, int, int> dfu_flexible_t::select_or_config (
const std::vector<Resource> &slots,
std::map<resource_type_t, int> resource_counts,
unsigned int nslots,
std::unordered_map<Key, std::tuple<std::map<resource_type_t, int>, int, int>, Hash> &or_config)
{
int best = -1;
int i = -1;
Key index = Key (resource_counts);

// if available, use precomputed result
auto it = or_config.find (resource_counts);
if (it != or_config.end ())
return it->second;

for (auto slot : slots) {
int test;
++i;
bool match = true;
std::map<resource_type_t, int> updated_counts;
updated_counts = resource_counts;

// determine if there are enough resources to match with this or_slot
for (auto slot_elem : slot.with) {
unsigned int qc = resource_counts[slot_elem.type];
unsigned int count = m_match->calc_count (slot_elem, qc);
if (count <= 0) {
match = false;
break;
}
updated_counts[slot_elem.type] = updated_counts[slot_elem.type] - count;
}
if (!match)
continue;

test = std::get<1> (select_or_config (slots, updated_counts, nslots, or_config));
if (best < test) {
best = test;
or_config[index] = std::make_tuple (updated_counts, best + 1, i);
}
}

// if there are no matches, set default score of 0
// score represents the total number of or_slots that can be scheduled
// with optimal selection of or_slots
if (best < 0) {
std::map<resource_type_t, int> empty;
or_config[index] = std::make_tuple (empty, best + 1, -1);
}
return or_config[index];
}

/*
 * Explore the dominant subsystem below vertex u for a set of or_slot
 * alternatives, pick a combination of alternatives with select_or_config (),
 * and add one edge group per selected or_slot to `dfu` under or_slot_rt.
 *
 * \param meta      job metadata, forwarded to explore ().
 * \param u         graph vertex below which the slots are evaluated.
 * \param slots     the or_slot alternatives; each one's `with` list names
 *                  the resources that alternative needs.
 * \param nslots    forwarded to explore () and select_or_config ().
 * \param pristine  forwarded to explore ().
 * \param excl      not referenced in this function body.
 * \param dfu       [out] scoring object that receives the edge groups.
 * \return 0 if at least one or_slot was qualified, -1 otherwise. A non-zero
 *         rc from explore () or dom_finish_slot () also ends up as -1
 *         because qual_num_slots is still 0 at that point.
 */
int dfu_flexible_t::dom_slot (const jobmeta_t &meta,
vtx_t u,
const std::vector<Resource> &slots,
unsigned int nslots,
bool pristine,
bool *excl,
scoring_api_t &dfu)
{
int rc;
bool x_inout = true;
unsigned int qual_num_slots = 0;
std::vector<eval_egroup_t> edg_group_vector;
const subsystem_t &dom = m_match->dom_subsystem ();
// NOTE(review): edges_used is declared but never used in this function.
std::unordered_set<edg_t *> edges_used;
scoring_api_t dfu_slot;
// Memo table filled by select_or_config (); also walked below to find the
// follow-up configuration after each selected or_slot.
std::unordered_map<Key, std::tuple<std::map<resource_type_t, int>, int, int>, Hash> or_config;
std::tuple<std::map<resource_type_t, int>, int, int> current_config;

// collect a set of all resource types in the or_slots to get resource
// counts. This does not work well with non leaf vertex resources because
// it cannot distinguish beyond type. This may be resolveable if graph
// coloring is removed during the selection process.
// For each type, the first Resource seen with that type is the one copied
// into slot_resource_union.
std::vector<Resource> slot_resource_union;
std::map<resource_type_t, int> resource_types;
for (auto &slot : slots) {
for (auto r : slot.with) {
if (resource_types.find (r.type) == resource_types.end ()) {
resource_types[r.type] = 0;
slot_resource_union.push_back (r);
}
}
}

// Explore below u using the union of requested resources; results are
// collected in the local scoring object dfu_slot.
if ((rc = explore (meta,
u,
dom,
slot_resource_union,
pristine,
&x_inout,
visit_t::DFV,
dfu_slot,
nslots))
!= 0)
goto done;
if ((rc = m_match->dom_finish_slot (dom, dfu_slot)) != 0)
goto done;

// Replace the placeholder zeros with the qualified count of each type.
for (auto &it : resource_types) {
it.second = dfu_slot.qualified_count (dom, it.first);
}

// calculate the ideal or_slot config for avail resources.
// tuple is (key to next best option, current score, index of current best or_slot)
current_config = select_or_config (slots, resource_types, nslots, or_config);

// The score of the top-level configuration is the number of or_slots to emit.
qual_num_slots = std::get<1> (current_config);
for (unsigned int i = 0; i < qual_num_slots; ++i) {
auto slot_index = std::get<2> (current_config);
eval_egroup_t edg_group;
int64_t score = MATCH_MET;

// use calculated index to determine which or_slot type to use
for (auto &slot_elem : slots[slot_index].with) {
unsigned int j = 0;
// NOTE(review): qc is the total qualified count for the type, not the
// count remaining after earlier or_slots -- confirm calc_count yields the
// same amount that select_or_config () assumed.
unsigned int qc = dfu_slot.qualified_count (dom, slot_elem.type);
unsigned int count = m_match->calc_count (slot_elem, qc);
// Pull edge groups of this type until `count` units are covered.
while (j < count) {
auto egroup_i = dfu_slot.eval_egroups_iter_next (dom, slot_elem.type);
if (egroup_i == dfu_slot.eval_egroups_end (dom, slot_elem.type)) {
m_err_msg += __FUNCTION__;
m_err_msg += ": not enough slots.\n";
// Zeroing qual_num_slots makes the function return -1 at done.
qual_num_slots = 0;
goto done;
}
// Only the first edge of the edge group is used to build the new edge.
eval_edg_t ev_edg ((*egroup_i).edges[0].count,
(*egroup_i).edges[0].count,
1,
(*egroup_i).edges[0].edge);
score += (*egroup_i).score;
edg_group.edges.push_back (ev_edg);
j += (*egroup_i).edges[0].count;
}
}
edg_group.score = score;
edg_group.count = 1;
edg_group.exclusive = 1;
edg_group_vector.push_back (edg_group);

// Advance to the configuration keyed by the counts stored in the current one.
current_config = or_config[Key (std::get<0> (current_config))];
}
// Publish the groups only after every or_slot was assembled successfully.
for (auto &edg_group : edg_group_vector)
dfu.add (dom, or_slot_rt, edg_group);

done:
return (qual_num_slots) ? 0 : -1;
}
/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
Loading
Loading