diff --git a/manager/orchestrator/constraintenforcer/constraint_enforcer.go b/manager/orchestrator/constraintenforcer/constraint_enforcer.go index 5faae5a025..296767852e 100644 --- a/manager/orchestrator/constraintenforcer/constraint_enforcer.go +++ b/manager/orchestrator/constraintenforcer/constraint_enforcer.go @@ -174,17 +174,22 @@ loop: removeTasks[t.ID] = t continue } + + available.MemoryBytes -= t.Spec.Resources.Reservations.MemoryBytes + available.NanoCPUs -= t.Spec.Resources.Reservations.NanoCPUs + } + + // Ensure that the task assigned to the node + // still satisfies the available generic resources + if t.AssignedGenericResources != nil { for _, ta := range t.AssignedGenericResources { // Type change or no longer available - if genericresource.HasResource(ta, available.Generic) { + if !genericresource.HasResource(ta, available.Generic) { removeTasks[t.ID] = t break loop } } - available.MemoryBytes -= t.Spec.Resources.Reservations.MemoryBytes - available.NanoCPUs -= t.Spec.Resources.Reservations.NanoCPUs - genericresource.ClaimResources(&available.Generic, &fakeStore, t.AssignedGenericResources) } diff --git a/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go b/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go index f9e5638e31..a7b007c099 100644 --- a/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go +++ b/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/moby/swarmkit/v2/api" + "github.com/moby/swarmkit/v2/api/genericresource" "github.com/moby/swarmkit/v2/manager/orchestrator/testutils" "github.com/moby/swarmkit/v2/manager/state" "github.com/moby/swarmkit/v2/manager/state/store" @@ -262,7 +263,7 @@ func TestOutdatedTaskPlacementConstraints(t *testing.T) { defer s.Close() require.NoError(t, s.Update(func(tx store.Tx) error { - // Prepoulate node, service, and task. + // Prepopulate node, service, and task. for _, err := range []error{ store.CreateNode(tx, node), store.CreateService(tx, service), @@ -294,3 +295,99 @@ func TestOutdatedTaskPlacementConstraints(t *testing.T) { task = testutils.WatchTaskUpdate(t, watch) assert.Equal(t, api.TaskStateRejected, task.Status.State) } + +func TestGenericResourcesPlacementConstraints(t *testing.T) { + node := &api.Node{ + ID: "id0", + Spec: api.NodeSpec{ + Annotations: api.Annotations{ + Name: "node1", + }, + Availability: api.NodeAvailabilityActive, + }, + Status: api.NodeStatus{ + State: api.NodeStatus_READY, + }, + Role: api.NodeRoleWorker, + Description: &api.NodeDescription{ + Resources: &api.Resources{ + Generic: genericresource.NewSet("mygeneric", "1"), + }, + }, + } + + service := &api.Service{ + ID: "id1", + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Task: api.TaskSpec{ + Resources: &api.ResourceRequirements{ + Reservations: &api.Resources{ + Generic: genericresource.NewSet("mygeneric", "1"), + }, + }, + }, + }, + } + + task := &api.Task{ + ID: "id2", + Spec: api.TaskSpec{ + Resources: &api.ResourceRequirements{ + Reservations: &api.Resources{ + Generic: genericresource.NewSet("mygeneric", "1"), + }, + }, + }, + ServiceID: service.ID, + NodeID: node.ID, + Status: api.TaskStatus{ + State: api.TaskStateRunning, + }, + DesiredState: api.TaskStateRunning, + AssignedGenericResources: genericresource.NewSet("mygeneric", "1"), + } + + s := store.NewMemoryStore(nil) + require.NotNil(t, s) + defer s.Close() + + require.NoError(t, s.Update(func(tx store.Tx) error { + // Prepopulate node, service, and task. + for _, err := range []error{ + store.CreateNode(tx, node), + store.CreateService(tx, service), + store.CreateTask(tx, task), + } { + if err != nil { + return err + } + } + return nil + })) + + watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) + defer cancel() + + constraintEnforcer := New(s) + defer constraintEnforcer.Stop() + + go constraintEnforcer.Run() + + // Update the node to remove the generic resource + require.NoError(t, s.Update(func(tx store.Tx) error { + node = store.GetNode(tx, node.ID) + node.Description = &api.NodeDescription{ + Resources: &api.Resources{ + Generic: genericresource.NewSet("mygeneric", "2"), + }, + } + return store.UpdateNode(tx, node) + })) + + // The task should be rejected immediately. + task = testutils.WatchTaskUpdate(t, watch) + assert.Equal(t, api.TaskStateRejected, task.Status.State) +}