-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_options.go
136 lines (117 loc) · 3.4 KB
/
consumer_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package amqpx
import (
"runtime"
)
// defaultLimitConcurrency is the concurrency limit applied by
// consumerOptions.validate when no usable limit has been configured.
// It is the value of runtime.GOMAXPROCS(0) captured at package initialization.
var defaultLimitConcurrency = runtime.GOMAXPROCS(0)
// ConsumerOption is a functional option used to configure a consumer.
// Each option receives the consumer's option set and mutates it in place.
type ConsumerOption func(*consumerOptions)
// consumerOptions collects the settings assembled from ConsumerOption values.
type consumerOptions struct {
// tag is the consumer tag, set by ConsumerTag.
tag string
// channel holds the channel-level settings (ack mode, exclusivity, prefetch
// and the optional queue/exchange/bind declarations).
channel channelOptions
// concurrency is the goroutine limit requested via SetConcurrency; validate
// resolves its final value.
concurrency int
// interceptor is the list accumulated by SetConsumeInterceptor.
interceptor []ConsumeInterceptor
// unmarshaler maps a content type (as reported by Unmarshaler.ContentType)
// to the Unmarshaler registered for it.
unmarshaler map[string]Unmarshaler
}
// channelOptions groups the channel-related settings of a consumer.
type channelOptions struct {
// autoAck is true once SetAutoAckMode has been applied; false by default.
autoAck bool
// exclusive is the value passed to SetExclusive; false by default.
exclusive bool
// prefetchCount is the value passed to SetPrefetchCount.
prefetchCount int
// queueDeclare is nil unless DeclareQueue has been applied.
queueDeclare *QueueDeclare
// exchangeDeclare is nil unless DeclareExchange has been applied.
exchangeDeclare *ExchangeDeclare
// queueBind is nil unless BindQueue has been applied.
queueBind *QueueBind
}
// validate checks that the options are usable with the handler fn and
// resolves the effective concurrency limit.
//
// It returns errFuncNil when fn is nil, and errUnmarshalerNotFound when fn is
// not a raw []byte handler and no unmarshaler has been configured.
//
// The concurrency limit is resolved as follows:
//   - with manual acknowledgements (autoAck == false) and a prefetch count
//     greater than zero, the prefetch count replaces any value set through
//     SetConcurrency;
//   - otherwise the value set through SetConcurrency is kept;
//   - if the result is not positive, defaultLimitConcurrency is used.
func (c *consumerOptions) validate(fn HandlerValue) error {
	if fn == nil {
		return errFuncNil
	}

	// Only handlers that take the raw body can work without an unmarshaler.
	if _, raw := fn.(*handleValue[[]byte]); !raw && len(c.unmarshaler) == 0 {
		return errUnmarshalerNotFound
	}

	// Override only for a positive prefetch count: a zero (unset) prefetch
	// count must not discard an explicit SetConcurrency value.
	if !c.channel.autoAck && c.channel.prefetchCount > 0 {
		c.concurrency = c.channel.prefetchCount
	}

	// Guard against zero and negative limits alike.
	if c.concurrency <= 0 {
		c.concurrency = defaultLimitConcurrency
	}
	return nil
}
// ConsumerTag returns an option that assigns the given tag to the consumer.
func ConsumerTag(tag string) ConsumerOption {
	return func(opts *consumerOptions) { opts.tag = tag }
}
// SetAutoAckMode returns an option that turns on auto-ack mode.
// Auto-ack is off unless this option is applied.
func SetAutoAckMode() ConsumerOption {
	return func(opts *consumerOptions) { opts.channel.autoAck = true }
}
// SetExclusive returns an option that sets the exclusive flag of the consumer.
// The flag is false unless this option is applied.
//
// With exclusive set to true the server ensures that this is the only
// consumer of the queue. With exclusive set to false the server distributes
// deliveries fairly across all consumers of the queue.
func SetExclusive(b bool) ConsumerOption {
	return func(opts *consumerOptions) { opts.channel.exclusive = b }
}
// SetPrefetchCount returns an option that sets the prefetch count.
//
// The prefetch count controls how many messages the server will try to keep
// on the network for consumers before receiving delivery acks. Its purpose is
// to keep the network buffers between the server and the client full.
//
// With a prefetch count greater than zero, the server delivers that many
// messages to consumers before acknowledgments are received. The server
// ignores this option for consumers started in auto-ack mode (AutoAck=true),
// because no acknowledgments are expected or sent.
func SetPrefetchCount(i int) ConsumerOption {
	return func(opts *consumerOptions) { opts.channel.prefetchCount = i }
}
// SetConcurrency returns an option that limits the number of goroutines used
// to process delivered messages. Values that are not positive are ignored.
//
// When no limit is configured, runtime.GOMAXPROCS(0) is used.
// When manual acknowledgements (AutoAck=false) are combined with a prefetch
// count greater than zero, the prefetch count is used instead of this value.
func SetConcurrency(i int) ConsumerOption {
	return func(opts *consumerOptions) {
		if i <= 0 {
			// Keep whatever limit is already configured.
			return
		}
		opts.concurrency = i
	}
}
// SetConsumeInterceptor returns an option that appends the given consume
// interceptors, in the order given, to those already configured.
func SetConsumeInterceptor(i ...ConsumeInterceptor) ConsumerOption {
	return func(opts *consumerOptions) {
		chain := append(opts.interceptor, i...)
		opts.interceptor = chain
	}
}
// SetUnmarshaler returns an option that makes m the consumer's only
// unmarshaler, registered under the content type reported by m.ContentType.
// Any previously configured unmarshalers are replaced. A nil m is ignored.
func SetUnmarshaler(m Unmarshaler) ConsumerOption {
	return func(opts *consumerOptions) {
		if m == nil {
			return
		}
		opts.unmarshaler = map[string]Unmarshaler{
			m.ContentType(): m,
		}
	}
}
// DeclareQueue returns an option that records q as the queue declaration
// in the consumer's channel options.
func DeclareQueue(q QueueDeclare) ConsumerOption {
	return func(opts *consumerOptions) { opts.channel.queueDeclare = &q }
}
// DeclareExchange returns an option that records e as the exchange
// declaration in the consumer's channel options.
func DeclareExchange(e ExchangeDeclare) ConsumerOption {
	return func(opts *consumerOptions) { opts.channel.exchangeDeclare = &e }
}
// BindQueue returns an option that records q as the queue binding
// in the consumer's channel options.
func BindQueue(q QueueBind) ConsumerOption {
	return func(opts *consumerOptions) { opts.channel.queueBind = &q }
}