Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: handle re-bootstrapping in a secondary datacenter when WAN federation via mesh gateways is configured #7931

Merged
merged 1 commit into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion agent/consul/federation_state_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

type FederationStateReplicator struct {
srv *Server
srv *Server
gatewayLocator *GatewayLocator
}

var _ IndexReplicatorDelegate = (*FederationStateReplicator)(nil)
Expand All @@ -26,6 +27,14 @@ func (r *FederationStateReplicator) MetricName() string { return "federation-sta

// FetchRemote implements IndexReplicatorDelegate.
func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
if r.gatewayLocator != nil {
r.gatewayLocator.SetLastFederationStateReplicationError(err)
}
return lenRemote, remote, remoteIndex, err
}

func (r *FederationStateReplicator) fetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
req := structs.DCSpecificRequest{
Datacenter: r.srv.config.PrimaryDatacenter,
QueryOptions: structs.QueryOptions{
Expand Down
120 changes: 113 additions & 7 deletions agent/consul/gateway_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,104 @@ type GatewayLocator struct {
// This will be closed the FIRST time we get some gateways populated
primaryGatewaysReadyCh chan struct{}
primaryGatewaysReadyOnce sync.Once

// these are a collection of measurements that factor into deciding if we
// should directly dial the primary's mesh gateways or if we should try to
// route through our local gateways (if they are up).
lastReplLock sync.Mutex
lastReplSuccess time.Time
lastReplFailure time.Time
lastReplSuccesses uint64
lastReplFailures uint64
}

// SetLastFederationStateReplicationError is used to indicate if the federation
// state replication loop has succeeded (nil) or failed during the last
// execution.
//
// Rather than introduce a completely new mechanism to periodically probe that
// our chosen mesh-gateway configuration can reach the primary's servers (like
// a ping or status RPC) we cheat and use the federation state replicator
// goroutine's success or failure as a proxy.
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
g.lastReplLock.Lock()
defer g.lastReplLock.Unlock()
oldChoice := g.dialPrimaryThroughLocalGateway()
if err == nil {
g.lastReplSuccess = time.Now().UTC()
g.lastReplSuccesses++
g.lastReplFailures = 0
} else {
g.lastReplFailure = time.Now().UTC()
g.lastReplFailures++
g.lastReplSuccesses = 0
}
newChoice := g.dialPrimaryThroughLocalGateway()
if oldChoice != newChoice {
g.logPrimaryDialingMessage(newChoice)
}
}

func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) {
if g.datacenter == g.primaryDatacenter {
// These messages are useless when the server is in the primary
// datacenter.
return
}
if useLocal {
g.logger.Info("will dial the primary datacenter using our local mesh gateways if possible")
} else {
g.logger.Info("will dial the primary datacenter through its mesh gateways")
}
}

// DialPrimaryThroughLocalGateway determines if we should dial the primary's
// mesh gateways directly or use our local mesh gateways (if they are up).
//
// Generally the system has three states:
//
// 1. Servers dial primary MGWs using fallback addresses from the agent config.
// 2. Servers dial primary MGWs using replicated federation state data.
// 3. Servers dial primary MGWs indirectly through local MGWs.
//
// After initial bootstrapping most communication should go through (3). If the
// local mesh gateways are not coming up for chicken/egg problems (mostly the
// kind that arise from secondary datacenter bootstrapping) then (2) is useful
// to solve the chicken/egg problem and get back to (3). In the worst case
// where we completely lost communication with the primary AND all of their old
// mesh gateway addresses are changed then we need to go all the way back to
// square one and re-bootstrap via (1).
//
// Since both (1) and (2) are meant to be temporary we simplify things and make
// the system only consider two overall configurations: (1+2, with the
// addresses being unioned) or (3).
//
// This method returns true if in state (3) and false if in state (1+2).
func (g *GatewayLocator) DialPrimaryThroughLocalGateway() bool {
if g.datacenter == g.primaryDatacenter {
return false // not important
}
g.lastReplLock.Lock()
defer g.lastReplLock.Unlock()
return g.dialPrimaryThroughLocalGateway()
}

const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3

func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool {
if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() {
return false // no data yet
}

if g.lastReplSuccess.After(g.lastReplFailure) {
return true // we have viable data
}

if g.lastReplFailures < localFederationStateReplicatorFailuresBeforeDialingDirectly {
return true // maybe it's just a little broken
}

return false
}

// PrimaryMeshGatewayAddressesReadyCh returns a channel that will be closed
Expand Down Expand Up @@ -82,15 +180,22 @@ func (g *GatewayLocator) listGateways(primary bool) []string {

var addrs []string
if primary {
addrs = g.primaryGateways
if g.datacenter == g.primaryDatacenter {
addrs = g.primaryGateways
} else if g.DialPrimaryThroughLocalGateway() && len(g.localGateways) > 0 {
addrs = g.localGateways
} else {
// Note calling StringSliceMergeSorted only works because both
// inputs are pre-sorted. If for some reason one of the lists has
// *duplicates* (which shouldn't happen) it's not great but it
// won't break anything other than biasing our eventual random
// choice a little bit.
addrs = lib.StringSliceMergeSorted(g.primaryGateways, g.PrimaryGatewayFallbackAddresses())
}
} else {
addrs = g.localGateways
}

if primary && len(addrs) == 0 {
addrs = g.PrimaryGatewayFallbackAddresses()
}

return addrs
}

Expand Down Expand Up @@ -133,7 +238,6 @@ func getRandomItem(items []string) string {

type serverDelegate interface {
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
PrimaryGatewayFallbackAddresses() []string
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A stray piece of dead code leftover from an earlier refactoring session.

IsLeader() bool
LeaderLastContact() time.Time
}
Expand All @@ -144,13 +248,15 @@ func NewGatewayLocator(
datacenter string,
primaryDatacenter string,
) *GatewayLocator {
return &GatewayLocator{
g := &GatewayLocator{
logger: logger.Named(logging.GatewayLocator),
srv: srv,
datacenter: datacenter,
primaryDatacenter: primaryDatacenter,
primaryGatewaysReadyCh: make(chan struct{}),
}
g.logPrimaryDialingMessage(g.DialPrimaryThroughLocalGateway())
return g
}

var errGatewayLocalStateNotInitialized = errors.New("local state not initialized")
Expand Down
Loading