Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

local: fixes a data race in anti-entropy sync #12324

Merged
merged 7 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12324.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
local: fixes a data race in anti-entropy sync that could cause the wrong tags to be applied to a service when EnableTagOverride is used
```
57 changes: 48 additions & 9 deletions agent/local/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul"
Expand Down Expand Up @@ -267,6 +268,13 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err
return fmt.Errorf("no service")
}

// Avoid having the stored service have any call-site ownership.
var err error
service, err = cloneService(service)
if err != nil {
return err
}

// use the service name as id if the id was omitted
if service.ID == "" {
service.ID = service.Service
Expand Down Expand Up @@ -530,8 +538,12 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
return fmt.Errorf("no check")
}

// clone the check since we will be modifying it.
check = check.Clone()
// Avoid having the stored check have any call-site ownership.
var err error
check, err = cloneCheck(check)
if err != nil {
return err
}

if l.discardCheckOutput.Load().(bool) {
check.Output = ""
Expand Down Expand Up @@ -1083,30 +1095,39 @@ func (l *State) updateSyncState() error {
continue
}

// Make a shallow copy since we may mutate it below and other readers
// may be reading it and we want to avoid a race.
nextService := *ls.Service
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still copies even if it's not going to mutate it for 1 of the 2 reasons below, but I sense that the clarity this provides in reading might be fine given the slightly increased allocations in the AE loop.

changed := false

// If our definition is different, we need to update it. Make a
// copy so that we don't retain a pointer to any actual state
// store info for in-memory RPCs.
if ls.Service.EnableTagOverride {
tags := make([]string, len(rs.Tags))
copy(tags, rs.Tags)
ls.Service.Tags = tags
if nextService.EnableTagOverride {
nextService.Tags = structs.CloneStringSlice(rs.Tags)
changed = true
}

// Merge any tagged addresses with the consul- prefix (set by the server)
// back into the local state.
if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
if !reflect.DeepEqual(nextService.TaggedAddresses, rs.TaggedAddresses) {
// Make a copy of TaggedAddresses to prevent races when writing
// since other goroutines may be reading from the map
m := make(map[string]structs.ServiceAddress)
for k, v := range ls.Service.TaggedAddresses {
for k, v := range nextService.TaggedAddresses {
m[k] = v
}
for k, v := range rs.TaggedAddresses {
if strings.HasPrefix(k, structs.MetaKeyReservedPrefix) {
m[k] = v
}
}
ls.Service.TaggedAddresses = m
nextService.TaggedAddresses = m
changed = true
}

if changed {
ls.Service = &nextService
}
ls.InSync = ls.Service.IsSame(rs)
}
Expand Down Expand Up @@ -1549,3 +1570,21 @@ func (l *State) aclAccessorID(secretID string) string {
}
return ident.AccessorID()
}

// cloneService returns a deep copy of ns so that the stored service shares no
// memory with the caller's value (avoids data races in anti-entropy sync).
// It never mutates its argument.
func cloneService(ns *structs.NodeService) (*structs.NodeService, error) {
	// TODO: consider doing a hand-managed clone function
	raw, err := copystructure.Copy(ns)
	if err != nil {
		return nil, err
	}
	// err is known to be nil here; return the literal nil on success.
	return raw.(*structs.NodeService), nil
}

// cloneCheck returns a deep copy of check so that the stored health check
// shares no memory with the caller's value (avoids data races in
// anti-entropy sync). It never mutates its argument.
func cloneCheck(check *structs.HealthCheck) (*structs.HealthCheck, error) {
	// TODO: consider doing a hand-managed clone function
	raw, err := copystructure.Copy(check)
	if err != nil {
		return nil, err
	}
	// err is known to be nil here; return the literal nil on success.
	return raw.(*structs.HealthCheck), nil
}
93 changes: 67 additions & 26 deletions agent/local/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -267,13 +268,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

clone := func(ns *structs.NodeService) *structs.NodeService {
raw, err := copystructure.Copy(ns)
require.NoError(t, err)
return raw.(*structs.NodeService)
}

// Register node info
var out struct{}
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
}

// Exists both same (noop)
srv1 := &structs.NodeService{
Expand All @@ -289,8 +291,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddService(srv1, "")
args.Service = srv1
assert.Nil(t, a.RPC("Catalog.Register", args, &out))
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
Service: srv1,
}, &out))

// Exists both, different (update)
srv2 := &structs.NodeService{
Expand All @@ -307,11 +313,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
}
a.State.AddService(srv2, "")

srv2_mod := new(structs.NodeService)
*srv2_mod = *srv2
srv2_mod := clone(srv2)
srv2_mod.Port = 9000
args.Service = srv2_mod
assert.Nil(t, a.RPC("Catalog.Register", args, &out))
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
Service: srv2_mod,
}, &out))

// Exists local (create)
srv3 := &structs.NodeService{
Expand Down Expand Up @@ -341,8 +350,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
args.Service = srv4
assert.Nil(t, a.RPC("Catalog.Register", args, &out))
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
Service: srv4,
}, &out))

// Exists local, in sync, remote missing (create)
srv5 := &structs.NodeService{
Expand All @@ -362,28 +375,56 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
InSync: true,
})

assert.Nil(t, a.State.SyncFull())
require.NoError(t, a.State.SyncFull())

var services structs.IndexedNodeServices
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
}
assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))

// We should have 5 services (consul included)
assert.Len(t, services.NodeServices.Services, 5)
require.Len(t, services.NodeServices.Services, 5)

// Check that virtual IPs have been set
vips := make(map[string]struct{})
serviceToVIP := make(map[string]string)
for _, serv := range services.NodeServices.Services {
if serv.TaggedAddresses != nil {
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
assert.NotEmpty(t, serviceVIP)
require.NotEmpty(t, serviceVIP)
vips[serviceVIP] = struct{}{}
serviceToVIP[serv.ID] = serviceVIP
}
}
assert.Len(t, vips, 4)
require.Len(t, vips, 4)

// Update our assertions for the tagged addresses.
srv1.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["mysql-proxy"],
Port: srv1.Port,
},
}
srv2.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["redis-proxy"],
Port: srv2.Port,
},
}
srv3.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["web-proxy"],
Port: srv3.Port,
},
}
srv5.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["cache-proxy"],
Port: srv5.Port,
},
}

// All the services should match
// Retry to mitigate data races between local and remote state
Expand All @@ -408,34 +449,34 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
}
})

assert.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
require.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))

// Remove one of the services
a.State.RemoveService(structs.NewServiceID("cache-proxy", nil))
assert.Nil(t, a.State.SyncFull())
assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
require.NoError(t, a.State.SyncFull())
require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))

// We should have 4 services (consul included)
assert.Len(t, services.NodeServices.Services, 4)
require.Len(t, services.NodeServices.Services, 4)

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql-proxy":
assert.Equal(t, srv1, serv)
require.Equal(t, srv1, serv)
case "redis-proxy":
assert.Equal(t, srv2, serv)
require.Equal(t, srv2, serv)
case "web-proxy":
assert.Equal(t, srv3, serv)
require.Equal(t, srv3, serv)
case structs.ConsulServiceID:
// ignore
default:
t.Fatalf("unexpected service: %v", id)
}
}

assert.Nil(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
require.NoError(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
}

func TestAgent_ServiceWatchCh(t *testing.T) {
Expand Down