Skip to content

Commit 1cf2a3e

Browse files
authored
Merge pull request redis#105 from x-martinez/add-node-info-error
Add node information in error
2 parents 4cb46bb + 81c4ce0 commit 1cf2a3e

File tree

3 files changed

+78
-14
lines changed

3 files changed

+78
-14
lines changed

error.go

+33-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package redsync
22

3-
import "errors"
3+
import (
4+
"errors"
5+
"fmt"
6+
)
47

58
// ErrFailed is the error resulting if Redsync fails to acquire the lock after
69
// exhausting all retries.
@@ -9,3 +12,32 @@ var ErrFailed = errors.New("redsync: failed to acquire lock")
912
// ErrExtendFailed is the error resulting if Redsync fails to extend the
1013
// lock.
1114
var ErrExtendFailed = errors.New("redsync: failed to extend lock")
15+
16+
// ErrTaken happens when the lock is already taken in a quorum on nodes.
17+
type ErrTaken struct {
18+
Nodes []int
19+
}
20+
21+
func (err ErrTaken) Error() string {
22+
return fmt.Sprintf("lock already taken, locked nodes: %v", err.Nodes)
23+
}
24+
25+
// ErrNodeTaken is the error resulting if the lock is already taken in one of
26+
// the cluster's nodes
27+
type ErrNodeTaken struct {
28+
Node int
29+
}
30+
31+
func (err ErrNodeTaken) Error() string {
32+
return fmt.Sprintf("node #%d: lock already taken", err.Node)
33+
}
34+
35+
// A RedisError is an error communicating with one of the Redis nodes.
36+
type RedisError struct {
37+
Node int
38+
Err error
39+
}
40+
41+
func (err RedisError) Error() string {
42+
return fmt.Sprintf("node #%d: %s", err.Node, err.Err)
43+
}

mutex.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (m *Mutex) LockContext(ctx context.Context) error {
9595
m.until = until
9696
return nil
9797
}
98-
_, err = func() (int, error) {
98+
func() (int, error) {
9999
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
100100
defer cancel()
101101
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
@@ -252,27 +252,36 @@ func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry
252252

253253
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
254254
type result struct {
255+
Node int
255256
Status bool
256257
Err error
257258
}
258259

259260
ch := make(chan result)
260-
for _, pool := range m.pools {
261-
go func(pool redis.Pool) {
262-
r := result{}
261+
for node, pool := range m.pools {
262+
go func(node int, pool redis.Pool) {
263+
r := result{Node: node}
263264
r.Status, r.Err = actFn(pool)
264265
ch <- r
265-
}(pool)
266+
}(node, pool)
266267
}
267268
n := 0
269+
var taken []int
268270
var err error
269271
for range m.pools {
270272
r := <-ch
271273
if r.Status {
272274
n++
273275
} else if r.Err != nil {
274-
err = multierror.Append(err, r.Err)
276+
err = multierror.Append(err, RedisError{Node: r.Node, Err: r.Err})
277+
} else {
278+
taken = append(taken, r.Node)
279+
err = multierror.Append(err, ErrNodeTaken{Node: r.Node})
275280
}
276281
}
282+
283+
if len(taken) >= m.quorum {
284+
return n, ErrTaken{Nodes: taken}
285+
}
277286
return n, err
278287
}

mutex_test.go

+30-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redsync
22

33
import (
4+
"errors"
45
"strconv"
56
"testing"
67
"time"
@@ -33,6 +34,28 @@ func TestMutex(t *testing.T) {
3334
}
3435
}
3536

37+
func TestMutexAlreadyLocked(t *testing.T) {
38+
for k, v := range makeCases(4) {
39+
t.Run(k, func(t *testing.T) {
40+
rs := New(v.pools...)
41+
key := "test-lock"
42+
43+
mutex1 := rs.NewMutex(key)
44+
err := mutex1.Lock()
45+
if err != nil {
46+
t.Fatalf("mutex lock failed: %s", err)
47+
}
48+
assertAcquired(t, v.pools, mutex1)
49+
50+
mutex2 := rs.NewMutex(key)
51+
err = mutex2.Lock()
52+
if !errors.As(err, &ErrTaken{}) {
53+
t.Fatalf("mutex was not already locked: %s", err)
54+
}
55+
})
56+
}
57+
}
58+
3659
func TestMutexExtend(t *testing.T) {
3760
for k, v := range makeCases(8) {
3861
t.Run(k, func(t *testing.T) {
@@ -82,8 +105,8 @@ func TestMutexExtendExpired(t *testing.T) {
82105
time.Sleep(1 * time.Second)
83106

84107
ok, err := mutex.Extend()
85-
if err != nil {
86-
t.Fatalf("mutex extend failed: %s", err)
108+
if err == nil {
109+
t.Fatalf("mutex extend didn't fail")
87110
}
88111
if ok {
89112
t.Fatalf("Expected ok == false, got %v", ok)
@@ -108,8 +131,8 @@ func TestMutexUnlockExpired(t *testing.T) {
108131
time.Sleep(1 * time.Second)
109132

110133
ok, err := mutex.Unlock()
111-
if err != nil {
112-
t.Fatalf("mutex unlock failed: %s", err)
134+
if err == nil {
135+
t.Fatalf("mutex unlock didn't fail")
113136
}
114137
if ok {
115138
t.Fatalf("Expected ok == false, got %v", ok)
@@ -136,8 +159,8 @@ func TestMutexQuorum(t *testing.T) {
136159
assertAcquired(t, v.pools, mutex)
137160
} else {
138161
err := mutex.Lock()
139-
if err != ErrFailed {
140-
t.Fatalf("Expected err == %q, got %q", ErrFailed, err)
162+
if errors.Is(err, &ErrNodeTaken{}) {
163+
t.Fatalf("Expected err == %q, got %q", ErrNodeTaken{}, err)
141164
}
142165
}
143166
}
@@ -258,7 +281,7 @@ func newTestMutexes(pools []redis.Pool, name string, n int) []*Mutex {
258281
mutexes := make([]*Mutex, n)
259282
for i := 0; i < n; i++ {
260283
mutexes[i] = &Mutex{
261-
name: name,
284+
name: name + "-" + strconv.Itoa(i),
262285
expiry: 8 * time.Second,
263286
tries: 32,
264287
delayFunc: func(tries int) time.Duration { return 500 * time.Millisecond },

0 commit comments

Comments
 (0)