-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_test.go
137 lines (119 loc) · 2.93 KB
/
client_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package amqpx
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const defaultTimeout = time.Second * 2
func TestClient_ConnectErr(t *testing.T) {
t.Parallel()
_, err := Connect(setDialer(&dialerMock{
DialFunc: func(_ context.Context) (Connection, error) {
return nil, fmt.Errorf("conn failed")
},
}))
assert.EqualError(t, err, "conn failed")
}
func TestClient_Reconnect(t *testing.T) {
t.Parallel()
client, mock := prep(t)
defer client.Close()
done := make(chan bool, 1)
mock.Dialer.DialFunc = func(_ context.Context) (Connection, error) {
done <- true
return mock.Conn, nil
}
mock.Conn.Close()
select {
case <-time.After(defaultTimeout):
t.Errorf("no reconnect")
case <-done:
}
}
type mock struct {
Dialer *dialerMock
Conn *ConnectionMock
// returns the same channel for all
Channel *ChannelMock
}
func channelMock() *ChannelMock {
channel := &ChannelMock{
QosFunc: func(prefetchCount int, prefetchSize int, global bool) error {
return nil
},
ConsumeFunc: func(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp091.Table) (<-chan amqp091.Delivery, error) {
return make(chan amqp091.Delivery), nil
},
PublishWithDeferredConfirmWithContextFunc: func(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp091.Publishing) (*amqp091.DeferredConfirmation, error) {
return &amqp091.DeferredConfirmation{}, nil
},
NotifyCloseFunc: func(err chan *amqp091.Error) chan *amqp091.Error {
return err
},
NotifyCancelFunc: func(tag chan string) chan string {
return tag
},
NotifyReturnFunc: func(r chan amqp091.Return) chan amqp091.Return {
return r
},
}
m := sync.Mutex{}
channel.CloseFunc = func() error {
m.Lock()
defer m.Unlock()
for _, v := range channel.NotifyCloseCalls() {
select {
case <-v.ErrorCh:
default:
close(v.ErrorCh)
}
}
return nil
}
return channel
}
func prep(t *testing.T) (*Client, mock) {
channel := channelMock()
conn := &ConnectionMock{
IsClosedFunc: func() bool {
return false
},
NotifyCloseFunc: func(errorCh chan *amqp091.Error) chan *amqp091.Error {
return errorCh
},
ChannelFunc: func() (Channel, error) {
return channel, nil
},
}
conn.CloseFunc = func() error {
defer channel.Close()
for _, v := range conn.NotifyCloseCalls() {
select {
case <-v.ErrorCh:
default:
close(v.ErrorCh)
}
}
return nil
}
mock := mock{
Dialer: &dialerMock{
DialFunc: func(_ context.Context) (Connection, error) {
return conn, nil
},
},
Conn: conn,
Channel: channel,
}
client, err := Connect(setDialer(mock.Dialer),
WithLog(func(format string, v ...any) { t.Fatalf(format, v...) }),
UseUnmarshaler(testUnmarshaler),
UseMarshaler(defaultBytesMarshaler))
require.NoError(t, err)
return client, mock
}