fluence: refactor to use new PodGroup
Problem: fluence should only be storing the state of the jobid and
the presence of a group name in a map to indicate node assignment.
Solution: update the code here. Note that this is not working
yet; I am pushing / opening the PR so as not to lose the work
(and will update accordingly, using this PR to test).

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Feb 19, 2024
1 parent 000baac commit 7874d57
Showing 9 changed files with 466 additions and 520 deletions.
198 changes: 101 additions & 97 deletions README.md

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions docs/README.md
@@ -0,0 +1,25 @@
# Development Notes

## Thinking

> Updated February 15, 2024

What I think might be happening (not always, but sometimes):

- New pod group, no node list
- Fluence assigns nodes
- Nodes get assigned to pods 1:1
- Pod group is deleted
- Some pod is sent back to the queue (kubelet rejects it, etc.)
- Pod group does not exist and is recreated, with no node list
- Fluence asks again, but still holds the first job. There are not enough resources, so it asks forever.

The above would not happen with a persistent pod group (one that is not cleaned up until the job is deleted), and would not happen if there were enough resources to account for the overlap.

- Does Fluence allocate resources for itself?
- It would be nice to be able to inspect the state of Fluence.
- At some point we want to use the TBA fluxion-go instead of the one-off branch we currently have (but we don't need to be blocked on that).
- We should (I think) restore the pod group (it's in the controller here) and build our own container. That way we have total control over the custom resource, and we don't risk it going away.
- As part of that, we can add a mutating webhook that emulates what we are doing in fluence now to find the label, but instead we will create the CRD to hold state rather than trying to hold it in the operator (see the sketch after this list).
- We could then also investigate changing the size of the group more flexibly, within some min/max size (also determined by labels?), to help with scheduling.
- Note that kueue has added a Pod Group object, so it probably addresses the static case here.
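
For illustration, here is a minimal sketch of the label lookup such a webhook (or fluence itself today) could perform. The label keys and the single-pod default below are placeholders, not the actual fluence labels:

```go
package main

import (
	"fmt"
	"strconv"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Placeholder label keys - the real fluence labels may differ.
const (
	groupNameLabel = "fluence.example/group-name"
	groupSizeLabel = "fluence.example/group-size"
)

// getPodGroup derives a group name and size from pod labels,
// falling back to a single-pod group named after the pod itself.
func getPodGroup(pod *corev1.Pod) (string, int32) {
	name, ok := pod.Labels[groupNameLabel]
	if !ok || name == "" {
		name = pod.Name
	}
	size := int32(1)
	if raw, ok := pod.Labels[groupSizeLabel]; ok {
		if parsed, err := strconv.ParseInt(raw, 10, 32); err == nil {
			size = int32(parsed)
		}
	}
	return name, size
}

func main() {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "lammps-0",
			Labels: map[string]string{groupNameLabel: "lammps", groupSizeLabel: "4"},
		},
	}
	name, size := getPodGroup(pod)
	fmt.Printf("pod %s belongs to group %s with size %d\n", pod.Name, name, size)
}
```

A webhook built on this could then create or patch the PodGroup custom resource instead of keeping the state in the operator.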
4 changes: 2 additions & 2 deletions examples/simple_example/fluence-scheduler-pod.yaml
@@ -1,11 +1,11 @@
apiVersion: v1
kind: Pod
metadata:
name: fluence-scheduled-pod-1
name: fluence-scheduled-pod
labels:
name: scheduler-example
spec:
schedulerName: fluence
containers:
- name: fluence-scheduled-container
image: registry.k8s.io/pause:2.0
image: registry.k8s.io/pause:2.0
202 changes: 80 additions & 122 deletions sig-scheduler-plugins/pkg/fluence/core/core.go
@@ -3,10 +3,7 @@ package core
import (
"fmt"

v1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
klog "k8s.io/klog/v2"

"k8s.io/kubernetes/pkg/scheduler/framework"
pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc"
@@ -26,13 +23,9 @@ func (s *FluxStateData) Clone() framework.StateData {
return &FluxStateData{NodeCache: s.NodeCache}
}

// NewFluxState creates an entry for the CycleState with the minimum that we might need
func NewFluxState(nodeName string, groupName string, size int32) *FluxStateData {
cache := NodeCache{
NodeName: nodeName,
GroupName: groupName,
MinGroupSize: size,
}
// NewFluxState creates an entry for the CycleState with the node and group name
func NewFluxState(nodeName string, groupName string) *FluxStateData {
cache := NodeCache{NodeName: nodeName}
return &FluxStateData{NodeCache: cache}
}

@@ -42,162 +35,127 @@ func NewFluxState(nodeName string, groupName string, size int32) *FluxStateData
type NodeCache struct {
NodeName string

// This is derived from tasks, where
// task is an allocation to some node
// High level it is most often akin to the
// number of pods on the node. I'm not sure that I understand this
// https://github.com/flux-framework/flux-k8s/blob/9f24f36752e3cced1b1112d93bfa366fb58b3c84/src/fluence/fluxion/fluxion.go#L94-L97
// How does that relate to a single pod? It is called "Count" in other places
Tasks int
// Tie assignment back to PodGroup, which can be used to get size and time created
GroupName string

// These fields are primarily for the FluxStateData
// Without a PodGroup CRD we keep min size here
MinGroupSize int32
GroupName string
// Assigned tasks (often pods) to nodes
// https://github.com/flux-framework/flux-k8s/blob/9f24f36752e3cced1b1112d93bfa366fb58b3c84/src/fluence/fluxion/fluxion.go#L94-L97
AssignedTasks int
}

// A pod group cache holds a list of nodes for an allocation, where each has some number of tasks
// along with the expected group size. This is intended to replace PodGroup
// given the group name, size (derived from annotations) and timestamp
type PodGroupCache struct {
GroupName string

// This is a cache of nodes for pods
Nodes []NodeCache
Size int32
Name string

// Keep track of when the group was initially created!
// This is like, the main thing we need.
TimeCreated metav1.MicroTime
}

// Memory cache of pod group name to pod group cache, above
var podGroupCache map[string]*PodGroupCache
// PodGroups seen by fluence
var groupsSeen map[string]*PodGroupCache

// Init populates the podGroupCache
// Init populates the groupsSeen cache
func Init() {
podGroupCache = map[string]*PodGroupCache{}
}

// RegisterPodGroup ensures that the PodGroup exists in the cache
// This is an experimental replacement for an actual PodGroup
// We take a timestamp, which if called from Less (during sorting) is time.Time
// if called later (an individual pod) we go for its creation timestamp
func RegisterPodGroup(pod *v1.Pod, groupName string, groupSize int32) error {
entry, ok := podGroupCache[groupName]

if !ok {

// Assume we create the group with the timestamp
// of the first pod seen. There might be imperfections
// by the second, but as long as we sort them via millisecond
// this should prevent interleaving
nodes := []NodeCache{}

// Create the new entry for the pod group
entry = &PodGroupCache{
Name: groupName,
Size: groupSize,
Nodes: nodes,
TimeCreated: metav1.NowMicro(),
}

// Tell the user when it was created
klog.Infof("[Fluence] Pod group %s was created at %s\n", entry.Name, entry.TimeCreated)
}

// If the size has changed, we currently do not allow updating it.
// We issue a warning. In the future this could be supported with a grow command.
if entry.Size != groupSize {
klog.Infof("[Fluence] Pod group %s request to change size from %s to %s is not yet supported\n", groupName, entry.Size, groupSize)
// entry.GroupSize = groupSize
}
podGroupCache[groupName] = entry
return nil
groupsSeen = map[string]*PodGroupCache{}
}

// GetPodGroup gets a pod group in the cache by name
func GetPodGroup(groupName string) *PodGroupCache {
entry, _ := podGroupCache[groupName]
// GetFluenceCache determines if a group has been seen.
// Yes -> we return the PodGroupCache entry
// No -> the entry is nil / does not exist
func GetFluenceCache(groupName string) *PodGroupCache {
entry, _ := groupsSeen[groupName]
return entry
}

// DeletePodGroup deletes a pod group from the cache
func DeletePodGroup(groupName string) {
delete(podGroupCache, groupName)
}

// ListGroups lists groups, primarily for debugging
func ListGroups() {
for name, pg := range podGroupCache {
fmt.Printf(" %s: size %s, created at %s\n", name, pg.Size, &pg.TimeCreated)
}
delete(groupsSeen, groupName)
}

// CreateNodeList creates a list of node caches for a group
func CreateNodePodsList(nodelist []*pb.NodeAlloc, groupName string) (nodepods []NodeCache) {
func CreateNodeList(nodelist []*pb.NodeAlloc, groupName string) (nodepods []NodeCache) {

// Create a pod cache for each node
nodepods = make([]NodeCache, len(nodelist))

// TODO: should we be integrating topology information here? Could it be the
// case that some nodes (pods) in the group should be closer?
for i, v := range nodelist {
nodepods[i] = NodeCache{
NodeName: v.GetNodeID(),
Tasks: int(v.GetTasks()),
NodeName: v.GetNodeID(),
AssignedTasks: int(v.GetTasks()),
GroupName: groupName,
}
}

// Update the pods in the PodGraphCache
updatePodGroupNodes(groupName, nodepods)
klog.Infof("[Fluence] Pod group cache updated with nodes\n", podGroupCache)
// Update the pods in the PodGroupCache (groupsSeen)
updatePodGroupCache(groupName, nodepods)
return nodepods
}

// updatePodGroupCache updates the PodGroupCache with a listing of nodes
func updatePodGroupNodes(groupName string, nodes []NodeCache) {
group := podGroupCache[groupName]
group.Nodes = nodes
podGroupCache[groupName] = group
func updatePodGroupCache(groupName string, nodes []NodeCache) {
cache := PodGroupCache{
Nodes: nodes,
GroupName: groupName,
}
groupsSeen[groupName] = &cache
}

// HavePodNodes returns true if the listing of pods is not empty
// This should be all pods that are needed - the allocation will not
// be successful otherwise, so we just check > 0
func (p *PodGroupCache) HavePodNodes() bool {
return len(p.Nodes) > 0
}
// GetNextNode gets the next node in the PodGroupCache
func (p *PodGroupCache) GetNextNode() (string, error) {

// CancelAllocation resets the node cache and allocation status
func (p *PodGroupCache) CancelAllocation() {
p.Nodes = []NodeCache{}
}
nextnode := ""

// GetNextNode gets the next available node we can allocate for a group
func GetNextNode(groupName string) (string, error) {
entry, ok := podGroupCache[groupName]
if !ok {
return "", fmt.Errorf("[Fluence] Map is empty\n")
}
if len(entry.Nodes) == 0 {
return "", fmt.Errorf("[Fluence] Error while getting a node\n")
// Quick failure state - we ran out of nodes
if len(p.Nodes) == 0 {
return nextnode, fmt.Errorf("[Fluence] PodGroup %s ran out of nodes.", p.GroupName)
}

nodename := entry.Nodes[0].NodeName
klog.Infof("[Fluence] Next node for group %s is %s", groupName, nodename)
// The next is the 0th in the list
nextnode = p.Nodes[0].NodeName
klog.Infof("[Fluence] Next node for group %s is %s", p.GroupName, nextnode)

if entry.Nodes[0].Tasks == 1 {
klog.Infof("[Fluence] First node has one task")
slice := entry.Nodes[1:]
// If there is only one task left, we are going to use it (and remove the node)
if p.Nodes[0].AssignedTasks == 1 {
klog.Infof("[Fluence] First node has one remaining task slot")
slice := p.Nodes[1:]

// If after we remove the node there are no nodes left...
// Note that I'm not deleting the group from the cache because that is the
// only way fluence knows it has already assigned work (presence of the key)
if len(slice) == 0 {
klog.Infof("[Fluence] After this node, the slice is empty, deleting group %s from cache\n", groupName)
delete(podGroupCache, groupName)
return nodename, nil
klog.Infof("[Fluence] Assigning node %s. There are NO reamining nodes for group %s\n", nextnode, p.GroupName)
// delete(podGroupCache, groupName)
return nextnode, nil
}
klog.Infof("[Fluence] After this node, the slide still has nodes")
updatePodGroupNodes(groupName, slice)
return nodename, nil

klog.Infof("[Fluence] Assigning node %s. There are nodes left for group", nextnode, p.GroupName)
updatePodGroupCache(p.GroupName, slice)
return nextnode, nil
}

// If we get here the first node had >1 assigned tasks
klog.Infof("[Fluence] Assigning node %s for group %s. There are still task assignments available for this node.", nextnode, p.GroupName)
p.Nodes[0].AssignedTasks = p.Nodes[0].AssignedTasks - 1
return nextnode, nil
}

// GetNextNode gets the next available node we can allocate for a group
// TODO this should be able to take and pass forward a number of tasks.
// It is implicitly 1 now, but doesn't have to be.
func GetNextNode(groupName string) (string, error) {

// Get our entry from the groupsSeen cache
klog.Infof("[Fluence] groups seen %s", groupsSeen)
entry, ok := groupsSeen[groupName]

// This case should not happen
if !ok {
return "", fmt.Errorf("[Fluence] Map is empty")
}
klog.Infof("[Fluence] Subtracting one task from first node")
entry.Nodes[0].Tasks = entry.Nodes[0].Tasks - 1
return nodename, nil
// Get the next node from the PodGroupCache
return entry.GetNextNode()
}
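
For context, here is a minimal sketch (not part of this diff) of how a caller might drive the cache above: register the allocation once per group, then pop one node per pod. It assumes the generated `NodeAlloc` message exposes `NodeID` and `Tasks` fields matching the `GetNodeID()` / `GetTasks()` getters used in `CreateNodeList`:

```go
package main

import (
	"fmt"

	"sigs.k8s.io/scheduler-plugins/pkg/fluence/core"
	pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc"
)

func main() {
	core.Init()

	// Pretend fluxion returned two nodes: one with two task slots, one with one.
	nodelist := []*pb.NodeAlloc{
		{NodeID: "node-a", Tasks: 2},
		{NodeID: "node-b", Tasks: 1},
	}
	core.CreateNodeList(nodelist, "my-group")

	// Each call hands out one task slot from the front node; the node is
	// dropped from the list once its slots are used up.
	for i := 0; i < 3; i++ {
		node, err := core.GetNextNode("my-group")
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Printf("pod %d -> %s\n", i, node)
	}
}
```

With the allocation above, the three calls would hand out node-a, node-a, and node-b, after which the group entry remains (empty) so fluence still knows work was assigned.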