Skip to content

Commit

Permalink
On node, if a Join Address is specified, track the number of sessions…
Browse files Browse the repository at this point in the history
… established.

If a session has never been established, and we get an x509 error, the make sure the
agent does not rebuild new sessions and just exists.  Make sure that this error gets
propagated back from the node.

Signed-off-by: Ying Li <ying.li@docker.com>
  • Loading branch information
cyli committed Jun 5, 2017
1 parent 6b7b203 commit f03ac28
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 12 deletions.
8 changes: 1 addition & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,7 @@ func (a *Agent) run(ctx context.Context) {
leaving = a.leaving
subscriptions = map[string]context.CancelFunc{}
)
defer func() {
if session != nil {
session.close()
}
}()
defer session.close()

if err := a.worker.Init(ctx); err != nil {
log.G(ctx).WithError(err).Error("worker initialization failed")
Expand Down Expand Up @@ -385,8 +381,6 @@ func (a *Agent) run(ctx context.Context) {
if a.err == nil {
a.err = ctx.Err()
}
session.close()

return
}
}
Expand Down
2 changes: 1 addition & 1 deletion agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Config struct {
// NodeTLSInfo contains the starting node TLS info to bootstrap into the agent
NodeTLSInfo *api.NodeTLSInfo

// SessionTracker, if provided, will have its OnSessionClosed and OnSessionError methods called
// SessionTracker, if provided, will have its SessionClosed and SessionError methods called
// when sessions close and error.
SessionTracker SessionTracker
}
Expand Down
67 changes: 67 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
cautils "github.com/docker/swarmkit/ca/testutils"
"github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/testutils"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -718,3 +719,69 @@ func TestRepeatedRootRotation(t *testing.T) {
return nil
}, opsTimeout))
}

func TestNodeRejoins(t *testing.T) {
t.Parallel()
numWorker, numManager := 1, 3
cl := newCluster(t, numWorker, numManager)
defer func() {
require.NoError(t, cl.Stop())
}()
pollClusterReady(t, cl, numWorker, numManager)

clusterInfo, err := cl.GetClusterInfo()
require.NoError(t, err)

leader, err := cl.Leader()
require.NoError(t, err)

// Find a manager (not the leader) and the worker to shut down.
getNonLeaderAndWorker := func() map[string]*testNode {
results := make(map[string]*testNode)
for _, n := range cl.nodes {
nodeID := n.node.NodeID()
if n.IsManager() {
if nodeID != leader.node.NodeID() {
results[ca.ManagerRole] = n
}
} else {
results[ca.WorkerRole] = n
}
}
return results
}

// rejoining succeeds - (both because the certs are correct, and because node.Pause sets the JoinAddr to "")
for _, n := range getNonLeaderAndWorker() {
nodeID := n.node.NodeID()
require.NoError(t, n.Pause(false))
require.NoError(t, cl.StartNode(nodeID))
}
pollClusterReady(t, cl, numWorker, numManager)

// rejoining if the certs are wrong will fail fast so long as the join address is passed, but will keep retrying
// forever if the join address is not passed
leader, err = cl.Leader() // in case leadership changed
require.NoError(t, err)
for role, n := range getNonLeaderAndWorker() {
nodeID := n.node.NodeID()
require.NoError(t, n.Pause(false))

// generate new certs with the same node ID, role, and cluster ID, but with the wrong CA
paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, "certificates"))
newRootCA, err := ca.CreateRootCA("bad root CA")
require.NoError(t, err)
ca.SaveRootCA(newRootCA, paths.RootCA)
krw := ca.NewKeyReadWriter(paths.Node, nil, &manager.RaftDEKData{}) // make sure the key headers are preserved
_, _, err = krw.Read()
require.NoError(t, err)
_, _, err = newRootCA.IssueAndSaveNewCertificates(krw, nodeID, role, clusterInfo.ID)
require.NoError(t, err)

n.config.JoinAddr, err = leader.node.RemoteAPIAddr()
require.NoError(t, err)
err = cl.StartNode(nodeID)
require.Error(t, err)
require.Contains(t, err.Error(), "certificate signed by unknown authority")
}
}
51 changes: 47 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"time"

Expand All @@ -33,6 +34,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
)

Expand Down Expand Up @@ -432,10 +434,10 @@ func (n *Node) run(ctx context.Context) (err error) {
}()

wg.Wait()
if managerErr != nil && managerErr != context.Canceled {
if managerErr != nil && errors.Cause(managerErr) != context.Canceled {
return managerErr
}
if agentErr != nil && agentErr != context.Canceled {
if agentErr != nil && errors.Cause(agentErr) != context.Canceled {
return agentErr
}
return err
Expand Down Expand Up @@ -516,7 +518,7 @@ waitPeer:
rootCA := securityConfig.RootCA()
issuer := securityConfig.IssuerInfo()

a, err := agent.New(&agent.Config{
agentConfig := &agent.Config{
Hostname: n.config.Hostname,
ConnBroker: n.connBroker,
Executor: n.config.Executor,
Expand All @@ -529,7 +531,14 @@ waitPeer:
CertIssuerPublicKey: issuer.PublicKey,
CertIssuerSubject: issuer.Subject,
},
})
}
// if a join address has been specified, then if the agent fails to connect due to a TLS error, fail fast - don't
// keep re-trying to join
if n.config.JoinAddr != "" {
agentConfig.SessionTracker = &firstSessionErrorTracker{}
}

a, err := agent.New(agentConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -1055,3 +1064,37 @@ func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID
func (sp sortablePeers) Len() int { return len(sp) }

func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }

// firstSessionErrorTracker is a utility that helps determine whether the agent should exit after
// a TLS failure on establishing the first session. This should only happen if a join address
// is specified. If establishing the first session succeeds, but later on some session fails
// because of a TLS error, we don't want to exit the agent because a previously successful
// session indicates that the TLS error may be a transient issue.
type firstSessionErrorTracker struct {
mu sync.Mutex
pastFirstSession bool
err error
}

func (fs *firstSessionErrorTracker) SessionEstablished() {
fs.mu.Lock()
fs.pastFirstSession = true
fs.mu.Unlock()
}

func (fs *firstSessionErrorTracker) SessionError(err error) {
fs.mu.Lock()
fs.err = err
fs.mu.Unlock()
}

func (fs *firstSessionErrorTracker) SessionClosed() error {
fs.mu.Lock()
defer fs.mu.Unlock()
// unfortunately grpc connection errors are type grpc.rpcError, which are not exposed, and we can't get at the underlying error type
if !fs.pastFirstSession && grpc.Code(fs.err) == codes.Internal &&
strings.HasPrefix(grpc.ErrorDesc(fs.err), "connection error") && strings.Contains(grpc.ErrorDesc(fs.err), "transport: x509:") {
return fs.err
}
return nil
}

0 comments on commit f03ac28

Please sign in to comment.