-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_test.go
177 lines (141 loc) · 4.39 KB
/
consumer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package amqpx
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// unmarshaler is a stateless test helper that decodes payloads as JSON.
type unmarshaler struct{}

// ContentType reports the content type handled by this unmarshaler.
func (*unmarshaler) ContentType() string { return "application/json" }

// Unmarshal decodes the JSON document in data into dst by delegating to
// json.Unmarshal and returns whatever error that call produces.
func (*unmarshaler) Unmarshal(data []byte, dst any) error {
	err := json.Unmarshal(data, dst)
	return err
}

// testUnmarshaler is the shared unmarshaler instance used by the tests in this file.
var testUnmarshaler = new(unmarshaler)
// TestConsumer_Reconnect registers a consumer, closes the mocked connection
// and then waits until the client asks the connection for a channel again.
func TestConsumer_Reconnect(t *testing.T) {
	t.Parallel()

	client, mock := prep(t)
	defer client.Close()

	// Watchdog: if the test is still running after defaultTimeout the timer
	// goroutine panics, which aborts the test binary instead of hanging.
	watchdog := time.AfterFunc(defaultTimeout, func() { panic("deadlock") })
	defer watchdog.Stop()

	handler := D(func(context.Context, *Delivery[[]byte]) Action { return Ack })
	err := client.NewConsumer("", handler)
	assert.NoError(t, err)

	// Installed after the consumer exists, so only a later channel request
	// reaches this function and signals the test.
	reopened := make(chan bool)
	mock.Conn.ChannelFunc = func() (Channel, error) {
		defer close(reopened)
		return channelMock(), nil
	}

	mock.Conn.Close()
	<-reopened
}
// TestClient_NewConsumer covers the failure paths of Client.NewConsumer: the
// connection failing to open a channel and the channel failing to start
// consuming. In both cases the returned error text is compared with the
// expected message.
func TestClient_NewConsumer(t *testing.T) {
	t.Parallel()

	t.Run("channel error", func(t *testing.T) {
		t.Parallel()

		client, mock := prep(t)
		defer client.Close()

		mock.Conn.ChannelFunc = func() (Channel, error) {
			return nil, fmt.Errorf("failed")
		}

		queue := "foo"
		got := client.NewConsumer(queue, D(func(context.Context, *Delivery[[]byte]) Action { return Ack }))

		// assert.Errorf only checks that the error is non-nil and treats its
		// format arguments as the failure message; EqualError compares the
		// actual error text against the expectation.
		want := fmt.Sprintf("amqpx: queue %q consumer-tag %q: %s", queue, "", "create channel: failed")
		assert.EqualError(t, got, want)
	})

	t.Run("consume error", func(t *testing.T) {
		t.Parallel()

		client, mock := prep(t)
		defer client.Close()

		mock.Channel.ConsumeFunc = func(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp091.Table) (<-chan amqp091.Delivery, error) {
			return nil, fmt.Errorf("failed")
		}

		queue := "foo"
		got := client.NewConsumer(queue, D(func(context.Context, *Delivery[[]byte]) Action { return Ack }))

		// See the note in "channel error": compare the message, not just non-nil.
		want := fmt.Sprintf("amqpx: queue %q consumer-tag %q: %s", queue, "", "consume: failed")
		assert.EqualError(t, got, want)
	})
}
// TestDeliveryRequest_setStatus checks, for each of Ack, Nack and Reject, that
// setStatus succeeds and that the matching Acknowledger method was recorded
// exactly once on the mock.
func TestDeliveryRequest_setStatus(t *testing.T) {
	t.Parallel()

	// requestWith builds a DeliveryRequest whose delivery uses m as Acknowledger.
	requestWith := func(m *AcknowledgerMock) *DeliveryRequest {
		return &DeliveryRequest{in: &amqp091.Delivery{Acknowledger: m}}
	}

	t.Run("ack", func(t *testing.T) {
		t.Parallel()

		acker := &AcknowledgerMock{
			AckFunc: func(uint64, bool) error { return nil },
		}
		req := requestWith(acker)

		require.NoError(t, req.setStatus(Ack))
		calls := acker.AckCalls()
		assert.Equal(t, 1, len(calls))
	})

	t.Run("nack", func(t *testing.T) {
		t.Parallel()

		acker := &AcknowledgerMock{
			NackFunc: func(uint64, bool, bool) error { return nil },
		}
		req := requestWith(acker)

		require.NoError(t, req.setStatus(Nack))
		calls := acker.NackCalls()
		assert.Equal(t, 1, len(calls))
	})

	t.Run("reject", func(t *testing.T) {
		t.Parallel()

		acker := &AcknowledgerMock{
			RejectFunc: func(uint64, bool) error { return nil },
		}
		req := requestWith(acker)

		require.NoError(t, req.setStatus(Reject))
		calls := acker.RejectCalls()
		assert.Equal(t, 1, len(calls))
	})
}
// TestDeliveryBytes feeds one delivery through the mocked channel and checks
// that a []byte handler receives a message equal to the delivery body.
func TestDeliveryBytes(t *testing.T) {
	t.Parallel()

	client, mock := prep(t)
	defer client.Close()

	// Watchdog: panic (aborting the test binary) if the handler is never
	// reached within defaultTimeout.
	watchdog := time.AfterFunc(defaultTimeout, func() { panic("deadlock") })
	defer watchdog.Stop()

	acker := &AcknowledgerMock{
		AckFunc: func(uint64, bool) error { return nil },
	}
	msg := amqp091.Delivery{
		Body:         []byte("hello"),
		Acknowledger: acker,
	}

	// Every Consume call hands back a buffered channel preloaded with msg.
	mock.Channel.ConsumeFunc = func(string, string, bool, bool, bool, bool, amqp091.Table) (<-chan amqp091.Delivery, error) {
		deliveries := make(chan amqp091.Delivery, 1)
		deliveries <- msg
		return deliveries, nil
	}

	handled := make(chan bool)
	handler := D(func(_ context.Context, d *Delivery[[]byte]) Action {
		defer close(handled)
		assert.Equal(t, &msg.Body, d.Msg)
		return Ack
	})

	err := client.NewConsumer("", handler)
	require.NoError(t, err)

	<-handled
}
func TestDeliveryStruct(t *testing.T) {
t.Parallel()
type Gopher struct {
Name string `json:"name"`
}
var call bool
fn := D[Gopher](func(ctx context.Context, got *Delivery[Gopher]) Action {
call = true
assert.Equal(t, &Delivery[Gopher]{Msg: &Gopher{Name: "gopher"}, Req: &DeliveryRequest{
in: &amqp091.Delivery{Body: nil, ContentType: testUnmarshaler.ContentType()},
}}, got)
return Ack
})
fn.init(map[string]Unmarshaler{testUnmarshaler.ContentType(): testUnmarshaler})
got := fn.serve(context.Background(), &DeliveryRequest{in: &amqp091.Delivery{
Body: []byte(`{"name":"gopher"}`),
ContentType: testUnmarshaler.ContentType()},
})
assert.Equal(t, true, call)
assert.Equal(t, Ack, got)
}