-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.go
120 lines (104 loc) · 3.51 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package astiamqp
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"github.com/streadway/amqp"
)
// Consumer holds the state of a single consumer registered through
// AddConsumer.
type Consumer struct {
// cancel cancels ctx. It is set by setupConsumer and invoked by stop.
cancel context.CancelFunc
// ctx is derived from the AMQP context in setupConsumer; the loop handling
// deliveries exits once it is done.
ctx context.Context
// configuration is the configuration provided to AddConsumer.
configuration ConfigurationConsumer
// tag is the consumer tag passed to the channel when consuming. AddConsumer
// builds it from an incrementing counter.
tag string
}
// AddConsumer registers a new consumer described by c.
//
// The consumer is given a tag built from an atomically incremented counter,
// is set up through setupConsumer and, only when that succeeds, is appended
// to the list of consumers. a.mc is held for the whole call.
//
// It returns a wrapped error when setting up the consumer fails, in which
// case the consumer is not stored.
func (a *AMQP) AddConsumer(c ConfigurationConsumer) (err error) {
	a.mc.Lock()
	defer a.mc.Unlock()

	// Build the consumer with the next available tag
	id := atomic.AddUint32(&a.consumerCounter, 1)
	consumer := &Consumer{
		configuration: c,
		tag:           strconv.Itoa(int(id)),
	}

	// Only consumers that were set up successfully are kept
	if setupErr := a.setupConsumer(consumer); setupErr != nil {
		return fmt.Errorf("astiamqp: setting up consumer %+v failed: %w", consumer, setupErr)
	}
	a.consumers = append(a.consumers, consumer)
	return nil
}
// setupConsumer declares the exchange and the queue of the consumer, binds
// them together, starts consuming and spawns a sub task handling the
// deliveries.
//
// It does nothing and returns nil when no channel is available. Otherwise it
// returns an error for the first step that fails, in which case no sub task
// is started.
//
// The sub task stops when the context of the consumer is cancelled (see stop)
// or when the deliveries channel is closed. Errors returned by the handler
// are logged and never reported through the result of setupConsumer.
func (a *AMQP) setupConsumer(c *Consumer) (err error) {
	// No channel
	if a.channel == nil {
		return nil
	}

	// Declare exchange
	if err = a.declareExchange(c.configuration.Exchange); err != nil {
		return fmt.Errorf("astiamqp: declaring exchange %+v failed: %w", c.configuration.Exchange, err)
	}

	// Declare queue
	if err = a.declareQueue(c.configuration.Queue); err != nil {
		return fmt.Errorf("astiamqp: declaring queue %+v failed: %w", c.configuration.Queue, err)
	}

	// Bind queue
	if err = a.bindQueue(c.configuration.Queue, c.configuration.Exchange, c.configuration.RoutingKey); err != nil {
		return fmt.Errorf("astiamqp: binding queue %+v to exchange %+v for routing key %s failed: %w", c.configuration.Queue, c.configuration.Exchange, c.configuration.RoutingKey, err)
	}

	// Consume
	// consume already wraps its error with the consumer configuration, wrapping
	// it again would only duplicate the message
	var deliveries <-chan amqp.Delivery
	if deliveries, err = a.consume(c); err != nil {
		return err
	}

	// Reset context
	// The context is also kept in a local variable so that the sub task below
	// keeps watching the context it was started with, even if the consumer is
	// set up again and c.ctx is replaced
	consumerCtx, cancel := context.WithCancel(a.ctx)
	c.ctx, c.cancel = consumerCtx, cancel

	// Handle deliveries
	a.l.Debugf("astiamqp: handling deliveries of consumer %s on queue %s", c.tag, c.configuration.Queue.Name)
	a.t.NewSubTask().Do(func() {
		for {
			select {
			case d, ok := <-deliveries:
				// A closed channel yields zero values forever: stop instead of
				// spinning until the context is cancelled
				if !ok {
					a.l.Debugf("astiamqp: deliveries channel closed for consumer %s", c.tag)
					return
				}

				// Ignore deliveries without a delivery tag
				if d.DeliveryTag == 0 {
					continue
				}
				a.l.Debugf("astiamqp: received body %s on routing key %s, queue %s and exchange %s", string(d.Body), d.RoutingKey, c.configuration.Queue.Name, c.configuration.Exchange.Name)

				// Variables local to the sub task are used on purpose: assigning
				// to the named result of setupConsumer from here would race with
				// its return
				handlerCtx, handlerErr := c.configuration.Handler(d.Body, d.RoutingKey, newAcknowledger(d.Acknowledger, d.DeliveryTag, a.l))
				if handlerErr != nil {
					a.l.ErrorC(handlerCtx, fmt.Errorf("astiamqp: handling body %s on routing key %s, queue %s and exchange %s failed: %w", string(d.Body), d.RoutingKey, c.configuration.Queue.Name, c.configuration.Exchange.Name, handlerErr))
				}
			case <-consumerCtx.Done():
				a.l.Debugf("astiamqp: stopping handling deliveries for consumer %s", c.tag)
				return
			}
		}
	})
	return nil
}
// stop cancels the context of the consumer, which makes the loop handling its
// deliveries exit. It does nothing when no cancel function has been set yet.
func (c *Consumer) stop() {
	if c.cancel == nil {
		return
	}
	c.cancel()
}
// consume starts consuming the queue of the consumer on the current channel,
// using the tag and the options found in the consumer configuration, and
// returns the channel on which deliveries are received.
//
// It returns a wrapped error when the consume request fails. a.channel is
// used without a nil check here: setupConsumer checks it before calling.
func (a *AMQP) consume(c *Consumer) (deliveries <-chan amqp.Delivery, err error) {
	cfg := c.configuration
	a.l.Debugf("astiamqp: consuming on queue %s with consumer %s", cfg.Queue.Name, c.tag)

	ch, consumeErr := a.channel.Consume(
		cfg.Queue.Name,            // queue
		c.tag,                     // consumer
		cfg.AutoAck,               // auto-ack
		cfg.Exclusive,             // exclusive
		cfg.NoLocal,               // no-local
		cfg.NoWait,                // no-wait
		amqp.Table(cfg.Arguments), // args
	)
	if consumeErr != nil {
		return ch, fmt.Errorf("astiamqp: consuming on consumer %+v failed: %w", cfg, consumeErr)
	}
	return ch, nil
}