Skip to content

Commit 1086e7b

Browse files
author
Dariusz Jedrzejczyk
committed
Local Agent used for service list reads (#276)
If defined, the local Consul agent is used to read the list of services. Otherwise, a random agent is picked, but only from the cache of agents already syncing registrations. This change prevents situations where contacting a random agent in the cluster fails because it has no ACL-DC configured.
1 parent 50a69eb commit 1086e7b

File tree

3 files changed

+30
-31
lines changed

3 files changed

+30
-31
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ consul-auth-password | | The basic authentication passwor
191191
consul-auth-username | | The basic authentication username
192192
consul-enable-tag-override | `false` | Disable the anti-entropy feature for all services
193193
consul-ignored-healthchecks | | A comma-separated blacklist of Marathon health check types that will not be migrated to Consul, e.g. command,tcp
194-
consul-local-agent-host | | Consul Agent hostname or IP that should be used for startup sync
194+
consul-local-agent-host | | Consul Agent hostname or IP that should be used for startup sync and service listing operations
195195
consul-name-separator | `.` | Separator used to create default service name for Consul
196196
consul-get-services-retry | `3` | Number of retries on failure when performing requests to Consul. Each retry uses a different cached agent
197197
consul-max-agent-failures | `3` | Max number of consecutive request failures for agent before removal from cache

consul/agents.go

+18-24
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,21 @@ import (
1010
log "github.com/Sirupsen/logrus"
1111
"github.com/allegro/marathon-consul/metrics"
1212
"github.com/allegro/marathon-consul/utils"
13-
consulapi "github.com/hashicorp/consul/api"
1413
)
1514

1615
type Agents interface {
17-
GetAgent(agentAddress string) (agent *consulapi.Client, err error)
16+
GetAgent(agentAddress string) (agent *Agent, err error)
17+
GetLocalAgent() (agent *Agent, err error)
1818
GetAnyAgent() (agent *Agent, err error)
1919
RemoveAgent(agentAddress string)
2020
}
2121

2222
type ConcurrentAgents struct {
23-
agents map[string]*Agent
24-
config *Config
25-
lock sync.Mutex
26-
client *http.Client
23+
localAgent *Agent
24+
agents map[string]*Agent
25+
config *Config
26+
lock sync.Mutex
27+
client *http.Client
2728
}
2829

2930
func NewAgents(config *Config) *ConcurrentAgents {
@@ -48,21 +49,7 @@ func NewAgents(config *Config) *ConcurrentAgents {
4849
log.WithError(err).WithField("agent", config.LocalAgentHost).Fatal(
4950
"Cannot connect with consul agent. Check if configuration is valid.")
5051
}
51-
52-
// Get all agents from current DC and store them in cache
53-
nodes, _, err := agent.Catalog().Nodes(nil)
54-
if err != nil {
55-
log.WithError(err).WithField("agent", config.LocalAgentHost).Warn(
56-
"Cannot obtain agents from local consul agent.")
57-
return agents
58-
}
59-
for _, node := range nodes {
60-
_, err := agents.GetAgent(node.Address)
61-
if err != nil {
62-
log.WithError(err).WithField("agent", node.Address).Warn(
63-
"Cannot connect with consul agent. Check if configuration is valid.")
64-
}
65-
}
52+
agents.localAgent = agent
6653
}
6754
return agents
6855
}
@@ -78,6 +65,13 @@ func (a *ConcurrentAgents) GetAnyAgent() (*Agent, error) {
7865
return nil, errors.New("No Consul client available in agents cache")
7966
}
8067

68+
func (a *ConcurrentAgents) GetLocalAgent() (*Agent, error) {
69+
if a.localAgent == nil {
70+
return nil, errors.New("No local consul agent defined")
71+
}
72+
return a.localAgent, nil
73+
}
74+
8175
func (a *ConcurrentAgents) getRandomAgentIPAddress() string {
8276
ipAddresses := []string{}
8377
for ipAddress := range a.agents {
@@ -101,7 +95,7 @@ func (a *ConcurrentAgents) RemoveAgent(agentAddress string) {
10195
}
10296
}
10397

104-
func (a *ConcurrentAgents) GetAgent(agentAddress string) (*consulapi.Client, error) {
98+
func (a *ConcurrentAgents) GetAgent(agentAddress string) (*Agent, error) {
10599
a.lock.Lock()
106100
defer a.lock.Unlock()
107101

@@ -112,7 +106,7 @@ func (a *ConcurrentAgents) GetAgent(agentAddress string) (*consulapi.Client, err
112106
ipAddress := IP.String()
113107

114108
if agent, ok := a.agents[ipAddress]; ok {
115-
return agent.Client, nil
109+
return agent, nil
116110
}
117111

118112
newAgent, err := a.createAgent(ipAddress)
@@ -121,7 +115,7 @@ func (a *ConcurrentAgents) GetAgent(agentAddress string) (*consulapi.Client, err
121115
}
122116
a.addAgent(ipAddress, newAgent)
123117

124-
return newAgent.Client, nil
118+
return newAgent, nil
125119
}
126120

127121
func (a *ConcurrentAgents) addAgent(agentHost string, agent *Agent) {

consul/consul.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,18 @@ func (c *Consul) GetServices(name string) ([]*service.Service, error) {
3838

3939
func (c *Consul) getServicesUsingProviderWithRetriesOnAgentFailure(provide ServicesProvider) ([]*service.Service, error) {
4040
for retry := uint32(0); retry <= c.config.RequestRetries; retry++ {
41-
agent, err := c.agents.GetAnyAgent()
41+
localAgent, err := c.agents.GetLocalAgent()
42+
agent := localAgent
43+
if err != nil {
44+
agent, err = c.agents.GetAnyAgent()
45+
}
4246
if err != nil {
4347
return nil, err
4448
}
4549
if services, err := provide(agent.Client); err != nil {
4650
log.WithError(err).WithField("Address", agent.IP).
47-
Error("An error occurred getting services from Consul, retrying with another agent")
48-
if failures := agent.IncFailures(); failures > c.config.AgentFailuresTolerance {
51+
Error("An error occurred getting services from Consul, retrying locally or with another agent")
52+
if failures := agent.IncFailures(); agent != localAgent && failures > c.config.AgentFailuresTolerance {
4953
log.WithField("Address", agent.IP).WithField("Failures", failures).
5054
Warn("Removing agent due to too many failures")
5155
c.agents.RemoveAgent(agent.IP)
@@ -181,6 +185,7 @@ func (c *Consul) register(service *consulapi.AgentServiceRegistration) error {
181185
if err != nil {
182186
return err
183187
}
188+
client := agent.Client
184189
fields := log.Fields{
185190
"Name": service.Name,
186191
"Id": service.ID,
@@ -191,7 +196,7 @@ func (c *Consul) register(service *consulapi.AgentServiceRegistration) error {
191196
}
192197
log.WithFields(fields).Info("Registering")
193198

194-
err = agent.Agent().ServiceRegister(service)
199+
err = client.Agent().ServiceRegister(service)
195200
if err != nil {
196201
log.WithError(err).WithFields(fields).Error("Unable to register")
197202
}
@@ -265,10 +270,10 @@ func (c *Consul) deregister(toDeregister *service.Service) error {
265270
if err != nil {
266271
return err
267272
}
268-
273+
client := agent.Client
269274
log.WithField("Id", toDeregister.ID).WithField("Address", toDeregister.RegisteringAgentAddress).Info("Deregistering")
270275

271-
err = agent.Agent().ServiceDeregister(toDeregister.ID.String())
276+
err = client.Agent().ServiceDeregister(toDeregister.ID.String())
272277
if err != nil {
273278
log.WithError(err).WithField("Id", toDeregister.ID).WithField("Address", toDeregister.RegisteringAgentAddress).Error("Unable to deregister")
274279
}

0 commit comments

Comments
 (0)