Skip to content

Commit

Permalink
bugfix: use ServiceTags to generate cache key hash (#4987)
Browse files Browse the repository at this point in the history
* bugfix: use ServiceTags to generate cahce key hash

* update unit test

* update

* remote print log

* Update .gitignore

* Completely deprecate ServiceTag field internally for clarity

* Add explicit test for CacheInfo cases
  • Loading branch information
banks authored Jan 7, 2019
1 parent 0f22706 commit b29bc90
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 19 deletions.
10 changes: 6 additions & 4 deletions agent/cache-types/catalog_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,30 @@ func TestCatalogServices(t *testing.T) {
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("web", req.ServiceName)
require.Equal("canary", req.ServiceTag)
require.True(req.AllowStale)

reply := args.Get(2).(*structs.IndexedServiceNodes)
reply.ServiceNodes = []*structs.ServiceNode{
&structs.ServiceNode{ServiceTags: req.ServiceTags},
}
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
result, err := typ.Fetch(cache.FetchOptions{
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
ServiceTag: "canary",
ServiceTags: []string{"tag1", "tag2"},
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}, resultA)
}

func TestCatalogServices_badReqType(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions agent/cache-types/health_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,30 @@ func TestHealthServices(t *testing.T) {
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("web", req.ServiceName)
require.Equal("canary", req.ServiceTag)
require.True(req.AllowStale)

reply := args.Get(2).(*structs.IndexedCheckServiceNodes)
reply.Nodes = []structs.CheckServiceNode{
{Service: &structs.NodeService{Tags: req.ServiceTags}},
}
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
result, err := typ.Fetch(cache.FetchOptions{
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
ServiceTag: "canary",
ServiceTags: []string{"tag1", "tag2"},
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}, resultA)
}

func TestHealthServices_badReqType(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,9 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru

if args.TagFilter {
tags := args.ServiceTags

// Agents < v1.3.0 and DNS service lookups populate the ServiceTag field. In this case,
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
// Agents < v1.3.0 populate the ServiceTag field. In this case,
// use ServiceTag instead of the ServiceTags field.
if args.ServiceTag != "" {
tags = []string{args.ServiceTag}
Expand Down Expand Up @@ -339,6 +340,8 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru

metrics.IncrCounterWithLabels([]string{"catalog", key, "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,8 @@ func TestCatalog_ListServiceNodes_ServiceTags_V1_2_3Compat(t *testing.T) {
err = s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.1", Port: 5001})
require.NoError(t, err)

// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
// make a request with the <=1.2.3 ServiceTag tag field (vs ServiceTags)
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
Expand All @@ -1670,6 +1672,8 @@ func TestCatalog_ListServiceNodes_ServiceTags_V1_2_3Compat(t *testing.T) {
require.Equal(t, 1, len(out.ServiceNodes))
require.Equal(t, "db1", out.ServiceNodes[0].ServiceID)

// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
// test with the other tag
args = structs.ServiceSpecificRequest{
Datacenter: "dc1",
Expand All @@ -1693,6 +1697,8 @@ func TestCatalog_ListServiceNodes_ServiceTags_V1_2_3Compat(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(out.ServiceNodes))

// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
// when both ServiceTag and ServiceTags fields are populated, use ServiceTag
args = structs.ServiceSpecificRequest{
Datacenter: "dc1",
Expand Down
6 changes: 5 additions & 1 deletion agent/consul/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc

metrics.IncrCounterWithLabels([]string{"health", key, "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"health", key, "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
Expand Down Expand Up @@ -198,7 +200,9 @@ func (h *Health) serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *st
}

func (h *Health) serviceNodesTagFilter(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
// Agents < v1.3.0 and DNS service lookups populate the ServiceTag field. In this case,
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
// Agents < v1.3.0 populate the ServiceTag field. In this case,
// use ServiceTag instead of the ServiceTags field.
if args.ServiceTag != "" {
return s.CheckServiceTagNodes(ws, args.ServiceName, []string{args.ServiceTag})
Expand Down
41 changes: 40 additions & 1 deletion agent/consul/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func TestHealth_ServiceNodes(t *testing.T) {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
ServiceTag: "master",
ServiceTags: []string{"master"},
TagFilter: false,
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil {
Expand Down Expand Up @@ -601,6 +601,45 @@ func TestHealth_ServiceNodes(t *testing.T) {
if nodes[1].Checks[0].Status != api.HealthPassing {
t.Fatalf("Bad: %v", nodes[1])
}

// Same should still work for <1.3 RPCs with singular tags
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
{
var out2 structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
ServiceTag: "master",
TagFilter: false,
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil {
t.Fatalf("err: %v", err)
}

nodes := out2.Nodes
if len(nodes) != 2 {
t.Fatalf("Bad: %v", nodes)
}
if nodes[0].Node.Node != "bar" {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[1].Node.Node != "foo" {
t.Fatalf("Bad: %v", nodes[1])
}
if !lib.StrContains(nodes[0].Service.Tags, "slave") {
t.Fatalf("Bad: %v", nodes[0])
}
if !lib.StrContains(nodes[1].Service.Tags, "master") {
t.Fatalf("Bad: %v", nodes[1])
}
if nodes[0].Checks[0].Status != api.HealthWarning {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[1].Checks[0].Status != api.HealthPassing {
t.Fatalf("Bad: %v", nodes[1])
}
}
}

func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ func (d *DNSServer) lookupServiceNodes(datacenter, service, tag string, connect
Connect: connect,
Datacenter: datacenter,
ServiceName: service,
ServiceTag: tag,
ServiceTags: []string{tag},
TagFilter: tag != "",
QueryOptions: structs.QueryOptions{
Token: d.agent.tokens.UserToken(),
Expand Down
15 changes: 14 additions & 1 deletion agent/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2571,11 +2571,24 @@ func TestDNS_ServiceLookup_TagPeriod(t *testing.T) {
t.Fatalf("err: %v", err)
}

m1 := new(dns.Msg)
m1.SetQuestion("v1.master2.db.service.consul.", dns.TypeSRV)

c1 := new(dns.Client)
in, _, err := c1.Exchange(m1, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}

if len(in.Answer) != 0 {
t.Fatalf("Bad: %#v", in)
}

m := new(dns.Msg)
m.SetQuestion("v1.master.db.service.consul.", dns.TypeSRV)

c := new(dns.Client)
in, _, err := c.Exchange(m, a.DNSAddr())
in, _, err = c.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
22 changes: 17 additions & 5 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -351,11 +352,13 @@ type ServiceSpecificRequest struct {
Datacenter string
NodeMetaFilters map[string]string
ServiceName string
ServiceTag string
ServiceTags []string
ServiceAddress string
TagFilter bool // Controls tag filtering
Source QuerySource
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
ServiceTag string
ServiceTags []string
ServiceAddress string
TagFilter bool // Controls tag filtering
Source QuerySource

// Connect if true will only search for Connect-compatible services.
Connect bool
Expand Down Expand Up @@ -384,10 +387,19 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo {
// cached results, we need to be careful we maintain the same order of fields
// here. We could alternatively use `hash:set` struct tag on an anonymous
// struct to make it more robust if it becomes significant.
sort.Strings(r.ServiceTags)
v, err := hashstructure.Hash([]interface{}{
r.NodeMetaFilters,
r.ServiceName,
// DEPRECATED (singular-service-tag) - remove this when upgrade RPC compat
// with 1.2.x is not required. We still need this in because <1.3 agents
// might still send RPCs with singular tag set. In fact the only place we
// use this method is in agent cache so if the agent is new enough to have
// this code this should never be set, but it's safer to include it until we
// completely remove this field just in case it's erroneously used anywhere
// (e.g. until this change DNS still used it).
r.ServiceTag,
r.ServiceTags,
r.ServiceAddress,
r.TagFilter,
r.Connect,
Expand Down
Loading

0 comments on commit b29bc90

Please sign in to comment.