
Coscheduling: remove podgroup.scheduled #574

Merged 5 commits on Jul 1, 2023
12 changes: 1 addition & 11 deletions apis/scheduling/v1alpha1/types.go
@@ -91,16 +91,10 @@ const (
// PodGroupRunning means the `spec.minMember` pods of the pod group are in running phase.
PodGroupRunning PodGroupPhase = "Running"

// PodGroupPreScheduling means all pods of the pod group have enqueued and are waiting to be scheduled.
PodGroupPreScheduling PodGroupPhase = "PreScheduling"

// PodGroupScheduling means partial pods of the pod group have been scheduled and are in running phase
// PodGroupScheduling means at least `spec.minMember` pods of the pod group have been scheduled,
// but the number of running pods has not yet reached `spec.minMember`.
PodGroupScheduling PodGroupPhase = "Scheduling"

// PodGroupScheduled means the `spec.minMember` pods of the pod group have been scheduled and are in running phase.
PodGroupScheduled PodGroupPhase = "Scheduled"

// PodGroupUnknown means a part of `spec.minMember` pods of the pod group have been scheduled but the others can not
// be scheduled due to, e.g. not enough resource; scheduler will wait for related controllers to recover them.
PodGroupUnknown PodGroupPhase = "Unknown"
@@ -163,10 +157,6 @@ type PodGroupStatus struct {
// It is empty if not initialized.
OccupiedBy string `json:"occupiedBy,omitempty"`

// The number of actively running pods.
// +optional
Scheduled int32 `json:"scheduled,omitempty"`

// The number of actively running pods.
// +optional
Running int32 `json:"running,omitempty"`
4 changes: 0 additions & 4 deletions config/crd/bases/scheduling.x-k8s.io_podgroups.yaml
@@ -85,10 +85,6 @@ spec:
description: ScheduleStartTime of the group
format: date-time
type: string
scheduled:
description: The number of actively running pods.
format: int32
type: integer
succeeded:
description: The number of pods which reached phase Succeeded.
format: int32
57 changes: 48 additions & 9 deletions kep/42-podgroup-coscheduling/README.md
@@ -9,13 +9,13 @@
- [Use Cases](#use-cases)
- [Design Details](#design-details)
- [PodGroup](#podgroup)
- [PodGroupPhase](#podgroupphase)
- [Controller](#controller)
- [Extension points](#extension-points)
- [QueueSort](#queuesort)
- [PreFilter](#prefilter)
- [PostFilter](#postfilter)
- [Permit](#permit)
- [PostBind](#postbind)
- [Known Limitations](#known-limitations)
<!-- /toc -->

@@ -62,10 +62,6 @@ type PodGroupStatus struct {
// It is empty if not initialized.
OccupiedBy string `json:"occupiedBy,omitempty"`

// The number of actively running pods.
// +optional
Scheduled uint32 `json:"scheduled"`

// The number of actively running pods.
// +optional
Running uint32 `json:"running"`
@@ -83,6 +79,53 @@ type PodGroupStatus struct {
}
```

#### PodGroupPhase
A `PodGroup` moves through the following phases, expressed as `PodGroupPhase` values:

```go
// These are the valid phases of PodGroups.
const (
// PodGroupPending means the pod group has been accepted by the system, but the scheduler
// cannot allocate enough resources to it.
PodGroupPending PodGroupPhase = "Pending"

// PodGroupRunning means the `spec.minMember` pods of the pod group are in running phase.
PodGroupRunning PodGroupPhase = "Running"

// PodGroupScheduling means at least `spec.minMember` pods of the pod group have been scheduled,
// but the number of running pods has not yet reached `spec.minMember`.
PodGroupScheduling PodGroupPhase = "Scheduling"

// PodGroupUnknown means a part of `spec.minMember` pods of the pod group have been scheduled but the others can not
// be scheduled due to, e.g. not enough resource; scheduler will wait for related controllers to recover them.
PodGroupUnknown PodGroupPhase = "Unknown"

// PodGroupFinished means the `spec.minMember` pods of the pod group are successfully finished.
PodGroupFinished PodGroupPhase = "Finished"

// PodGroupFailed means at least one of the `spec.minMember` pods has failed.
PodGroupFailed PodGroupPhase = "Failed"

// PodGroupLabel is the default label of coscheduling
PodGroupLabel = scheduling.GroupName + "/pod-group"
)
```

```mermaid
stateDiagram-v2
state if_minMember <<choice>>
[*] --> Pending
Pending --> Scheduling: minMember pods are scheduling
Scheduling --> Running: minMember pods are running
Running --> Failed: at least one of the pods failed
Failed --> if_minMember: failed pods recovered
if_minMember --> Scheduling: minMember pods are scheduling
if_minMember --> Running: minMember pods are running
Running --> Finished: all pods successfully finished
Finished --> [*]
```
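The transitions in the diagram above can be condensed into a single pure function over pod counts. The sketch below is illustrative only and is not code from this PR: `nextPhase` and its parameter names are hypothetical, and it folds the diagram's edges into one decision.

```go
package main

import "fmt"

// PodGroupPhase mirrors the phase type defined in the KEP above.
type PodGroupPhase string

const (
	PodGroupPending    PodGroupPhase = "Pending"
	PodGroupScheduling PodGroupPhase = "Scheduling"
	PodGroupRunning    PodGroupPhase = "Running"
	PodGroupFinished   PodGroupPhase = "Finished"
	PodGroupFailed     PodGroupPhase = "Failed"
)

// nextPhase is a hypothetical helper (not part of this PR) that collapses
// the state diagram into one decision based on pod counts for a group.
func nextPhase(created, running, succeeded, failed, minMember int) PodGroupPhase {
	switch {
	case failed > 0:
		return PodGroupFailed // at least one pod failed
	case succeeded >= minMember:
		return PodGroupFinished // the required pods all finished successfully
	case running >= minMember:
		return PodGroupRunning // quorum of pods is running
	case created >= minMember:
		return PodGroupScheduling // enough pods exist, quorum not running yet
	default:
		return PodGroupPending // not enough pods created yet
	}
}

func main() {
	fmt.Println(nextPhase(2, 0, 0, 0, 2)) // enough pods created, none running → Scheduling
	fmt.Println(nextPhase(3, 2, 0, 0, 2)) // quorum running → Running
}
```

Note that, as in the controller change in this PR, the phase is derived entirely from observed pod counts rather than from a separate `Scheduled` counter in the status.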


### Controller

We define a controller to reconcile PodGroup status, so the job status can be queried by describing the PodGroup. Once a pod in a group fails, the group status is marked Failed. The controller also helps recover from abnormal cases, e.g. batch scheduling is interrupted due to
@@ -124,10 +167,6 @@ If the gap to reach the quorum of a PodGroup is greater than 10%, we reject the

We can define `MaxScheduleTime` for a PodGroup. If any pod times out, the whole group would be rejected.

#### PostBind

This extension is primarily used to record the PodGroup Status. When a pod is bound successfully, we would update the status of its affiliated PodGroup.

### Known Limitations

1. We cannot support group preemption very well now. Though we have tried to implement the extensions, it still cannot meet production requirements.
4 changes: 0 additions & 4 deletions manifests/coscheduling/crd.yaml
@@ -85,10 +85,6 @@ spec:
description: ScheduleStartTime of the group
format: date-time
type: string
scheduled:
description: The number of actively running pods.
format: int32
type: integer
succeeded:
description: The number of pods which reached phase Succeeded.
format: int32
12 changes: 5 additions & 7 deletions pkg/controllers/podgroup_controller.go
@@ -84,7 +84,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
// If startScheduleTime - createTime > 2days,
// do not reconcile again because pod may have been GCed
if pg.Status.Scheduled == pg.Spec.MinMember && pg.Status.Running == 0 &&
if (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending) && pg.Status.Running == 0 &&
pg.Status.ScheduleStartTime.Sub(pg.CreationTimestamp.Time) > 48*time.Hour {
r.recorder.Event(pg, v1.EventTypeWarning,
"Timeout", "schedule time longer than 48 hours")
@@ -108,20 +108,18 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
case schedv1alpha1.PodGroupPending:
if len(pods) >= int(pg.Spec.MinMember) {
pgCopy.Status.Phase = schedv1alpha1.PodGroupPreScheduling
pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduling
fillOccupiedObj(pgCopy, &pods[0])
}
default:
pgCopy.Status.Running, pgCopy.Status.Succeeded, pgCopy.Status.Failed = getCurrentPodStats(pods)

if len(pods) == 0 {
if len(pods) < int(pg.Spec.MinMember) {
pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
break
}

if pgCopy.Status.Scheduled >= pgCopy.Spec.MinMember &&
pgCopy.Status.Phase == schedv1alpha1.PodGroupScheduling {
pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduled
if pgCopy.Status.Succeeded+pgCopy.Status.Running < pg.Spec.MinMember {
pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduling
}

if pgCopy.Status.Succeeded+pgCopy.Status.Running >= pg.Spec.MinMember {
85 changes: 44 additions & 41 deletions pkg/controllers/podgroup_controller_test.go
@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2/klogr"
st "k8s.io/kubernetes/pkg/scheduler/testing"

@@ -58,16 +59,16 @@ func Test_Run(t *testing.T) {
minMember: 2,
podNames: []string{"pod1", "pod2"},
podPhase: v1.PodRunning,
previousPhase: v1alpha1.PodGroupScheduled,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupRunning,
},
{
name: "Group running, more than min member",
pgName: "pg11",
minMember: 2,
podNames: []string{"pod11", "pod21"},
podNames: []string{"pod11", "pod21", "pod31", "pod41"},
podPhase: v1.PodRunning,
previousPhase: v1alpha1.PodGroupScheduled,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupRunning,
},
{
@@ -76,7 +77,7 @@
minMember: 2,
podNames: []string{"pod1", "pod2"},
podPhase: v1.PodFailed,
previousPhase: v1alpha1.PodGroupScheduled,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupFailed,
},
{
@@ -85,20 +86,20 @@
minMember: 2,
podNames: []string{"pod1", "pod2"},
podPhase: v1.PodSucceeded,
previousPhase: v1alpha1.PodGroupScheduled,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupFinished,
},
{
name: "Group status convert from scheduling to scheduled",
name: "Group status convert from pending to scheduling",
pgName: "pg4",
minMember: 2,
podNames: []string{"pod1", "pod2"},
podPhase: v1.PodPending,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupScheduled,
previousPhase: v1alpha1.PodGroupPending,
desiredGroupPhase: v1alpha1.PodGroupScheduling,
},
{
name: "Group status convert from scheduling to succeed",
name: "Group status convert from scheduling to finished",
pgName: "pg5",
minMember: 2,
podNames: []string{"pod1", "pod2"},
@@ -108,16 +109,16 @@
podNextPhase: v1.PodSucceeded,
},
{
name: "Group status convert from pending to scheduling",
name: "Group status keeps pending",
pgName: "pg6",
minMember: 3,
podNames: []string{"pod1", "pod2"},
podPhase: v1.PodRunning,
previousPhase: v1alpha1.PodGroupPending,
desiredGroupPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupPending,
},
{
name: "Group status convert from pending to prescheduling",
name: "Group status convert from pending to finished",
pgName: "pg7",
minMember: 2,
podNames: []string{"pod1", "pod2"},
@@ -148,8 +149,8 @@
{
name: "Group status convert from running to pending",
pgName: "pg10",
minMember: 2,
podNames: []string{},
minMember: 4,
podNames: []string{"pod101", "pod102"},
podPhase: v1.PodPending,
previousPhase: v1alpha1.PodGroupRunning,
desiredGroupPhase: v1alpha1.PodGroupPending,
@@ -158,35 +159,37 @@
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
controller, kClient := setUp(ctx, c.podNames, c.pgName, c.podPhase, c.minMember, c.previousPhase, c.podGroupCreateTime, nil)
ps := makePods(c.podNames, c.pgName, c.podPhase, nil)
// 0 means not set
if len(c.podNextPhase) != 0 {
ps := makePods(c.podNames, c.pgName, c.podNextPhase, nil)
for _, p := range ps {
kClient.Status().Update(ctx, p)
reqs := controller.podToPodGroup(p)
for _, req := range reqs {
if _, err := controller.Reconcile(ctx, req); err != nil {
t.Errorf("reconcile: (%v)", err)
}
ps = makePods(c.podNames, c.pgName, c.podNextPhase, nil)
}
for _, p := range ps {
kClient.Status().Update(ctx, p)
reqs := controller.podToPodGroup(p)
for _, req := range reqs {
if _, err := controller.Reconcile(ctx, req); err != nil {
t.Errorf("reconcile: (%v)", err)
}
}
}

pg := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: c.pgName,
Namespace: metav1.NamespaceDefault,
},
}
err := kClient.Get(ctx, client.ObjectKeyFromObject(pg), pg)
if err != nil {
t.Fatal(err)
}
if pg.Status.Phase != c.desiredGroupPhase {
t.Fatalf("want %v, got %v", c.desiredGroupPhase, pg.Status.Phase)
}
if err != nil {
t.Fatal("Unexpected error", err)
}
pg := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: c.pgName,
Namespace: metav1.NamespaceDefault,
},
}
err := kClient.Get(ctx, client.ObjectKeyFromObject(pg), pg)
if err != nil {
t.Fatal(err)
}

if pg.Status.Phase != c.desiredGroupPhase {
t.Fatalf("want %v, got %v", c.desiredGroupPhase, pg.Status.Phase)
}
if err != nil {
t.Fatal("Unexpected error", err)
}
})
}
@@ -284,8 +287,9 @@ func setUp(ctx context.Context,
client := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objs...).Build()

controller := &PodGroupReconciler{
Client: client,
Scheme: s,
Client: client,
Scheme: s,
recorder: record.NewFakeRecorder(3),

log: klogr.New().WithName("podGroupTest"),
}
@@ -319,7 +323,6 @@ func makePG(pgName string, minMember int32, previousPhase v1alpha1.PodGroupPhase
},
Status: v1alpha1.PodGroupStatus{
OccupiedBy: "test",
Scheduled: minMember,
ScheduleStartTime: metav1.Time{Time: time.Now()},
Phase: previousPhase,
},
3 changes: 0 additions & 3 deletions pkg/coscheduling/README.md
@@ -75,9 +75,6 @@ profiles:
reserve:
enabled:
- name: Coscheduling
postBind:
enabled:
- name: Coscheduling
```

### Demo
2 changes: 1 addition & 1 deletion test/integration/podgroup_controller_test.go
@@ -148,7 +148,7 @@ func TestPodGroupController(t *testing.T) {
midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").Node(nodeName).Obj(),
},
intermediatePGState: []*v1alpha1.PodGroup{
util.UpdatePGStatus(util.MakePG("pg1-1", ns, 3, nil, nil), "PreScheduling", "", 0, 0, 0, 0),
util.UpdatePGStatus(util.MakePG("pg1-1", ns, 3, nil, nil), "Scheduling", "", 0, 0, 0, 0),
},
incomingPods: []*v1.Pod{
st.MakePod().Namespace(ns).Name("t1-p1-1").Req(map[v1.ResourceName]string{v1.ResourceMemory: "50"}).Priority(
1 change: 0 additions & 1 deletion test/util/utils.go
@@ -94,7 +94,6 @@ func UpdatePGStatus(pg *v1alpha1.PodGroup, phase v1alpha1.PodGroupPhase, occupie
pg.Status = v1alpha1.PodGroupStatus{
Phase: phase,
OccupiedBy: occupiedBy,
Scheduled: scheduled,
Running: running,
Succeeded: succeeded,
Failed: failed,