Comparing changes

This is a direct comparison between two commits made in this repository.

base repository: moby/swarmkit
base: f44b2d8e710cf1d0f5dc48727f0d13de9ceb5622
head repository: moby/swarmkit
compare: bf851cf00e6ea84630d139ecd4c59841c266b2f4
74 changes: 37 additions & 37 deletions Godeps/Godeps.json

Some generated files are not rendered by default.

23 changes: 11 additions & 12 deletions agent/node.go
@@ -533,24 +533,22 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
if err != nil {
return err
}
state := grpc.Idle
client := api.NewHealthClient(conn)
for {
s, err := conn.WaitForStateChange(ctx, state)
resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
if err != nil {
n.setControlSocket(nil)
return err
}
if s == grpc.Ready {
n.setControlSocket(conn)
if ready != nil {
close(ready)
ready = nil
}
} else if state == grpc.Shutdown {
n.setControlSocket(nil)
if resp.Status == api.HealthCheckResponse_SERVING {
break
}
state = s
time.Sleep(500 * time.Millisecond)
}
n.setControlSocket(conn)
if ready != nil {
close(ready)
}
return nil
}

func (n *Node) waitRole(ctx context.Context, role string) error {
@@ -647,6 +645,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
<-done
}
connCancel()
n.setControlSocket(nil)

if err != nil {
return err
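
The agent/node.go change above replaces gRPC connectivity-state tracking (WaitForStateChange) with an explicit health check against the manager's ControlAPI service. A minimal sketch of that polling pattern, assuming the repository's generated api package and an already-dialed *grpc.ClientConn (the import paths below are assumptions):

package sketch

import (
    "time"

    "github.com/docker/swarmkit/api" // assumed import path for the generated API package
    "golang.org/x/net/context"       // assumed to match this tree's vendoring
    "google.golang.org/grpc"
)

// waitForControlAPI polls the manager's health endpoint over an existing
// connection until the ControlAPI service reports SERVING, mirroring the
// loop added to initManagerConnection above.
func waitForControlAPI(ctx context.Context, conn *grpc.ClientConn) error {
    client := api.NewHealthClient(conn)
    for {
        resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
        if err != nil {
            return err
        }
        if resp.Status == api.HealthCheckResponse_SERVING {
            return nil
        }
        // Back off before re-checking; 500ms matches the interval used in the diff.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(500 * time.Millisecond):
        }
    }
}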
8 changes: 8 additions & 0 deletions manager/allocator/network.go
@@ -564,7 +564,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *

func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error {
if s.Spec.Endpoint != nil {
// service has user-defined endpoint
if s.Endpoint == nil {
// service currently has no allocated endpoint; allocate one.
s.Endpoint = &api.Endpoint{
Spec: s.Spec.Endpoint.Copy(),
}
@@ -587,6 +589,12 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
&api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
}
}
} else if s.Endpoint != nil {
// service has no user-defined endpoint but still holds allocated network
// resources; deallocate them.
if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
return err
}
}

if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
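
The two new branches above make allocateService reconcile the user-defined endpoint (s.Spec.Endpoint) with what is currently allocated (s.Endpoint). A standalone sketch of that decision, using hypothetical names purely for illustration:

package sketch

// endpointAction is a hypothetical enum naming the branches added above.
type endpointAction int

const (
    actionNone       endpointAction = iota // nothing to reconcile
    actionAllocate                         // spec defines an endpoint that has no allocation yet
    actionDeallocate                       // an allocation exists that the spec no longer asks for
)

// decideEndpointAction mirrors the branch structure of allocateService:
// specDefined stands for s.Spec.Endpoint != nil, allocated for s.Endpoint != nil.
func decideEndpointAction(specDefined, allocated bool) endpointAction {
    switch {
    case specDefined && !allocated:
        return actionAllocate
    case !specDefined && allocated:
        return actionDeallocate
    default:
        return actionNone
    }
}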
13 changes: 12 additions & 1 deletion manager/allocator/networkallocator/portallocator.go
@@ -155,7 +155,18 @@ func (pa *portAllocator) serviceDeallocatePorts(s *api.Service) {
}

func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
if s.Endpoint == nil || s.Spec.Endpoint == nil {
// If the service has neither a user-defined endpoint nor an allocated endpoint,
// we consider it allocated and return true.
if s.Endpoint == nil && s.Spec.Endpoint == nil {
return true
}

// If the service has an allocated endpoint but no user-defined endpoint,
// the allocated resources are redundant and need to be deallocated.
// If the service has a user-defined endpoint but no allocated endpoint,
// it is not yet allocated.
if (s.Endpoint != nil && s.Spec.Endpoint == nil) ||
(s.Endpoint == nil && s.Spec.Endpoint != nil) {
return false
}
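
The reworked check above handles every nil combination of s.Spec.Endpoint and s.Endpoint before the per-port comparison. A small runnable table of those combinations, with a hypothetical helper covering only the nil checks:

package main

import "fmt"

// nilCombinationAllocated reproduces only the nil handling above: spec stands
// for s.Spec.Endpoint != nil and allocated for s.Endpoint != nil. When both
// are present, the real implementation goes on to compare the port lists.
func nilCombinationAllocated(spec, allocated bool) bool {
    switch {
    case !spec && !allocated:
        return true // nothing requested, nothing allocated: considered allocated
    case spec != allocated:
        return false // one side missing: allocation is stale or still pending
    default:
        return true // both present: defer to the per-port comparison (not shown)
    }
}

func main() {
    for _, c := range []struct{ spec, allocated bool }{
        {false, false}, {false, true}, {true, false}, {true, true},
    } {
        fmt.Printf("spec=%-5v allocated=%-5v -> %v\n",
            c.spec, c.allocated, nilCombinationAllocated(c.spec, c.allocated))
    }
}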

23 changes: 14 additions & 9 deletions manager/controlapi/node_test.go
@@ -333,16 +333,21 @@ func TestListManagerNodes(t *testing.T) {
nodes[5] = raftutils.RestartNode(t, clockSource, nodes[5], false)
raftutils.WaitForCluster(t, clockSource, nodes)

// All the nodes should be reachable again
r, err = ts.Client.ListNodes(context.Background(), &api.ListNodesRequest{})
assert.NoError(t, err)
assert.NotNil(t, r)
managers = getMap(t, r.Nodes)
assert.Len(t, ts.Server.raft.GetMemberlist(), 5)
assert.Len(t, r.Nodes, 5)
for i := 1; i <= 5; i++ {
assert.Equal(t, api.RaftMemberStatus_REACHABLE, managers[nodes[uint64(i)].Config.ID].Reachability)
}
// All the nodes should be reachable again
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
r, err = ts.Client.ListNodes(context.Background(), &api.ListNodesRequest{})
if err != nil {
return err
}
managers = getMap(t, r.Nodes)
for i := 1; i <= 5; i++ {
if managers[nodes[uint64(i)].Config.ID].Reachability != api.RaftMemberStatus_REACHABLE {
return fmt.Errorf("node %x is unreachable", nodes[uint64(i)].Config.ID)
}
}
return nil
}))

// Switch the raft node used by the server
ts.Server.raft = nodes[2].Node
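
The test above now retries its reachability assertions instead of asserting once, since restarted raft members need a little time before they report as REACHABLE. A generic sketch of such a retry loop, with pollUntil as a hypothetical stand-in for raftutils.PollFunc (the real helper also takes the test's fake clock source):

package sketch

import (
    "fmt"
    "time"
)

// pollUntil re-runs check until it returns nil or the timeout expires,
// returning the last observed error on failure.
func pollUntil(timeout, interval time.Duration, check func() error) error {
    deadline := time.Now().Add(timeout)
    for {
        err := check()
        if err == nil {
            return nil
        }
        if time.Now().After(deadline) {
            return fmt.Errorf("condition not met before timeout: %v", err)
        }
        time.Sleep(interval)
    }
}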
2 changes: 1 addition & 1 deletion manager/dispatcher/nodes.go
@@ -86,7 +86,7 @@ func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error {
return nil
}

// CheckRateLimit returs error if node with specified id is allowed to re-register
// CheckRateLimit returns error if node with specified id is allowed to re-register
// again.
func (s *nodeStore) CheckRateLimit(id string) error {
s.mu.Lock()
6 changes: 5 additions & 1 deletion manager/manager.go
@@ -255,6 +255,7 @@ func (m *Manager) Run(parent context.Context) error {
baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
baseResourceAPI := resourceapi.New(m.RaftNode.MemoryStore())
healthServer := health.NewHealthServer()
localHealthServer := health.NewHealthServer()

authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
@@ -287,18 +288,21 @@ func (m *Manager) Run(parent context.Context) error {
api.RegisterRaftServer(m.server, authenticatedRaftAPI)
api.RegisterHealthServer(m.server, authenticatedHealthAPI)
api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
api.RegisterControlServer(m.localserver, localProxyControlAPI)
api.RegisterControlServer(m.server, authenticatedControlAPI)
api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)

api.RegisterControlServer(m.localserver, localProxyControlAPI)
api.RegisterHealthServer(m.localserver, localHealthServer)

errServe := make(chan error, 2)
for proto, l := range m.listeners {
go m.serveListener(ctx, errServe, proto, l)
}

// Set the raft server as serving for the health server
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

defer func() {
m.server.Stop()
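
The manager now runs a second health server on its local (unix-socket) gRPC server, which is what the agent's new ControlAPI health check polls. A condensed sketch of that wiring, omitting the authenticated wrapper used on the public server; the health import path is an assumption:

package sketch

import (
    "github.com/docker/swarmkit/api"            // assumed import path
    "github.com/docker/swarmkit/manager/health" // assumed import path
    "google.golang.org/grpc"
)

// registerHealth wires one health server per gRPC server: the public one
// reports raft health to remote peers, the local one reports ControlAPI
// readiness to the co-located agent (see the agent/node.go change above).
func registerHealth(public, local *grpc.Server) {
    healthServer := health.NewHealthServer()
    localHealthServer := health.NewHealthServer()

    api.RegisterHealthServer(public, healthServer)
    api.RegisterHealthServer(local, localHealthServer)

    healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)
    localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)
}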
43 changes: 17 additions & 26 deletions manager/orchestrator/updater.go
@@ -288,16 +288,8 @@ func (u *Updater) updateTask(ctx context.Context, slot slot, updated *api.Task)
return err
}

u.removeOldTasks(ctx, batch, slot)

for _, t := range slot {
if t.DesiredState == api.TaskStateRunning {
// Wait for the old task to stop or time out, and then set the new one
// to RUNNING.
delayStartCh = u.restarts.DelayStart(ctx, nil, t, updated.ID, 0, true)
break
}
}
oldTask := u.removeOldTasks(ctx, batch, slot)
delayStartCh = u.restarts.DelayStart(ctx, nil, oldTask, updated.ID, 0, true)

return nil

@@ -333,34 +325,29 @@ func (u *Updater) useExistingTask(ctx context.Context, slot slot, existing *api.
}
}
if len(removeTasks) != 0 || existing.DesiredState != api.TaskStateRunning {
var delayStartCh <-chan struct{}
_, err := u.store.Batch(func(batch *store.Batch) error {
u.removeOldTasks(ctx, batch, removeTasks)
oldTask := u.removeOldTasks(ctx, batch, removeTasks)

if existing.DesiredState != api.TaskStateRunning {
err := batch.Update(func(tx store.Tx) error {
t := store.GetTask(tx, existing.ID)
if t == nil {
return fmt.Errorf("task %s not found while trying to start it", existing.ID)
}
if t.DesiredState >= api.TaskStateRunning {
return fmt.Errorf("task %s was already started when reached by updater", existing.ID)
}
t.DesiredState = api.TaskStateRunning
return store.UpdateTask(tx, t)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("starting task %s failed", existing.ID)
}
delayStartCh = u.restarts.DelayStart(ctx, nil, oldTask, existing.ID, 0, true)
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Error("updater batch transaction failed")
}

if delayStartCh != nil {
<-delayStartCh
}
}
}

func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, removeTasks []*api.Task) {
// removeOldTasks shuts down the given tasks and returns one of the tasks that
// was shut down, or nil.
func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, removeTasks []*api.Task) *api.Task {
var removedTask *api.Task
for _, original := range removeTasks {
err := batch.Update(func(tx store.Tx) error {
t := store.GetTask(tx, original.ID)
@@ -375,8 +362,12 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove
})
if err != nil {
log.G(ctx).WithError(err).Errorf("shutting down stale task %s failed", original.ID)
} else {
removedTask = original
}
}

return removedTask
}

func (u *Updater) isTaskDirty(t *api.Task) bool {
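
removeOldTasks now returns one of the tasks it actually managed to shut down so that callers can hand it to DelayStart. A standalone illustration of that "return the last successfully processed item" contract, using a stand-in task type:

package main

import "fmt"

// task is a stand-in for api.Task; only the ID matters for this sketch.
type task struct{ ID string }

// shutDownAndPickOne tries to shut every task down and returns one whose
// shutdown succeeded, or nil if none did, mirroring the new contract above.
func shutDownAndPickOne(tasks []*task, shutDown func(*task) error) *task {
    var picked *task
    for _, t := range tasks {
        if err := shutDown(t); err != nil {
            fmt.Printf("shutting down stale task %s failed: %v\n", t.ID, err)
            continue
        }
        picked = t
    }
    return picked
}

func main() {
    old := shutDownAndPickOne(
        []*task{{ID: "task-1"}, {ID: "task-2"}},
        func(t *task) error { return nil },
    )
    fmt.Println("picked:", old.ID) // picked: task-2
}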