Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Client handling of failed RPCs #4106

Merged
merged 8 commits into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ type Client struct {
// server for the node event
triggerEmitNodeEvent chan *structs.NodeEvent

// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
// rpcRetryCh is closed when there an event such as server discovery or a
// successful RPC occurring happens such that a retry should happen. Access
// should only occur via the getter method
rpcRetryCh chan struct{}
rpcRetryLock sync.Mutex

// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
Expand Down Expand Up @@ -217,7 +219,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
}

Expand Down Expand Up @@ -1154,7 +1155,7 @@ func (c *Client) registerAndHeartbeat() {

for {
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
case <-heartbeat:
case <-c.shutdownCh:
return
Expand Down Expand Up @@ -1307,7 +1308,7 @@ func (c *Client) retryRegisterNode() {
c.logger.Printf("[ERR] client: registration failure: %v", err)
}
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
case <-time.After(c.retryIntv(registerRetryIntv)):
case <-c.shutdownCh:
return
Expand Down Expand Up @@ -1567,7 +1568,7 @@ OUTER:
}
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
continue
case <-time.After(retry):
continue
Expand Down Expand Up @@ -1622,7 +1623,7 @@ OUTER:
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
continue
case <-time.After(retry):
continue
Expand Down Expand Up @@ -2090,13 +2091,8 @@ DISCOLOOP:
// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
// This is a shortcircuit for the longer retry intervals.
for {
select {
case c.serversDiscoveredCh <- struct{}{}:
default:
return nil
}
}
c.fireRpcRetryWatcher()
return nil
}

// emitStats collects host resource usage stats periodically
Expand Down
28 changes: 28 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ func TestClient_RPC(t *testing.T) {
})
}

func TestClient_RPC_FireRetryWatchers(t *testing.T) {
t.Parallel()
s1, addr := testServer(t, nil)
defer s1.Shutdown()

c1 := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()

watcher := c1.rpcRetryWatcher()

// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})

select {
case <-watcher:
default:
t.Fatal("watcher should be fired")
}
}

func TestClient_RPC_Passthrough(t *testing.T) {
t.Parallel()
s1, _ := testServer(t, nil)
Expand Down
25 changes: 25 additions & 0 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ TRY:
// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
if rpcErr == nil {
c.fireRpcRetryWatcher()
return nil
}

Expand Down Expand Up @@ -382,3 +383,27 @@ func (c *Client) Ping(srv net.Addr) error {
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
return err
}

// rpcRetryWatcher returns a channel that will be closed if an event happens
// such that we expect the next RPC to be successful.
func (c *Client) rpcRetryWatcher() <-chan struct{} {
c.rpcRetryLock.Lock()
defer c.rpcRetryLock.Unlock()

if c.rpcRetryCh == nil {
c.rpcRetryCh = make(chan struct{})
}

return c.rpcRetryCh
}

// fireRpcRetryWatcher causes any RPC retryloops to retry their RPCs because we
// believe the will be successful.
func (c *Client) fireRpcRetryWatcher() {
c.rpcRetryLock.Lock()
defer c.rpcRetryLock.Unlock()
if c.rpcRetryCh != nil {
close(c.rpcRetryCh)
c.rpcRetryCh = nil
}
}
15 changes: 14 additions & 1 deletion client/servers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ func (s *Server) String() string {
return s.addr
}

func (s *Server) Equal(o *Server) bool {
if s == nil && o == nil {
return true
} else if s == nil && o != nil || s != nil && o == nil {
return false
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be shortened to if s == nil || o == nil { return s == o }

h/t to @chelseakomlo


return s.Addr.String() == o.Addr.String() && s.DC == o.DC
}

type Servers []*Server

func (s Servers) String() string {
Expand Down Expand Up @@ -160,6 +170,9 @@ func (m *Manager) Start() {
func (m *Manager) SetServers(servers Servers) {
m.Lock()
defer m.Unlock()

// Randomize the incoming servers
servers.shuffle()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you still want to shuffle if they're equal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah otherwise we will have deterministic ordering through Consul discovery.

m.servers = servers
}

Expand Down Expand Up @@ -204,7 +217,7 @@ func (m *Manager) NotifyFailedServer(s *Server) {
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
// the list, move the server to the end of the list.
if len(m.servers) > 1 && m.servers[0] == s {
if len(m.servers) > 1 && m.servers[0].Equal(s) {
m.servers.cycle()
}
}
Expand Down
76 changes: 37 additions & 39 deletions client/servers/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/helper/testlog"
)

type fauxAddr struct {
Expand All @@ -32,22 +33,22 @@ func (cp *fauxConnPool) Ping(net.Addr) error {
return fmt.Errorf("bad server")
}

func testManager() (m *servers.Manager) {
logger := log.New(os.Stderr, "", log.LstdFlags)
func testManager(t *testing.T) (m *servers.Manager) {
logger := testlog.Logger(t)
shutdownCh := make(chan struct{})
m = servers.New(logger, shutdownCh, &fauxConnPool{})
return m
}

func testManagerFailProb(failPct float64) (m *servers.Manager) {
logger := log.New(os.Stderr, "", log.LstdFlags)
func testManagerFailProb(t *testing.T, failPct float64) (m *servers.Manager) {
logger := testlog.Logger(t)
shutdownCh := make(chan struct{})
m = servers.New(logger, shutdownCh, &fauxConnPool{failPct: failPct})
return m
}

func TestServers_SetServers(t *testing.T) {
m := testManager()
m := testManager(t)
var num int
num = m.NumServers()
if num != 0 {
Expand All @@ -66,14 +67,10 @@ func TestServers_SetServers(t *testing.T) {
if l := len(all); l != 2 {
t.Fatalf("expected 2 servers got %d", l)
}

if all[0] == s1 || all[0] == s2 {
t.Fatalf("expected a copy, got actual server")
}
}

func TestServers_FindServer(t *testing.T) {
m := testManager()
m := testManager(t)

if m.FindServer() != nil {
t.Fatalf("Expected nil return")
Expand Down Expand Up @@ -105,20 +102,14 @@ func TestServers_FindServer(t *testing.T) {
t.Fatalf("Expected two servers")
}
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server (still)")
}

m.NotifyFailedServer(s1)
s2 := m.FindServer()
if s2 == nil || s2.String() != "s2" {
t.Fatalf("Expected s2 server")
for _, srv := range srvs {
m.NotifyFailedServer(srv)
}

m.NotifyFailedServer(s2)
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server")
s2 := m.FindServer()
if s1.Equal(s2) {
t.Fatalf("Expected different server")
}
}

Expand All @@ -132,7 +123,7 @@ func TestServers_New(t *testing.T) {
}

func TestServers_NotifyFailedServer(t *testing.T) {
m := testManager()
m := testManager(t)

if m.NumServers() != 0 {
t.Fatalf("Expected zero servers to start")
Expand All @@ -159,32 +150,39 @@ func TestServers_NotifyFailedServer(t *testing.T) {
t.Fatalf("Expected two servers")
}

s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server")
// Grab a server
first := m.FindServer()

// Find the other server
second := s1
if first.Equal(s1) {
second = s2
}

m.NotifyFailedServer(s2)
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server (still)")
// Fail the other server
m.NotifyFailedServer(second)
next := m.FindServer()
if !next.Equal(first) {
t.Fatalf("Expected first server (still)")
}

m.NotifyFailedServer(s1)
s2 = m.FindServer()
if s2 == nil || s2.String() != "s2" {
t.Fatalf("Expected s2 server")
// Fail the first
m.NotifyFailedServer(first)
next = m.FindServer()
if !next.Equal(second) {
t.Fatalf("Expected second server")
}

m.NotifyFailedServer(s2)
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server")
// Fail the second
m.NotifyFailedServer(second)
next = m.FindServer()
if !next.Equal(first) {
t.Fatalf("Expected first server")
}
}

func TestServers_NumServers(t *testing.T) {
m := testManager()
m := testManager(t)
var num int
num = m.NumServers()
if num != 0 {
Expand All @@ -201,7 +199,7 @@ func TestServers_NumServers(t *testing.T) {

func TestServers_RebalanceServers(t *testing.T) {
const failPct = 0.5
m := testManagerFailProb(failPct)
m := testManagerFailProb(t, failPct)
const maxServers = 100
const numShuffleTests = 100
const uniquePassRate = 0.5
Expand Down