Merge pull request #4106 from hashicorp/b-servers
Improved Client handling of failed RPCs
dadgar authored Apr 5, 2018
2 parents 1388638 + 9ce59c5 commit b2ae8b7
Showing 6 changed files with 235 additions and 82 deletions.
23 changes: 12 additions & 11 deletions CHANGELOG.md
@@ -40,30 +40,31 @@ IMPROVEMENTS:
* cli: Clearer task event descriptions in `nomad alloc-status` when there are server side failures authenticating to Vault [[GH-3968](https://github.com/hashicorp/nomad/issues/3968)]
* client: Allow '.' in environment variable names [[GH-3760](https://github.com/hashicorp/nomad/issues/3760)]
* client: Refactor client fingerprint methods to a request/response format
* client: Improved handling of failed RPCs and heartbeat retry logic [[GH-4106](https://github.com/hashicorp/nomad/issues/4106)]
[[GH-3781](https://github.com/hashicorp/nomad/issues/3781)]
* discovery: Allow `check_restart` to be specified in the `service` stanza.
* discovery: Allow `check_restart` to be specified in the `service` stanza
[[GH-3718](https://github.com/hashicorp/nomad/issues/3718)]
* discovery: Allow configuring names of Nomad client and server health checks.
* discovery: Allow configuring names of Nomad client and server health checks
[[GH-4003](https://github.com/hashicorp/nomad/issues/4003)]
* discovery: Only log if Consul does not support TLSSkipVerify instead of
dropping checks which relied on it. Consul has had this feature since 0.7.2. [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
dropping checks which relied on it. Consul has had this feature since 0.7.2 [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
* driver/docker: Support hard CPU limits [[GH-3825](https://github.com/hashicorp/nomad/issues/3825)]
* driver/docker: Support advertising IPv6 addresses [[GH-3790](https://github.com/hashicorp/nomad/issues/3790)]
* driver/docker: Support overriding image entrypoint [[GH-3788](https://github.com/hashicorp/nomad/issues/3788)]
* driver/docker: Support adding or dropping capabilities [[GH-3754](https://github.com/hashicorp/nomad/issues/3754)]
* driver/docker: Support mounting root filesystem as read-only [[GH-3802](https://github.com/hashicorp/nomad/issues/3802)]
* driver/docker: Retry on Portworx "volume is attached on another node" errors.
* driver/docker: Retry on Portworx "volume is attached on another node" errors
[[GH-3993](https://github.com/hashicorp/nomad/issues/3993)]
* driver/lxc: Add volumes config to LXC driver [[GH-3687](https://github.com/hashicorp/nomad/issues/3687)]
* driver/rkt: Allow overriding group [[GH-3990](https://github.com/hashicorp/nomad/issues/3990)]
* telemetry: Support DataDog tags [[GH-3839](https://github.com/hashicorp/nomad/issues/3839)]
* ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance). [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation stats requests are made through the server instead of directly through clients. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: Allocation log requests fall back to using the server when the client can't be reached. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: All views poll for changes using long-polling via blocking queries. [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
* ui: Dispatch payload on the parameterized instance job detail page. [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Periodic force launch button on the periodic job detail page. [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation breadcrumbs now extend job breadcrumbs. [[GH-3829](https://github.com/hashicorp/nomad/issues/3974)]
* ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance) [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation stats requests are made through the server instead of directly through clients [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: Allocation log requests fall back to using the server when the client can't be reached [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: All views poll for changes using long-polling via blocking queries [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
* ui: Dispatch payload on the parameterized instance job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Periodic force launch button on the periodic job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation breadcrumbs now extend job breadcrumbs [[GH-3829](https://github.com/hashicorp/nomad/issues/3974)]
* vault: Allow Nomad to create orphaned tokens for allocations [[GH-3992](https://github.com/hashicorp/nomad/issues/3992)]

BUG FIXES:
89 changes: 69 additions & 20 deletions client/client.go
@@ -137,9 +137,11 @@ type Client struct {
// server for the node event
triggerEmitNodeEvent chan *structs.NodeEvent

// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
// rpcRetryCh is closed when an event occurs, such as server discovery or a
// successful RPC, that makes a retry likely to succeed. Access should only
// occur via the getter method
rpcRetryCh chan struct{}
rpcRetryLock sync.Mutex

// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
@@ -217,7 +219,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
}

@@ -1154,7 +1155,7 @@ func (c *Client) registerAndHeartbeat() {

for {
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
case <-heartbeat:
case <-c.shutdownCh:
return
@@ -1169,11 +1170,11 @@ func (c *Client) registerAndHeartbeat() {
c.retryRegisterNode()
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.retryIntv(registerRetryIntv)
intv := c.getHeartbeatRetryIntv(err)
c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
heartbeat = time.After(intv)

// if heartbeating fails, trigger Consul discovery
// If heartbeating fails, trigger Consul discovery
c.triggerDiscovery()
}
} else {
@@ -1184,6 +1185,56 @@ func (c *Client) registerAndHeartbeat() {
}
}

// getHeartbeatRetryIntv is used to retrieve the time to wait before attempting
// another heartbeat.
func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
if c.config.DevMode {
return devModeRetryIntv
}

// Collect the useful heartbeat info
c.heartbeatLock.Lock()
haveHeartbeated := c.haveHeartbeated
last := c.lastHeartbeat
ttl := c.heartbeatTTL
c.heartbeatLock.Unlock()

// If we haven't even successfully heartbeated once or there is no leader
// treat it as a registration. In the case that there is a leadership loss,
// we will have our heartbeat timer reset to a much larger threshold, so
// do not put unnecessary pressure on the new leader.
if !haveHeartbeated || err == structs.ErrNoLeader {
return c.retryIntv(registerRetryIntv)
}

// Determine how much time we have left to heartbeat
left := last.Add(ttl).Sub(time.Now())

// Logic for retrying is:
// * Do not retry faster than once a second
// * Do not retry less than once every 30 seconds
// * If we have missed the heartbeat by more than 30 seconds, start to use
// the absolute time since we do not want to retry indefinitely
switch {
case left < -30*time.Second:
// Make left the absolute value so we delay and jitter properly.
left *= -1
case left < 0:
return time.Second + lib.RandomStagger(time.Second)
default:
}

stagger := lib.RandomStagger(left)
switch {
case stagger < time.Second:
return time.Second + lib.RandomStagger(time.Second)
case stagger > 30*time.Second:
return 25*time.Second + lib.RandomStagger(5*time.Second)
default:
return stagger
}
}

// periodicSnapshot is a long lived goroutine used to periodically snapshot the
// state of the client
func (c *Client) periodicSnapshot() {
@@ -1307,7 +1358,7 @@ func (c *Client) retryRegisterNode() {
c.logger.Printf("[ERR] client: registration failure: %v", err)
}
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
case <-time.After(c.retryIntv(registerRetryIntv)):
case <-c.shutdownCh:
return
Expand Down Expand Up @@ -1567,7 +1618,7 @@ OUTER:
}
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
continue
case <-time.After(retry):
continue
Expand Down Expand Up @@ -1622,7 +1673,7 @@ OUTER:
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
continue
case <-time.After(retry):
continue
Expand Down Expand Up @@ -2085,18 +2136,16 @@ DISCOLOOP:
}

c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers)
c.servers.SetServers(nomadServers)

// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
// This is a shortcircuit for the longer retry intervals.
for {
select {
case c.serversDiscoveredCh <- struct{}{}:
default:
return nil
}
// Fire the retry trigger if we have updated the set of servers.
if c.servers.SetServers(nomadServers) {
// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
// This is a shortcircuit for the longer retry intervals.
c.fireRpcRetryWatcher()
}

return nil
}

// emitStats collects host resource usage stats periodically
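The interval clamping in getHeartbeatRetryIntv above keeps every heartbeat retry between roughly one second and thirty seconds, scaling with how much of the heartbeat TTL is left in between. The standalone sketch below is not part of the commit; it reimplements the clamping with a local stagger helper (instead of lib.RandomStagger) purely to illustrate those bounds for a few sample values.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger returns a random duration in [0, intv), mirroring what
// lib.RandomStagger provides in the real client.
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(intv)))
}

// retryInterval mirrors the clamping rules in getHeartbeatRetryIntv for a
// given amount of TTL remaining; a negative value means the heartbeat is
// already overdue.
func retryInterval(left time.Duration) time.Duration {
	switch {
	case left < -30*time.Second:
		left *= -1 // badly overdue: base the delay on how late we are
	case left < 0:
		return time.Second + randomStagger(time.Second) // slightly overdue: retry quickly
	}

	stagger := randomStagger(left)
	switch {
	case stagger < time.Second:
		return time.Second + randomStagger(time.Second) // floor: roughly 1-2s
	case stagger > 30*time.Second:
		return 25*time.Second + randomStagger(5*time.Second) // ceiling: roughly 25-30s
	default:
		return stagger
	}
}

func main() {
	for _, left := range []time.Duration{
		-2 * time.Minute, // missed the heartbeat by a wide margin
		-5 * time.Second, // just missed the heartbeat
		10 * time.Second, // comfortable amount of TTL left
		5 * time.Minute,  // very long TTL
	} {
		fmt.Printf("left=%v -> retry in %v\n", left, retryInterval(left))
	}
}

In the real client this means a slightly late heartbeat retries within a couple of seconds, while a node with a long TTL spreads its retries out so a fleet of clients does not hammer the servers at once.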
28 changes: 28 additions & 0 deletions client/client_test.go
@@ -89,6 +89,34 @@ func TestClient_RPC(t *testing.T) {
})
}

func TestClient_RPC_FireRetryWatchers(t *testing.T) {
t.Parallel()
s1, addr := testServer(t, nil)
defer s1.Shutdown()

c1 := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()

watcher := c1.rpcRetryWatcher()

// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})

select {
case <-watcher:
default:
t.Fatal("watcher should be fired")
}
}

func TestClient_RPC_Passthrough(t *testing.T) {
t.Parallel()
s1, _ := testServer(t, nil)
25 changes: 25 additions & 0 deletions client/rpc.go
@@ -67,6 +67,7 @@ TRY:
// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
if rpcErr == nil {
c.fireRpcRetryWatcher()
return nil
}

@@ -382,3 +383,27 @@ func (c *Client) Ping(srv net.Addr) error {
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
return err
}

// rpcRetryWatcher returns a channel that will be closed if an event happens
// such that we expect the next RPC to be successful.
func (c *Client) rpcRetryWatcher() <-chan struct{} {
c.rpcRetryLock.Lock()
defer c.rpcRetryLock.Unlock()

if c.rpcRetryCh == nil {
c.rpcRetryCh = make(chan struct{})
}

return c.rpcRetryCh
}

// fireRpcRetryWatcher causes any RPC retry loops to retry their RPCs because
// we believe they will be successful.
func (c *Client) fireRpcRetryWatcher() {
c.rpcRetryLock.Lock()
defer c.rpcRetryLock.Unlock()
if c.rpcRetryCh != nil {
close(c.rpcRetryCh)
c.rpcRetryCh = nil
}
}
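Taken together, rpcRetryWatcher and fireRpcRetryWatcher implement a close-once notification channel: waiters grab the current channel and block on it, and firing closes that channel (waking every waiter at once) and clears it so the next getter call hands out a fresh one. Below is a minimal standalone sketch of the same pattern; the names are illustrative only and not part of the Nomad API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// notifier is a stripped-down version of the rpcRetryCh idea: a lazily
// created channel that is closed once to wake every waiter.
type notifier struct {
	mu sync.Mutex
	ch chan struct{}
}

// watch returns the channel that waiters should block on.
func (n *notifier) watch() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch == nil {
		n.ch = make(chan struct{})
	}
	return n.ch
}

// fire closes the current channel, waking all waiters, and clears it so
// the next watch call allocates a fresh channel.
func (n *notifier) fire() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch != nil {
		close(n.ch)
		n.ch = nil
	}
}

func main() {
	var n notifier

	// Grab the watcher up front, like callers of rpcRetryWatcher do before
	// they start waiting.
	retryCh := n.watch()

	done := make(chan struct{})
	go func() {
		defer close(done)
		// A retry loop waits on either the notifier or its backoff timer,
		// the same shape as the selects in client.go.
		select {
		case <-retryCh:
			fmt.Println("retrying early: servers changed or an RPC succeeded")
		case <-time.After(30 * time.Second):
			fmt.Println("retrying only after the full backoff")
		}
	}()

	n.fire() // e.g. Consul discovery produced a new server set
	<-done
}

This is why the retry loops in client.go select on c.rpcRetryWatcher() alongside their timers: a successful RPC or an updated server set cuts the longer backoff intervals short instead of waiting them out.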
55 changes: 53 additions & 2 deletions client/servers/manager.go
@@ -7,6 +7,7 @@ import (
"log"
"math/rand"
"net"
"sort"
"strings"
"sync"
"time"
@@ -74,6 +75,16 @@ func (s *Server) String() string {
return s.addr
}

func (s *Server) Equal(o *Server) bool {
if s == nil && o == nil {
return true
} else if s == nil && o != nil || s != nil && o == nil {
return false
}

return s.Addr.String() == o.Addr.String() && s.DC == o.DC
}

type Servers []*Server

func (s Servers) String() string {
Expand Down Expand Up @@ -106,6 +117,32 @@ func (s Servers) shuffle() {
}
}

func (s Servers) Sort() {
sort.Slice(s, func(i, j int) bool {
a, b := s[i], s[j]
if addr1, addr2 := a.Addr.String(), b.Addr.String(); addr1 == addr2 {
return a.DC < b.DC
} else {
return addr1 < addr2
}
})
}

// Equal reports whether the two server lists are equal, including the ordering.
func (s Servers) Equal(o Servers) bool {
if len(s) != len(o) {
return false
}

for i, v := range s {
if !v.Equal(o[i]) {
return false
}
}

return true
}

type Manager struct {
// servers is the list of all known Nomad servers.
servers Servers
@@ -157,10 +194,24 @@ func (m *Manager) Start() {
}
}

func (m *Manager) SetServers(servers Servers) {
// SetServers sets the servers and returns whether the new server list is
// different from the existing server set
func (m *Manager) SetServers(servers Servers) bool {
m.Lock()
defer m.Unlock()

// Sort both the existing and incoming servers
servers.Sort()
m.servers.Sort()

// Determine if they are equal
equal := servers.Equal(m.servers)

// Randomize the incoming servers
servers.shuffle()
m.servers = servers

return !equal
}

// FindServer returns a server to send an RPC to. If there are no servers, nil
Expand Down Expand Up @@ -204,7 +255,7 @@ func (m *Manager) NotifyFailedServer(s *Server) {
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
// the list, move the server to the end of the list.
if len(m.servers) > 1 && m.servers[0] == s {
if len(m.servers) > 1 && m.servers[0].Equal(s) {
m.servers.cycle()
}
}
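Because SetServers sorts both the incoming and the existing list with Servers.Sort before comparing them with Servers.Equal, a reshuffled copy of the current server set counts as unchanged and does not fire the RPC retry watcher. The standalone sketch below mirrors that sort-then-compare step with made-up addresses; it does not call the real Manager.

package main

import (
	"fmt"
	"sort"
)

// server carries just enough of servers.Server for the comparison: an
// address string and a datacenter.
type server struct {
	addr, dc string
}

type serverList []server

// sortList orders by address and then datacenter, like Servers.Sort.
func (s serverList) sortList() {
	sort.Slice(s, func(i, j int) bool {
		if s[i].addr == s[j].addr {
			return s[i].dc < s[j].dc
		}
		return s[i].addr < s[j].addr
	})
}

// equal reports whether two sorted lists contain the same servers, like
// Servers.Equal.
func (s serverList) equal(o serverList) bool {
	if len(s) != len(o) {
		return false
	}
	for i := range s {
		if s[i] != o[i] {
			return false
		}
	}
	return true
}

func main() {
	current := serverList{{"10.0.0.2:4647", "dc1"}, {"10.0.0.1:4647", "dc1"}}
	incoming := serverList{{"10.0.0.1:4647", "dc1"}, {"10.0.0.2:4647", "dc1"}}

	current.sortList()
	incoming.sortList()

	// changed is false here, so a client in this situation would skip
	// fireRpcRetryWatcher; only a genuinely new or removed server flips it.
	changed := !incoming.equal(current)
	fmt.Println("changed:", changed)
}

Only an actual change to the set (a new or dropped address) makes SetServers return true, which keeps repeated Consul discovery runs from needlessly waking every retry loop.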