diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index 38503d73cd..ebd8a16de4 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -16,6 +16,11 @@ import ( "github.com/stretchr/testify/require" ) +func init() { + // set artificially low retry interval for testing + retryInterval = 5 * time.Millisecond +} + func TestAllocator(t *testing.T) { s := store.NewMemoryStore(nil) assert.NotNil(t, s) diff --git a/manager/allocator/network.go b/manager/allocator/network.go index 38871fa0e7..675f13c79f 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -26,7 +26,11 @@ const ( allocatedStatusMessage = "pending task scheduling" ) -var errNoChanges = errors.New("task unchanged") +var ( + errNoChanges = errors.New("task unchanged") + + retryInterval = 5 * time.Minute +) func newIngressNetwork() *api.Network { return &api.Network{ @@ -57,19 +61,28 @@ type networkContext struct { // the actual network allocation. nwkAllocator *networkallocator.NetworkAllocator - // A table of unallocated tasks which will be revisited if any thing + // A set of tasks which are ready to be allocated as a batch. This is + // distinct from "unallocatedTasks" which are tasks that failed to + // allocate on the first try, being held for a future retry. + pendingTasks map[string]*api.Task + + // A set of unallocated tasks which will be revisited if any thing // changes in system state that might help task allocation. unallocatedTasks map[string]*api.Task - // A table of unallocated services which will be revisited if + // A set of unallocated services which will be revisited if // any thing changes in system state that might help service // allocation. unallocatedServices map[string]*api.Service - // A table of unallocated networks which will be revisited if + // A set of unallocated networks which will be revisited if // any thing changes in system state that might help network // allocation. unallocatedNetworks map[string]*api.Network + + // lastRetry is the last timestamp when unallocated + // tasks/services/networks were retried. + lastRetry time.Time } func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { @@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { nc := &networkContext{ nwkAllocator: na, + pendingTasks: make(map[string]*api.Task), unallocatedTasks: make(map[string]*api.Task), unallocatedServices: make(map[string]*api.Service), unallocatedNetworks: make(map[string]*api.Network), ingressNetwork: newIngressNetwork(), + lastRetry: time.Now(), } a.netCtx = nc defer func() { @@ -403,10 +418,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask: a.doTaskAlloc(ctx, ev) case state.EventCommit: - a.procUnallocatedNetworks(ctx) - a.procUnallocatedServices(ctx) - a.procUnallocatedTasksNetwork(ctx) - return + a.procTasksNetwork(ctx, false) + + if time.Since(nc.lastRetry) > retryInterval { + a.procUnallocatedNetworks(ctx) + a.procUnallocatedServices(ctx) + a.procTasksNetwork(ctx, true) + nc.lastRetry = time.Now() + } + + // Any left over tasks are moved to the unallocated set + for _, t := range nc.pendingTasks { + nc.unallocatedTasks[t.ID] = t + } + nc.pendingTasks = make(map[string]*api.Task) } } @@ -579,7 +604,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { } } - // Cleanup any task references that might exist in unallocatedTasks + // Cleanup any task references that might exist + delete(nc.pendingTasks, t.ID) delete(nc.unallocatedTasks, t.ID) return } @@ -587,6 +613,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { // If we are already in allocated state, there is // absolutely nothing else to do. if t.Status.State >= api.TaskStatePending { + delete(nc.pendingTasks, t.ID) delete(nc.unallocatedTasks, t.ID) return } @@ -616,7 +643,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { // based on service spec. a.taskCreateNetworkAttachments(t, s) - nc.unallocatedTasks[t.ID] = t + nc.pendingTasks[t.ID] = t } func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error { @@ -948,15 +975,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) { } } -func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) { +func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) { nc := a.netCtx - allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks)) + quiet := false + toAllocate := nc.pendingTasks + if onRetry { + toAllocate = nc.unallocatedTasks + quiet = true + } + allocatedTasks := make([]*api.Task, 0, len(toAllocate)) - for _, t := range nc.unallocatedTasks { + for _, t := range toAllocate { if err := a.allocateTask(ctx, t); err == nil { allocatedTasks = append(allocatedTasks, t) } else if err != errNoChanges { - log.G(ctx).WithError(err).Error("task allocation failure") + if quiet { + log.G(ctx).WithError(err).Debug("task allocation failure") + } else { + log.G(ctx).WithError(err).Error("task allocation failure") + } } } @@ -978,11 +1015,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) { }) if err != nil { - log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks") + log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks") } for _, t := range allocatedTasks[:committed] { - delete(nc.unallocatedTasks, t.ID) + delete(toAllocate, t.ID) } }