-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathconnection.go
86 lines (73 loc) · 2.03 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main
import (
"github.com/streadway/amqp"
"log"
)
/**
* Retrieves a connection from the configured AMQP host, with the given ConnectionDefinition
*/
func GetConnection(definition ConnectionDefinition) (*amqp.Connection, *amqp.Channel, error) {
var conn *amqp.Connection
var channel *amqp.Channel
conn, ok := Connection[definition.Vhost]
if !ok {
err := Connect(definition)
if err != nil {
return nil, nil, err
}
conn = Connection[definition.Vhost]
}
channel, ok = Channel[definition.Vhost]
if !ok {
err := OpenChannel(conn, definition)
if err != nil {
return nil, nil, err
}
channel = Channel[definition.Vhost]
}
return conn, channel, nil
}
func Connect(definition ConnectionDefinition) error {
log.Printf("Opening connection to %s:%s/%s\n", RABBITMQ_HOST, RABBITMQ_PORT, definition.Vhost)
connection, err := amqp.Dial("amqp://" + definition.Username + ":" + definition.Password + "@" + RABBITMQ_HOST + ":" + RABBITMQ_PORT + "/" + definition.Vhost)
if err != nil {
log.Print(err)
if err == amqp.ErrClosed {
return retryAmqpConnection(definition)
}
return err
}
Connection[definition.Vhost] = connection
err = OpenChannel(Connection[definition.Vhost], definition)
return err
}
/**
* Retrieves a channel from the given connection, ConnectionDefinition
*/
func OpenChannel(connection *amqp.Connection, cd ConnectionDefinition) error {
channel, err := connection.Channel()
if err != nil {
return err
}
Channel[cd.Vhost] = channel
return nil
}
/**
* Handles closed connection errors with a retry
*/
func retryAmqpConnection(connection ConnectionDefinition) error {
log.Printf("AMQP connection dropped.")
/**
* Don't want to loop through this for too long.
*/
if Statistics[RUNTIME_RABBITMQ_CONNECTION_FAILURE] > HARE_RABBITMQ_CONNECTION_RETRY_MAXIMUM {
log.Fatal("Maximum connection retry count has been reached.")
}
err := Connect(connection)
if err != nil {
return err
}
statisticsIncrement(RUNTIME_RABBITMQ_CONNECTION_FAILURE)
log.Print("Reconnected to remote host.")
return nil
}