-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_options_test.go
112 lines (93 loc) · 2.59 KB
/
consumer_options_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package amqpx
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestConsumerOption applies every ConsumerOption, in order, to a zero-valued
// consumerOptions and verifies that the accumulated settings equal the
// expected struct.
func TestConsumerOption(t *testing.T) {
	t.Parallel()

	// Fixtures passed to the declare/bind options. Their Args fields are left
	// at the zero value (nil).
	queueSpec := QueueDeclare{
		Durable:    true,
		AutoDelete: true,
		Exclusive:  true,
		NoWait:     true,
	}
	exchangeSpec := ExchangeDeclare{
		Name:       "exchange_name_value",
		Type:       "exchange_type_value",
		Durable:    true,
		AutoDelete: true,
		Internal:   true,
		NoWait:     true,
	}
	bindSpec := QueueBind{
		Exchange:   "exchange_name_value",
		RoutingKey: []string{"routing_key_value"},
		NoWait:     true,
	}

	opts := []ConsumerOption{
		ConsumerTag("consumer_tag_value"),
		SetAutoAckMode(),
		SetExclusive(true),
		SetPrefetchCount(2),
		SetConcurrency(3),
		SetUnmarshaler(testUnmarshaler),
		DeclareQueue(queueSpec),
		DeclareExchange(exchangeSpec),
		BindQueue(bindSpec),
	}

	var actual consumerOptions
	for i := range opts {
		opts[i](&actual)
	}

	// The interceptor field is expected to stay nil, so it is not listed.
	expected := consumerOptions{
		tag:         "consumer_tag_value",
		concurrency: 3,
		unmarshaler: map[string]Unmarshaler{
			testUnmarshaler.ContentType(): testUnmarshaler,
		},
		channel: channelOptions{
			autoAck:         true,
			exclusive:       true,
			prefetchCount:   2,
			queueDeclare:    &queueSpec,
			exchangeDeclare: &exchangeSpec,
			queueBind:       &bindSpec,
		},
	}

	assert.Equal(t, expected, actual)
}
// TestConsumerOption_Validate checks consumerOptions.validate: the concurrency
// value present on the options after a successful call, and the error returned
// when the handler is nil.
func TestConsumerOption_Validate(t *testing.T) {
	t.Parallel()

	// The handler's arguments are unused, so they are left unnamed.
	fn := D(func(_ context.Context, _ *Delivery[[]byte]) Action { return Ack })

	// Only prefetchCount is set; concurrency is expected to end up equal to it.
	t.Run("equals prefetch count", func(t *testing.T) {
		t.Parallel()

		got := consumerOptions{
			channel:     channelOptions{prefetchCount: 2},
			unmarshaler: map[string]Unmarshaler{"any": nil},
		}
		require.NoError(t, got.validate(fn))
		assert.Equal(t, 2, got.concurrency)
	})

	// With auto-ack enabled, an explicitly set concurrency is expected to be kept.
	t.Run("auto-ack mode", func(t *testing.T) {
		t.Parallel()

		got := consumerOptions{
			channel:     channelOptions{autoAck: true},
			concurrency: 2,
			unmarshaler: map[string]Unmarshaler{"any": nil},
		}
		require.NoError(t, got.validate(fn))
		assert.Equal(t, 2, got.concurrency)
	})

	// Neither concurrency nor prefetchCount is set; the package default is expected.
	t.Run("default concurrency", func(t *testing.T) {
		t.Parallel()

		got := consumerOptions{unmarshaler: map[string]Unmarshaler{"any": nil}}
		require.NoError(t, got.validate(fn))
		assert.Equal(t, defaultLimitConcurrency, got.concurrency)
	})

	// No unmarshaler is configured; validate is still expected to succeed.
	t.Run("ignore unmarshaler error", func(t *testing.T) {
		t.Parallel()

		err := (&consumerOptions{}).validate(fn)
		// NoError rather than Nil: Nil also accepts an error interface that
		// wraps a nil pointer, which callers would see as a non-nil error.
		assert.NoError(t, err)
	})

	// A nil handler must be rejected with errFuncNil.
	t.Run("func nil", func(t *testing.T) {
		t.Parallel()

		err := (&consumerOptions{}).validate(nil)
		assert.ErrorIs(t, err, errFuncNil)
	})
}