Skip to content

Commit

Permalink
Merge pull request #1457 from hashicorp/f-monitor-retries
Browse files Browse the repository at this point in the history
Adds a retry capability to lock monitors in the API client.
  • Loading branch information
slackpad committed Dec 1, 2015
2 parents 87ccd1e + 380658f commit 86f20a3
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 6 deletions.
40 changes: 34 additions & 6 deletions api/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"fmt"
"strings"
"sync"
"time"
)
Expand All @@ -22,9 +23,15 @@ const (

// DefaultLockRetryTime is how long we wait after a failed lock acquisition
// before attempting to do the lock again. This is so that once a lock-delay
// is in effect, we do not hot loop retrying the acquisition.
DefaultLockRetryTime = 5 * time.Second

// DefaultMonitorRetryTime is how long we wait after a failed monitor check
// of a lock (500 response code). This allows the monitor to ride out brief
// periods of unavailability, subject to the MonitorRetries setting in the
// lock options which is by default set to 0, disabling this feature.
DefaultMonitorRetryTime = 2 * time.Second

// LockFlagValue is a magic flag we set to indicate a key
// is being used for a lock. It is used to detect a potential
// conflict with a semaphore.
Expand Down Expand Up @@ -62,11 +69,13 @@ type Lock struct {

// LockOptions is used to parameterize the Lock behavior.
//
// MonitorRetries and MonitorRetryTime control how the lock monitor reacts
// to a failed check of the lock (500 response code): the monitor may retry
// up to MonitorRetries times, waiting MonitorRetryTime between attempts.
// LockOpts fills in DefaultMonitorRetryTime when MonitorRetryTime is zero.
type LockOptions struct {
	Key              string        // Must be set and have write permissions
	Value            []byte        // Optional, value to associate with the lock
	Session          string        // Optional, created if not specified
	SessionName      string        // Optional, defaults to DefaultLockSessionName
	SessionTTL       string        // Optional, defaults to DefaultLockSessionTTL
	MonitorRetries   int           // Optional, defaults to 0 which means no retries
	MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
}

// LockKey returns a handle to a lock struct which can be used
Expand Down Expand Up @@ -96,6 +105,9 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
if opts.MonitorRetryTime == 0 {
opts.MonitorRetryTime = DefaultMonitorRetryTime
}
l := &Lock{
c: c,
opts: opts,
Expand Down Expand Up @@ -327,8 +339,24 @@ func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
kv := l.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
retries := l.opts.MonitorRetries
RETRY:
pair, meta, err := kv.Get(l.opts.Key, opts)
if err != nil {
// TODO (slackpad) - Make a real error type here instead of using
// a string check.
const serverError = "Unexpected response code: 500"

// If configured we can try to ride out a brief Consul unavailability
// by doing retries. Note that we have to attempt the retry in a non-
// blocking fashion so that we have a clean place to reset the retry
// counter if service is restored.
if retries > 0 && strings.Contains(err.Error(), serverError) {
time.Sleep(l.opts.MonitorRetryTime)
retries--
opts.WaitIndex = 0
goto RETRY
}
return
}
if pair != nil && pair.Session == session {
Expand Down
119 changes: 119 additions & 0 deletions api/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package api

import (
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -369,3 +373,118 @@ func TestLock_ReclaimLock(t *testing.T) {
t.Fatalf("should not be leader")
}
}

// TestLock_MonitorRetry exercises the monitor retry options of a Lock.
//
// A reverse proxy sits between the client under test and the real Consul
// server. While a failure budget is non-zero, GET requests for the lock key
// are diverted to a server that always answers with a 500. The test checks
// that leadership survives when the number of injected failures is below
// MonitorRetries, and is lost when the failures exceed it.
func TestLock_MonitorRetry(t *testing.T) {
	t.Parallel()
	raw, s := makeClient(t)
	defer s.Stop()

	// Set up a server that always responds with 500 errors.
	failer := func(w http.ResponseWriter, req *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
	}
	outage := httptest.NewServer(http.HandlerFunc(failer))
	defer outage.Close()

	// Set up a reverse proxy that will send some requests to the
	// 500 server and pass everything else through to the real Consul
	// server. The failures counter is shared with the test body, so all
	// access goes through the mutex.
	var mutex sync.Mutex
	failures := 0
	director := func(req *http.Request) {
		mutex.Lock()
		defer mutex.Unlock()

		req.URL.Scheme = "http"
		if failures > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/lock") {
			req.URL.Host = strings.TrimPrefix(outage.URL, "http://")
			failures--
		} else {
			req.URL.Host = raw.config.Address
		}
	}
	proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director})
	defer proxy.Close()

	// Make another client that points at the proxy instead of the real
	// Consul server.
	config := raw.config
	config.Address = strings.TrimPrefix(proxy.URL, "http://")
	c, err := NewClient(&config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Set up a lock with retries enabled.
	opts := &LockOptions{
		Key:            "test/lock",
		SessionTTL:     "60s",
		MonitorRetries: 3,
	}
	lock, err := c.LockOpts(opts)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure the default got set.
	if lock.opts.MonitorRetryTime != DefaultMonitorRetryTime {
		t.Fatalf("bad: %d", lock.opts.MonitorRetryTime)
	}

	// Now set a custom time for the test.
	opts.MonitorRetryTime = 250 * time.Millisecond
	lock, err = c.LockOpts(opts)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if lock.opts.MonitorRetryTime != 250*time.Millisecond {
		t.Fatalf("bad: %d", lock.opts.MonitorRetryTime)
	}

	// Should get the lock.
	leaderCh, err := lock.Lock(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if leaderCh == nil {
		t.Fatalf("not leader")
	}

	// Poke the key using the raw client to force the monitor to wake up
	// and check the lock again. This time we will return errors for some
	// of the responses.
	mutex.Lock()
	failures = 2
	mutex.Unlock()
	pair, _, err := raw.KV().Get("test/lock", &QueryOptions{})
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	// Guard against a missing key so the Put calls below fail the test
	// cleanly instead of being handed a nil pair.
	if pair == nil {
		t.Fatalf("expected lock key to exist")
	}
	if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
		t.Fatalf("err: %v", err)
	}
	time.Sleep(5 * opts.MonitorRetryTime)

	// Should still be the leader.
	select {
	case <-leaderCh:
		t.Fatalf("should be leader")
	default:
	}

	// Now return an overwhelming number of errors.
	mutex.Lock()
	failures = 10
	mutex.Unlock()
	if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
		t.Fatalf("err: %v", err)
	}
	time.Sleep(5 * opts.MonitorRetryTime)

	// Should lose leadership.
	select {
	case <-leaderCh:
	case <-time.After(time.Second):
		t.Fatalf("should not be leader")
	}
}

0 comments on commit 86f20a3

Please sign in to comment.