Skip to content

Commit

Permalink
Support multiple tags for health and catalog api endpoints
Browse files Browse the repository at this point in the history
Fixes #1781.

Adds a `ServiceTags` field to the ServiceSpecificRequest to support
multiple tags, updates the filter logic in the catalog store, and
propagates these change through to the health and catalog endpoints.

Note: Leaves `ServiceTag` in the struct, since it is being used as
part of the DNS lookup, which in turn uses the health check.
  • Loading branch information
adilyse committed Oct 10, 2018
1 parent be52793 commit fea07f2
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 25 deletions.
2 changes: 1 addition & 1 deletion agent/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (s *HTTPServer) catalogServiceNodes(resp http.ResponseWriter, req *http.Req
// Check for a tag
params := req.URL.Query()
if _, ok := params["tag"]; ok {
args.ServiceTag = params.Get("tag")
args.ServiceTags = params["tag"]
args.TagFilter = true
}

Expand Down
10 changes: 9 additions & 1 deletion agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
}

if args.TagFilter {
return s.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
return s.ServiceTagNodes(ws, args.ServiceName, args.ServiceTags)
}

return s.ServiceNodes(ws, args.ServiceName)
Expand Down Expand Up @@ -334,6 +334,14 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(args.ServiceTags) > 0 {
// Build metric labels
labels := []metrics.Label{{Name: "service", Value: args.ServiceName}}
for _, tag := range args.ServiceTags {
labels = append(labels, metrics.Label{Name: "tag", Value: tag})
}
metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tags"}, 1, labels)
}
if len(reply.ServiceNodes) == 0 {
metrics.IncrCounterWithLabels([]string{"catalog", key, "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
Expand Down
33 changes: 24 additions & 9 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,7 @@ func TestCatalog_ListServiceNodes(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
ServiceTag: "slave",
ServiceTags: []string{"slave"},
TagFilter: false,
}
var out structs.IndexedServiceNodes
Expand Down Expand Up @@ -1647,16 +1647,16 @@ func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
if err := s1.fsm.State().EnsureNode(2, node2); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
if err := s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary", "v2"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(4, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000}); err != nil {
if err := s1.fsm.State().EnsureService(4, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"secondary", "v2"}, Address: "127.0.0.2", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}

cases := []struct {
filters map[string]string
tag string
tags []string
services structs.ServiceNodes
}{
// Basic meta filter
Expand All @@ -1667,7 +1667,7 @@ func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
// Basic meta filter, tag
{
filters: map[string]string{"somekey": "somevalue"},
tag: "primary",
tags: []string{"primary"},
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Common meta filter
Expand All @@ -1681,7 +1681,7 @@ func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
// Common meta filter, tag
{
filters: map[string]string{"common": "1"},
tag: "secondary",
tags: []string{"secondary"},
services: structs.ServiceNodes{
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
},
Expand All @@ -1699,7 +1699,22 @@ func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
// Multiple filter values, tag
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
tag: "primary",
tags: []string{"primary"},
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Common meta filter, single tag
{
filters: map[string]string{"common": "1"},
tags: []string{"v2"},
services: structs.ServiceNodes{
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
&structs.ServiceNode{Node: "foo", ServiceID: "db"},
},
},
// Common meta filter, multiple tags
{
filters: map[string]string{"common": "1"},
tags: []string{"v2", "primary"},
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
}
Expand All @@ -1709,8 +1724,8 @@ func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
ServiceName: "db",
ServiceTag: tc.tag,
TagFilter: tc.tag != "",
ServiceTags: tc.tags,
TagFilter: len(tc.tags) > 0,
}
var out structs.IndexedServiceNodes
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
Expand Down
14 changes: 13 additions & 1 deletion agent/consul/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
metrics.IncrCounterWithLabels([]string{"health", key, "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(args.ServiceTags) > 0 {
labels := []metrics.Label{{Name: "service", Value: args.ServiceName}}
for _, tag := range args.ServiceTags {
labels = append(labels, metrics.Label{Name: "tag", Value: tag})
}
metrics.IncrCounterWithLabels([]string{"health", key, "query-tags"}, 1, labels)
}
if len(reply.Nodes) == 0 {
metrics.IncrCounterWithLabels([]string{"health", key, "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
Expand All @@ -186,7 +193,12 @@ func (h *Health) serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *st
}

func (h *Health) serviceNodesTagFilter(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
// DNS service lookups populate the ServiceTag field. In this case,
// use ServiceTag instead of the ServiceTags field.
if args.ServiceTag != "" {
return s.CheckServiceTagNodes(ws, args.ServiceName, []string{args.ServiceTag})
}
return s.CheckServiceTagNodes(ws, args.ServiceName, args.ServiceTags)
}

func (h *Health) serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
Expand Down
78 changes: 78 additions & 0 deletions agent/consul/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,84 @@ func TestHealth_ServiceNodes(t *testing.T) {
}
}

func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

testrpc.WaitForLeader(t, s1.RPC, "dc1")

arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"master", "v2"},
},
Check: &structs.HealthCheck{
Name: "db connect",
Status: api.HealthPassing,
ServiceID: "db",
},
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}

arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"slave", "v2"},
},
Check: &structs.HealthCheck{
Name: "db connect",
Status: api.HealthWarning,
ServiceID: "db",
},
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}

var out2 structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
ServiceTags: []string{"master", "v2"},
TagFilter: true,
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil {
t.Fatalf("err: %v", err)
}

nodes := out2.Nodes
if len(nodes) != 1 {
t.Fatalf("Bad: %v", nodes)
}
if nodes[0].Node.Node != "foo" {
t.Fatalf("Bad: %v", nodes[0])
}
if !lib.StrContains(nodes[0].Service.Tags, "v2") {
t.Fatalf("Bad: %v", nodes[0])
}
if !lib.StrContains(nodes[0].Service.Tags, "master") {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[0].Checks[0].Status != api.HealthPassing {
t.Fatalf("Bad: %v", nodes[0])
}
}

func TestHealth_ServiceNodes_NodeMetaFilter(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
Expand Down
24 changes: 19 additions & 5 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,8 +897,8 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
}

// ServiceTagNodes returns the nodes associated with a given service, filtering
// out services that don't contain the given tag.
func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) {
// out services that don't contain the given tags.
func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()

Expand All @@ -916,7 +916,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
var results structs.ServiceNodes
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
if !serviceTagFilter(svc, tag) {
if !serviceTagsFilter(svc, tags) {
results = append(results, svc)
}
}
Expand Down Expand Up @@ -945,6 +945,20 @@ func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
return true
}

// serviceTagsFilter returns true (should filter) if the given service node
// doesn't contain the given set of tags.
func serviceTagsFilter(sn *structs.ServiceNode, tags []string) bool {
for _, tag := range tags {
if serviceTagFilter(sn, tag) {
// If any one of the expected tags was not found, filter the service
return true
}
}

// If all tags were found, don't filter the service
return false
}

// ServiceAddressNodes returns the nodes associated with a given service, filtering
// out services that don't match the given serviceAddress
func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string) (uint64, structs.ServiceNodes, error) {
Expand Down Expand Up @@ -1614,7 +1628,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect

// CheckServiceTagNodes is used to query all nodes and checks for a given
// service, filtering out services that don't contain the given tag.
func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags []string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()

Expand All @@ -1632,7 +1646,7 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string)
var results structs.ServiceNodes
for service := iter.Next(); service != nil; service = iter.Next() {
svc := service.(*structs.ServiceNode)
if !serviceTagFilter(svc, tag) {
if !serviceTagsFilter(svc, tags) {
results = append(results, svc)
}
}
Expand Down
36 changes: 30 additions & 6 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,7 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {

// Listing with no results returns an empty list.
ws := memdb.NewWatchSet()
idx, nodes, err := s.ServiceTagNodes(ws, "db", "master")
idx, nodes, err := s.ServiceTagNodes(ws, "db", []string{"master"})
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down Expand Up @@ -1842,7 +1842,7 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {

// Read everything back.
ws = memdb.NewWatchSet()
idx, nodes, err = s.ServiceTagNodes(ws, "db", "master")
idx, nodes, err = s.ServiceTagNodes(ws, "db", []string{"master"})
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down Expand Up @@ -1903,7 +1903,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, nodes, err := s.ServiceTagNodes(nil, "db", "master")
idx, nodes, err := s.ServiceTagNodes(nil, "db", []string{"master"})
if err != nil {
t.Fatalf("err: %s", err)
}
Expand All @@ -1926,7 +1926,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
t.Fatalf("bad: %v", nodes)
}

idx, nodes, err = s.ServiceTagNodes(nil, "db", "v2")
idx, nodes, err = s.ServiceTagNodes(nil, "db", []string{"v2"})
if err != nil {
t.Fatalf("err: %s", err)
}
Expand All @@ -1937,7 +1937,31 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
t.Fatalf("bad: %v", nodes)
}

idx, nodes, err = s.ServiceTagNodes(nil, "db", "dev")
// Test filtering on multiple tags
idx, nodes, err = s.ServiceTagNodes(nil, "db", []string{"v2", "slave"})
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 19 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 2 {
t.Fatalf("bad: %v", nodes)
}
if !lib.StrContains(nodes[0].ServiceTags, "v2") {
t.Fatalf("bad: %v", nodes)
}
if !lib.StrContains(nodes[0].ServiceTags, "slave") {
t.Fatalf("bad: %v", nodes)
}
if !lib.StrContains(nodes[1].ServiceTags, "v2") {
t.Fatalf("bad: %v", nodes)
}
if !lib.StrContains(nodes[1].ServiceTags, "slave") {
t.Fatalf("bad: %v", nodes)
}

idx, nodes, err = s.ServiceTagNodes(nil, "db", []string{"dev"})
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down Expand Up @@ -3088,7 +3112,7 @@ func TestStateStore_CheckServiceTagNodes(t *testing.T) {
}

ws := memdb.NewWatchSet()
idx, nodes, err := s.CheckServiceTagNodes(ws, "db", "master")
idx, nodes, err := s.CheckServiceTagNodes(ws, "db", []string{"master"})
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions agent/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ func (s *HTTPServer) healthServiceNodes(resp http.ResponseWriter, req *http.Requ
return nil, nil
}

// Check for a tag
// Check for tags
params := req.URL.Query()
if _, ok := params["tag"]; ok {
args.ServiceTag = params.Get("tag")
args.ServiceTags = params["tag"]
args.TagFilter = true
}

Expand Down
1 change: 1 addition & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ type ServiceSpecificRequest struct {
NodeMetaFilters map[string]string
ServiceName string
ServiceTag string
ServiceTags []string
ServiceAddress string
TagFilter bool // Controls tag filtering
Source QuerySource
Expand Down

0 comments on commit fea07f2

Please sign in to comment.