Skip to content

Commit 4302881

Browse files
authored
fix: rabbitmq will reconnect if the channel is closed (#99)
* fix: rabbitmq will reconnect if the channel is closed * chore: ensure push fail after 5 attempt
1 parent a622d40 commit 4302881

File tree

2 files changed

+91
-17
lines changed

2 files changed

+91
-17
lines changed

pkg/storage/rabbitmq/rabbitmq.go

+62-14
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package rabbitmq
22

33
import (
4+
"errors"
45
"fmt"
6+
"time"
57

8+
"github.com/rs/zerolog/log"
69
"github.com/streadway/amqp"
710

811
"atomys.codes/webhooked/internal/valuable"
@@ -32,10 +35,12 @@ type config struct {
3235
Exchange string `mapstructure:"exchange" json:"exchange"`
3336
}
3437

38+
const maxAttempt = 5
39+
3540
// ContentType is the function for get content type used to push data in the
3641
// storage. When no content type is defined, the default one is used instead
3742
// Default: text/plain
38-
func (c config) ContentType() string {
43+
func (c *config) ContentType() string {
3944
if c.DefinedContentType != "" {
4045
return c.DefinedContentType
4146
}
@@ -67,6 +72,15 @@ func NewStorage(configRaw map[string]interface{}) (*storage, error) {
6772
return nil, err
6873
}
6974

75+
go func() {
76+
for {
77+
reason := <-newClient.client.NotifyClose(make(chan *amqp.Error))
78+
log.Warn().Msgf("connection to rabbitmq closed, reason: %v", reason)
79+
80+
newClient.reconnect()
81+
}
82+
}()
83+
7084
if newClient.routingKey, err = newClient.channel.QueueDeclare(
7185
newClient.config.QueueName,
7286
newClient.config.Durable,
@@ -83,26 +97,60 @@ func NewStorage(configRaw map[string]interface{}) (*storage, error) {
8397

8498
// Name is the function for identified if the storage config is define in the webhooks
8599
// Run is made from external caller
86-
func (c storage) Name() string {
100+
func (c *storage) Name() string {
87101
return "rabbitmq"
88102
}
89103

90104
// Push is the function for push data in the storage
91105
// A run is made from external caller
92106
// @param value that will be pushed
93107
// @return an error if the push failed
94-
func (c storage) Push(value interface{}) error {
95-
if err := c.channel.Publish(
96-
c.config.Exchange,
97-
c.routingKey.Name,
98-
c.config.Mandatory,
99-
c.config.Immediate,
100-
amqp.Publishing{
101-
ContentType: c.config.ContentType(),
102-
Body: []byte(fmt.Sprintf("%v", value)),
103-
}); err != nil {
104-
return err
108+
func (c *storage) Push(value interface{}) error {
109+
for attempt := 0; attempt < maxAttempt; attempt++ {
110+
err := c.channel.Publish(
111+
c.config.Exchange,
112+
c.routingKey.Name,
113+
c.config.Mandatory,
114+
c.config.Immediate,
115+
amqp.Publishing{
116+
ContentType: c.config.ContentType(),
117+
Body: []byte(fmt.Sprintf("%v", value)),
118+
})
119+
120+
if err != nil {
121+
if errors.Is(err, amqp.ErrClosed) {
122+
log.Warn().Err(err).Msg("connection to rabbitmq closed. reconnecting...")
123+
c.reconnect()
124+
continue
125+
} else {
126+
return err
127+
}
128+
}
129+
return nil
105130
}
106131

107-
return nil
132+
return errors.New("max attempt to publish reached")
133+
}
134+
135+
// reconnect is the function to reconnect to the amqp server if the connection
136+
// is lost. It will try to reconnect every seconds until it succeed to connect
137+
func (c *storage) reconnect() {
138+
for {
139+
// wait 1s for reconnect
140+
time.Sleep(time.Second)
141+
142+
conn, err := amqp.Dial(c.config.DatabaseURL.First())
143+
if err == nil {
144+
c.client = conn
145+
c.channel, err = c.client.Channel()
146+
if err != nil {
147+
log.Error().Err(err).Msg("channel cannot be connected")
148+
continue
149+
}
150+
log.Debug().Msg("reconnect success")
151+
break
152+
}
153+
154+
log.Error().Err(err).Msg("reconnect failed")
155+
}
108156
}

pkg/storage/rabbitmq/rabbitmq_test.go

+29-3
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,33 @@ func TestRunRabbitMQPush(t *testing.T) {
6868
}
6969

7070
func TestContentType(t *testing.T) {
71-
assert.Equal(t, "text/plain", config{}.ContentType())
72-
assert.Equal(t, "text/plain", config{DefinedContentType: ""}.ContentType())
73-
assert.Equal(t, "application/json", config{DefinedContentType: "application/json"}.ContentType())
71+
assert.Equal(t, "text/plain", (&config{}).ContentType())
72+
assert.Equal(t, "text/plain", (&config{DefinedContentType: ""}).ContentType())
73+
assert.Equal(t, "application/json", (&config{DefinedContentType: "application/json"}).ContentType())
74+
}
75+
76+
func TestReconnect(t *testing.T) {
77+
if testing.Short() {
78+
t.Skip("rabbitmq testing is skiped in short version of test")
79+
return
80+
}
81+
82+
newClient, err := NewStorage(map[string]interface{}{
83+
"databaseUrl": "amqp://user:password@127.0.0.1:5672",
84+
"queueName": "hello",
85+
"contentType": "text/plain",
86+
"durable": false,
87+
"deleteWhenUnused": false,
88+
"exclusive": false,
89+
"noWait": false,
90+
"mandatory": false,
91+
"immediate": false,
92+
})
93+
assert.NoError(t, err)
94+
95+
assert.NoError(t, newClient.Push("Hello"))
96+
assert.NoError(t, newClient.client.Close())
97+
assert.NoError(t, newClient.Push("Hello"))
98+
assert.NoError(t, newClient.channel.Close())
99+
assert.NoError(t, newClient.Push("Hello"))
74100
}

0 commit comments

Comments
 (0)