Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

advertise specific address for a service #570

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,17 +409,17 @@ RPC:
}

// Add the node record
records := d.formatNodeRecord(&out.NodeServices.Node, req.Question[0].Name,
qType, d.config.NodeTTL)
records := d.formatNodeRecord(&out.NodeServices.Node, out.NodeServices.Node.Address,
req.Question[0].Name, qType, d.config.NodeTTL)
if records != nil {
resp.Answer = append(resp.Answer, records...)
}
}

// formatNodeRecord takes a Node and returns an A, AAAA, or CNAME record
func (d *DNSServer) formatNodeRecord(node *structs.Node, qName string, qType uint16, ttl time.Duration) (records []dns.RR) {
func (d *DNSServer) formatNodeRecord(node *structs.Node, addr, qName string, qType uint16, ttl time.Duration) (records []dns.RR) {
// Parse the IP
ip := net.ParseIP(node.Address)
ip := net.ParseIP(addr)
var ipv4 net.IP
if ip != nil {
ipv4 = ip.To4()
Expand Down Expand Up @@ -457,7 +457,7 @@ func (d *DNSServer) formatNodeRecord(node *structs.Node, qName string, qType uin
Class: dns.ClassINET,
Ttl: uint32(ttl / time.Second),
},
Target: dns.Fqdn(node.Address),
Target: dns.Fqdn(addr),
}
records = append(records, cnRec)

Expand Down Expand Up @@ -584,13 +584,17 @@ func (d *DNSServer) serviceNodeRecords(nodes structs.CheckServiceNodes, req, res
// Avoid duplicate entries, possible if a node has
// the same service on multiple ports, etc.
addr := node.Node.Address
if node.Service.Address != "" {
addr = node.Service.Address
}

if _, ok := handled[addr]; ok {
continue
}
handled[addr] = struct{}{}

// Add the node record
records := d.formatNodeRecord(&node.Node, qName, qType, ttl)
records := d.formatNodeRecord(&node.Node, addr, qName, qType, ttl)
if records != nil {
resp.Answer = append(resp.Answer, records...)
}
Expand Down Expand Up @@ -625,7 +629,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
resp.Answer = append(resp.Answer, srvRec)

// Add the extra record
records := d.formatNodeRecord(&node.Node, srvRec.Target, dns.TypeANY, ttl)
records := d.formatNodeRecord(&node.Node, node.Node.Address, srvRec.Target, dns.TypeANY, ttl)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the SRV also needs to handle the case of the override address

if records != nil {
resp.Extra = append(resp.Extra, records...)
}
Expand Down
12 changes: 7 additions & 5 deletions command/agent/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (

// ServiceDefinition is used to JSON decode the Service definitions
type ServiceDefinition struct {
ID string
Name string
Tags []string
Port int
Check CheckType
ID string
Name string
Tags []string
Address string
Port int
Check CheckType
}

func (s *ServiceDefinition) NodeService() *structs.NodeService {
ns := &structs.NodeService{
ID: s.ID,
Service: s.Name,
Tags: s.Tags,
Address: s.Address,
Port: s.Port,
}
if ns.ID == "" && ns.Service != "" {
Expand Down
12 changes: 6 additions & 6 deletions consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestCatalogListServices(t *testing.T) {

// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})

if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -544,7 +544,7 @@ func TestCatalogListServices_Blocking(t *testing.T) {
go func() {
time.Sleep(100 * time.Millisecond)
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
}()

// Re-run the query
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestCatalogListServices_Stale(t *testing.T) {

// Inject a fake service
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})

// Run the query, do not wait for leader!
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
Expand Down Expand Up @@ -666,7 +666,7 @@ func TestCatalogListServiceNodes(t *testing.T) {

// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})

if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -709,8 +709,8 @@ func TestCatalogNodeServices(t *testing.T) {

// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000})
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, 80})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80})

if err := client.Call("Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions consul/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
// Add some state
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(2, structs.Node{"baz", "127.0.0.2"})
fsm.state.EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, 80})
fsm.state.EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{"web", "web", nil, 80})
fsm.state.EnsureService(6, "baz", &structs.NodeService{"db", "db", []string{"secondary"}, 5000})
fsm.state.EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80})
fsm.state.EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{"web", "web", nil, "127.0.0.2", 80})
fsm.state.EnsureService(6, "baz", &structs.NodeService{"db", "db", []string{"secondary"}, "127.0.0.2", 5000})
fsm.state.EnsureCheck(7, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Expand Down
30 changes: 19 additions & 11 deletions consul/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,12 @@ func (s *StateStore) ensureServiceTxn(index uint64, node string, ns *structs.Nod

// Create the entry
entry := structs.ServiceNode{
Node: node,
ServiceID: ns.ID,
ServiceName: ns.Service,
ServiceTags: ns.Tags,
ServicePort: ns.Port,
Node: node,
ServiceID: ns.ID,
ServiceName: ns.Service,
ServiceTags: ns.Tags,
ServiceAddress: ns.Address,
ServicePort: ns.Port,
}

// Ensure the service entry is set
Expand Down Expand Up @@ -568,6 +569,7 @@ func (s *StateStore) parseNodeServices(tables MDBTables, tx *MDBTxn, name string
ID: service.ServiceID,
Service: service.ServiceName,
Tags: service.ServiceTags,
Address: service.ServiceAddress,
Port: service.ServicePort,
}
ns.Services[srv.ID] = srv
Expand Down Expand Up @@ -743,13 +745,17 @@ func (s *StateStore) parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interf
for i, r := range res {
srv := r.(*structs.ServiceNode)

// Get the address of the node
nodeRes, err := table.GetTxn(tx, "id", srv.Node)
if err != nil || len(nodeRes) != 1 {
s.logger.Printf("[ERR] consul.state: Failed to join service node %#v with node: %v", *srv, err)
continue
if srv.ServiceAddress != "" {
srv.Address = srv.ServiceAddress
} else {
// Get the address of the node
nodeRes, err := table.GetTxn(tx, "id", srv.Node)
if err != nil || len(nodeRes) != 1 {
s.logger.Printf("[ERR] consul.state: Failed to join service node %#v with node: %v", *srv, err)
continue
}
srv.Address = nodeRes[0].(*structs.Node).Address
}
srv.Address = nodeRes[0].(*structs.Node).Address

nodes[i] = *srv
}
Expand Down Expand Up @@ -952,6 +958,7 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e
ID: srv.ServiceID,
Service: srv.ServiceName,
Tags: srv.ServiceTags,
Address: srv.ServiceAddress,
Port: srv.ServicePort,
}
nodes[i].Checks = checks
Expand Down Expand Up @@ -1026,6 +1033,7 @@ func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) str
ID: service.ServiceID,
Service: service.ServiceName,
Tags: service.ServiceTags,
Address: service.ServiceAddress,
Port: service.ServicePort,
}
info.Services = append(info.Services, srv)
Expand Down
Loading