Skip to content

Commit 775145b

Browse files
authored
Merge pull request #297 from allegro/consuldatacenters
don't search in all DCs if configured
2 parents 3d0bb6d + 833e9b7 commit 775145b

File tree

7 files changed

+75
-9
lines changed

7 files changed

+75
-9
lines changed

.goxc.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"deb-source"
99
],
1010
"BuildConstraints": "linux,!arm darwin",
11-
"PackageVersion": "1.5.1",
11+
"PackageVersion": "1.5.2",
1212
"TaskSettings": {
1313
"bintray": {
1414
"downloadspage": "bintray.md",

Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ FPM-exists:
6464
@fpm -v || \
6565
(echo >&2 "FPM must be installed on the system. See https://github.com/jordansissel/fpm"; false)
6666

67-
deb: FPM-exists build
67+
deb: FPM-exists build-linux
6868
mkdir -p dist/$(VERSION)/
6969
cd dist/$(VERSION)/ && \
7070
fpm -s dir \
@@ -73,6 +73,7 @@ deb: FPM-exists build
7373
-v $(VERSION) \
7474
--url="https://github.com/allegro/marathon-consul" \
7575
--vendor=Allegro \
76+
--architecture=amd64 \
7677
--maintainer="Allegro Group <opensource@allegro.pl>" \
7778
--description "Marathon-consul service (performs Marathon Tasks registration as Consul Services for service discovery) Marathon-consul takes information provided by the Marathon event bus and forwards it to Consul agents. It also re-syncs all the information from Marathon to Consul on startup and repeats it with given interval." \
7879
--deb-priority optional \

config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func (config *Config) parseFlags() {
7979
flag.StringVar(&config.Consul.IgnoredHealthChecks, "consul-ignored-healthchecks", "", "A comma separated blacklist of Marathon health check types that will not be migrated to Consul, e.g. command,tcp")
8080
flag.BoolVar(&config.Consul.EnableTagOverride, "consul-enable-tag-override", false, "Disable the anti-entropy feature for all services")
8181
flag.StringVar(&config.Consul.LocalAgentHost, "consul-local-agent-host", "", "Consul Agent hostname or IP that should be used for startup sync")
82+
flag.StringVar(&config.Consul.Dc, "consul-dc", "", "Consul DC where to look for services, all if empty")
8283

8384
// Web
8485
flag.StringVar(&config.Web.Listen, "listen", ":4000", "Accept connections at this address")

consul/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Config struct {
1111
SslCaCert string
1212
Token string
1313
Tag string
14+
Dc string
1415
Timeout time.Interval
1516
RequestRetries uint32
1617
AgentFailuresTolerance uint32

consul/consul.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (c *Consul) getServicesUsingProviderWithRetriesOnAgentFailure(provide Servi
6464
}
6565

6666
func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([]*service.Service, error) {
67-
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent)
67+
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
6868
if err != nil {
6969
return nil, err
7070
}
@@ -80,12 +80,18 @@ func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([]
8080
return allServices, nil
8181
}
8282

83-
func dcAwareQueriesForAllDCs(agent *consulAPI.Client) ([]*consulAPI.QueryOptions, error) {
83+
func dcAwareQueries(agent *consulAPI.Client, singleDc string) ([]*consulAPI.QueryOptions, error) {
84+
if singleDc != "" {
85+
var queries []*consulAPI.QueryOptions
86+
queries = append(queries, &consulAPI.QueryOptions{
87+
Datacenter: singleDc,
88+
})
89+
return queries, nil
90+
}
8491
datacenters, err := agent.Catalog().Datacenters()
8592
if err != nil {
8693
return nil, err
8794
}
88-
8995
var queries []*consulAPI.QueryOptions
9096
for _, dc := range datacenters {
9197
queries = append(queries, &consulAPI.QueryOptions{
@@ -101,7 +107,7 @@ func (c *Consul) GetAllServices() ([]*service.Service, error) {
101107
}
102108

103109
func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, error) {
104-
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent)
110+
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
105111
if err != nil {
106112
return nil, err
107113
}
@@ -110,7 +116,8 @@ func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, er
110116
for _, dcAwareQuery := range dcAwareQueries {
111117
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
112118
if err != nil {
113-
return nil, err
119+
log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC")
120+
continue
114121
}
115122
for consulService, tags := range consulServices {
116123
if contains(tags, c.config.Tag) {
@@ -231,7 +238,7 @@ func (c *Consul) deregisterMultipleServices(services []*service.Service, taskID
231238

232239
func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Service, error) {
233240
return c.getServicesUsingProviderWithRetriesOnAgentFailure(func(agent *consulAPI.Client) ([]*service.Service, error) {
234-
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent)
241+
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
235242
if err != nil {
236243
return nil, err
237244
}
@@ -241,7 +248,8 @@ func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Se
241248
for _, dcAwareQuery := range dcAwareQueries {
242249
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
243250
if err != nil {
244-
return nil, err
251+
log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC")
252+
continue
245253
}
246254
for consulService, tags := range consulServices {
247255
if contains(tags, searchedTag) {

consul/consul_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,50 @@ func TestGetAllServices(t *testing.T) {
267267
assert.Contains(t, serviceNames, "serviceB")
268268
}
269269

270+
func TestGetServicesFromSingleDc(t *testing.T) {
271+
t.Parallel()
272+
// create cluster of 2 consul servers
273+
server1 := CreateTestServerDatacenter(t, "dc-1")
274+
defer server1.Stop()
275+
276+
server2 := CreateTestServerDatacenter(t, "dc-2")
277+
defer server2.Stop()
278+
279+
server1.JoinWAN(t, server2.LANAddr)
280+
281+
// create client
282+
consul := ClientAtServer(server1)
283+
consul.config.Tag = "marathon"
284+
// configure fetching services from single datacenter
285+
consul.config.Dc = "dc-1"
286+
287+
// given
288+
// register services in both servers
289+
server1.AddService(t, "serviceA", "passing", []string{"public", "marathon"})
290+
server1.AddService(t, "serviceB", "passing", []string{"marathon"})
291+
server1.AddService(t, "serviceC", "passing", []string{"zookeeper"})
292+
293+
server2.AddService(t, "serviceA", "passing", []string{"private", "marathon"})
294+
server2.AddService(t, "serviceB", "passing", []string{"zookeeper"})
295+
server2.AddService(t, "serviceD", "passing", []string{"marathon"})
296+
297+
// when
298+
services, err := consul.GetAllServices()
299+
300+
// then
301+
assert.NoError(t, err)
302+
assert.Len(t, services, 2)
303+
304+
serviceNames := make(map[string]struct{})
305+
for _, s := range services {
306+
serviceNames[s.Name] = struct{}{}
307+
}
308+
assert.Len(t, serviceNames, 2)
309+
assert.Contains(t, serviceNames, "serviceA")
310+
assert.Contains(t, serviceNames, "serviceB")
311+
assert.NotContains(t, serviceNames, "serviceD")
312+
}
313+
270314
func TestGetServicesUsingProviderWithRetriesOnAgentFailure_ShouldRetryConfiguredNumberOfTimes(t *testing.T) {
271315
t.Parallel()
272316
server1 := CreateTestServer(t)

consul/consul_test_server.go

+11
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ func CreateTestServer(t *testing.T) *testutil.TestServer {
2323
return server
2424
}
2525

26+
func CreateTestServerDatacenter(t *testing.T, dc string) *testutil.TestServer {
27+
server, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
28+
c.Datacenter = dc
29+
c.Ports = testPortConfig(t)
30+
})
31+
32+
assert.NoError(t, err)
33+
34+
return server
35+
}
36+
2637
const MasterToken = "masterToken"
2738

2839
func CreateSecuredTestServer(t *testing.T) *testutil.TestServer {

0 commit comments

Comments
 (0)