Skip to content

Commit

Permalink
Merge pull request #2021 from aaronlehmann/allocator-aggressive-retry
Browse files Browse the repository at this point in the history
allocator: Less aggressive retry
  • Loading branch information
aluzzardi authored Mar 9, 2017
2 parents c51357d + 513d028 commit e928827
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 16 deletions.
5 changes: 5 additions & 0 deletions manager/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"github.com/stretchr/testify/require"
)

// init overrides the package-level retryInterval with a few milliseconds so
// that tests in this package do not have to wait out the much longer default
// before unallocated objects are retried.
func init() {
	retryInterval = time.Millisecond * 5
}

func TestAllocator(t *testing.T) {
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
Expand Down
69 changes: 53 additions & 16 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ const (
allocatedStatusMessage = "pending task scheduling"
)

var errNoChanges = errors.New("task unchanged")
var (
	// errNoChanges is a sentinel error. Task processing compares allocation
	// errors against it and does not log a failure when they match.
	errNoChanges = errors.New("task unchanged")

	// retryInterval is the time that must have elapsed since the last retry
	// before the unallocated networks, services and tasks are processed
	// again. It is a variable rather than a constant so that tests can
	// shorten it.
	retryInterval = time.Minute * 5
)

func newIngressNetwork() *api.Network {
return &api.Network{
Expand Down Expand Up @@ -57,19 +61,28 @@ type networkContext struct {
// the actual network allocation.
nwkAllocator *networkallocator.NetworkAllocator

// A table of unallocated tasks which will be revisited if any thing
// A set of tasks which are ready to be allocated as a batch. This is
// distinct from "unallocatedTasks" which are tasks that failed to
// allocate on the first try, being held for a future retry.
pendingTasks map[string]*api.Task

// A set of unallocated tasks which will be revisited if anything
// changes in system state that might help task allocation.
unallocatedTasks map[string]*api.Task

// A table of unallocated services which will be revisited if
// A set of unallocated services which will be revisited if
// anything changes in system state that might help service
// allocation.
unallocatedServices map[string]*api.Service

// A table of unallocated networks which will be revisited if
// A set of unallocated networks which will be revisited if
// anything changes in system state that might help network
// allocation.
unallocatedNetworks map[string]*api.Network

// lastRetry is the last timestamp when unallocated
// tasks/services/networks were retried.
lastRetry time.Time
}

func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
Expand All @@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {

nc := &networkContext{
nwkAllocator: na,
pendingTasks: make(map[string]*api.Task),
unallocatedTasks: make(map[string]*api.Task),
unallocatedServices: make(map[string]*api.Service),
unallocatedNetworks: make(map[string]*api.Network),
ingressNetwork: newIngressNetwork(),
lastRetry: time.Now(),
}
a.netCtx = nc
defer func() {
Expand Down Expand Up @@ -403,10 +418,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
a.doTaskAlloc(ctx, ev)
case state.EventCommit:
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procUnallocatedTasksNetwork(ctx)
return
a.procTasksNetwork(ctx, false)

if time.Since(nc.lastRetry) > retryInterval {
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procTasksNetwork(ctx, true)
nc.lastRetry = time.Now()
}

// Any left over tasks are moved to the unallocated set
for _, t := range nc.pendingTasks {
nc.unallocatedTasks[t.ID] = t
}
nc.pendingTasks = make(map[string]*api.Task)
}
}

Expand Down Expand Up @@ -579,14 +604,16 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
}
}

// Cleanup any task references that might exist in unallocatedTasks
// Cleanup any task references that might exist
delete(nc.pendingTasks, t.ID)
delete(nc.unallocatedTasks, t.ID)
return
}

// If we are already in allocated state, there is
// absolutely nothing else to do.
if t.Status.State >= api.TaskStatePending {
delete(nc.pendingTasks, t.ID)
delete(nc.unallocatedTasks, t.ID)
return
}
Expand Down Expand Up @@ -616,7 +643,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
// based on service spec.
a.taskCreateNetworkAttachments(t, s)

nc.unallocatedTasks[t.ID] = t
nc.pendingTasks[t.ID] = t
}

func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
Expand Down Expand Up @@ -948,15 +975,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
}
}

func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
nc := a.netCtx
allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
quiet := false
toAllocate := nc.pendingTasks
if onRetry {
toAllocate = nc.unallocatedTasks
quiet = true
}
allocatedTasks := make([]*api.Task, 0, len(toAllocate))

for _, t := range nc.unallocatedTasks {
for _, t := range toAllocate {
if err := a.allocateTask(ctx, t); err == nil {
allocatedTasks = append(allocatedTasks, t)
} else if err != errNoChanges {
log.G(ctx).WithError(err).Error("task allocation failure")
if quiet {
log.G(ctx).WithError(err).Debug("task allocation failure")
} else {
log.G(ctx).WithError(err).Error("task allocation failure")
}
}
}

Expand All @@ -978,11 +1015,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
})

if err != nil {
log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
}

for _, t := range allocatedTasks[:committed] {
delete(nc.unallocatedTasks, t.ID)
delete(toAllocate, t.ID)
}
}

Expand Down

0 comments on commit e928827

Please sign in to comment.