[1.11.x] local: fixes a data race in anti-entropy sync
Backport of #12324 to 1.11.x
rboyer committed Feb 14, 2022
1 parent 15cb7a8 commit 2e66ccd
Showing 3 changed files with 118 additions and 36 deletions.
.changelog/12324.txt: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
```release-note:bug
local: fixes a data race in anti-entropy sync that could cause the wrong tags to be applied to a service when EnableTagOverride is used
```
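For context on the bug class: before this change, the local state could end up holding the caller's *structs.NodeService pointer, and the anti-entropy sync rewrote fields such as Tags on that shared value in place, so a concurrent reader could observe a partially updated or wrong tag set when EnableTagOverride was enabled. Below is a minimal, hypothetical Go sketch of that pattern and of the clone-on-insert remedy; the registry and service types are illustrative stand-ins, not Consul code.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical stand-in for structs.NodeService.
type service struct {
	Name string
	Tags []string
}

// registry is a hypothetical stand-in for the agent's local state.
type registry struct {
	mu  sync.Mutex
	svc *service
}

// addUnsafe stores the caller's pointer, so a later in-place update of Tags
// (as the pre-fix EnableTagOverride sync did) races with concurrent readers.
func (r *registry) addUnsafe(s *service) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.svc = s
}

// addCloned copies the value and its slice so the stored service has no
// call-site ownership, mirroring the cloneService call added in this commit.
func (r *registry) addCloned(s *service) {
	r.mu.Lock()
	defer r.mu.Unlock()
	c := *s
	c.Tags = append([]string(nil), s.Tags...)
	r.svc = &c
}

func (r *registry) get() *service {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.svc
}

func main() {
	r := &registry{}
	s := &service{Name: "web", Tags: []string{"v1"}}
	r.addCloned(s) // swap in addUnsafe and run with -race to see the report

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer: mutates the caller-owned slice
		defer wg.Done()
		s.Tags[0] = "v2"
	}()
	go func() { // reader: iterates whatever the registry currently stores
		defer wg.Done()
		for _, tag := range r.get().Tags {
			_ = fmt.Sprint(tag)
		}
	}()
	wg.Wait()
}
```

Running the sketch with addUnsafe under `go run -race` reports a concurrent read/write on the shared Tags slice; the commit removes that sharing by deep-cloning on insert and switching updateSyncState to a copy-on-write of the stored service.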
agent/local/state.go: 48 additions & 9 deletions
@@ -12,6 +12,7 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
@@ -266,6 +267,13 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err
return fmt.Errorf("no service")
}

// Avoid having the stored service have any call-site ownership.
var err error
service, err = cloneService(service)
if err != nil {
return err
}

// use the service name as id if the id was omitted
if service.ID == "" {
service.ID = service.Service
@@ -522,8 +530,12 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
return fmt.Errorf("no check")
}

// clone the check since we will be modifying it.
check = check.Clone()
// Avoid having the stored check have any call-site ownership.
var err error
check, err = cloneCheck(check)
if err != nil {
return err
}

if l.discardCheckOutput.Load().(bool) {
check.Output = ""
@@ -1075,30 +1087,39 @@ func (l *State) updateSyncState() error {
continue
}

// Make a shallow copy since we may mutate it below and other readers
// may be reading it and we want to avoid a race.
nextService := *ls.Service
changed := false

// If our definition is different, we need to update it. Make a
// copy so that we don't retain a pointer to any actual state
// store info for in-memory RPCs.
if ls.Service.EnableTagOverride {
tags := make([]string, len(rs.Tags))
copy(tags, rs.Tags)
ls.Service.Tags = tags
if nextService.EnableTagOverride {
nextService.Tags = structs.CloneStringSlice(rs.Tags)
changed = true
}

// Merge any tagged addresses with the consul- prefix (set by the server)
// back into the local state.
if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
if !reflect.DeepEqual(nextService.TaggedAddresses, rs.TaggedAddresses) {
// Make a copy of TaggedAddresses to prevent races when writing
// since other goroutines may be reading from the map
m := make(map[string]structs.ServiceAddress)
for k, v := range ls.Service.TaggedAddresses {
for k, v := range nextService.TaggedAddresses {
m[k] = v
}
for k, v := range rs.TaggedAddresses {
if strings.HasPrefix(k, structs.MetaKeyReservedPrefix) {
m[k] = v
}
}
ls.Service.TaggedAddresses = m
nextService.TaggedAddresses = m
changed = true
}

if changed {
ls.Service = &nextService
}
ls.InSync = ls.Service.IsSame(rs)
}
@@ -1544,3 +1565,21 @@ func (l *State) aclAccessorID(secretID string) string {
}
return ident.ID()
}

func cloneService(ns *structs.NodeService) (*structs.NodeService, error) {
// TODO: consider doing a hand-managed clone function
raw, err := copystructure.Copy(ns)
if err != nil {
return nil, err
}
return raw.(*structs.NodeService), err
}

func cloneCheck(check *structs.HealthCheck) (*structs.HealthCheck, error) {
// TODO: consider doing a hand-managed clone function
raw, err := copystructure.Copy(check)
if err != nil {
return nil, err
}
return raw.(*structs.HealthCheck), err
}
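The new cloneService and cloneCheck helpers rely on github.com/mitchellh/copystructure for a reflection-based deep copy, so slices and maps in the clone get their own backing storage (the TODO notes a hand-managed clone mainly as a possible future optimization). Here is a small standalone sketch of the property the helpers depend on, using a hypothetical stand-in struct rather than structs.NodeService.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/copystructure"
)

// nodeService is a hypothetical stand-in for structs.NodeService.
type nodeService struct {
	ID              string
	Tags            []string
	TaggedAddresses map[string]string
}

func main() {
	orig := &nodeService{
		ID:              "redis",
		Tags:            []string{"primary"},
		TaggedAddresses: map[string]string{"lan": "10.0.0.1"},
	}

	// copystructure.Copy returns an interface{}, so callers assert back to the
	// concrete pointer type, as cloneService and cloneCheck do above.
	raw, err := copystructure.Copy(orig)
	if err != nil {
		panic(err)
	}
	clone := raw.(*nodeService)

	// Mutating the caller's value no longer leaks into the stored clone.
	orig.Tags[0] = "replica"
	orig.TaggedAddresses["lan"] = "10.0.0.2"

	fmt.Println(clone.Tags[0], clone.TaggedAddresses["lan"]) // primary 10.0.0.1
}
```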
agent/local/state_test.go: 67 additions & 27 deletions
@@ -9,6 +9,7 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -261,18 +262,18 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {

t.Parallel()

assert := assert.New(t)
a := agent.NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

clone := func(ns *structs.NodeService) *structs.NodeService {
raw, err := copystructure.Copy(ns)
require.NoError(t, err)
return raw.(*structs.NodeService)
}

// Register node info
var out struct{}
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
}

// Exists both same (noop)
srv1 := &structs.NodeService{
@@ -288,8 +289,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
a.State.AddService(srv1, "")
args.Service = srv1
assert.Nil(a.RPC("Catalog.Register", args, &out))
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
Service: srv1,
}, &out))

// Exists both, different (update)
srv2 := &structs.NodeService{
@@ -306,11 +311,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
}
a.State.AddService(srv2, "")

srv2_mod := new(structs.NodeService)
*srv2_mod = *srv2
srv2_mod := clone(srv2)
srv2_mod.Port = 9000
args.Service = srv2_mod
assert.Nil(a.RPC("Catalog.Register", args, &out))
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
Service: srv2_mod,
}, &out))

// Exists local (create)
srv3 := &structs.NodeService{
@@ -340,8 +348,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
args.Service = srv4
assert.Nil(a.RPC("Catalog.Register", args, &out))
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
Address: "127.0.0.1",
Service: srv4,
}, &out))

// Exists local, in sync, remote missing (create)
srv5 := &structs.NodeService{
@@ -361,28 +373,56 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
InSync: true,
})

assert.Nil(a.State.SyncFull())
require.NoError(t, a.State.SyncFull())

var services structs.IndexedNodeServices
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: a.Config.NodeName,
}
assert.Nil(a.RPC("Catalog.NodeServices", &req, &services))
require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))

// We should have 5 services (consul included)
assert.Len(services.NodeServices.Services, 5)
require.Len(t, services.NodeServices.Services, 5)

// Check that virtual IPs have been set
vips := make(map[string]struct{})
serviceToVIP := make(map[string]string)
for _, serv := range services.NodeServices.Services {
if serv.TaggedAddresses != nil {
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
assert.NotEmpty(serviceVIP)
require.NotEmpty(t, serviceVIP)
vips[serviceVIP] = struct{}{}
serviceToVIP[serv.ID] = serviceVIP
}
}
assert.Len(vips, 4)
require.Len(t, vips, 4)

// Update our assertions for the tagged addresses.
srv1.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["mysql-proxy"],
Port: srv1.Port,
},
}
srv2.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["redis-proxy"],
Port: srv2.Port,
},
}
srv3.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["web-proxy"],
Port: srv3.Port,
},
}
srv5.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: serviceToVIP["cache-proxy"],
Port: srv5.Port,
},
}

// All the services should match
// Retry to mitigate data races between local and remote state
@@ -407,34 +447,34 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
}
})

assert.NoError(servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
require.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))

// Remove one of the services
a.State.RemoveService(structs.NewServiceID("cache-proxy", nil))
assert.Nil(a.State.SyncFull())
assert.Nil(a.RPC("Catalog.NodeServices", &req, &services))
require.NoError(t, a.State.SyncFull())
require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))

// We should have 4 services (consul included)
assert.Len(services.NodeServices.Services, 4)
require.Len(t, services.NodeServices.Services, 4)

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql-proxy":
assert.Equal(srv1, serv)
require.Equal(t, srv1, serv)
case "redis-proxy":
assert.Equal(srv2, serv)
require.Equal(t, srv2, serv)
case "web-proxy":
assert.Equal(srv3, serv)
require.Equal(t, srv3, serv)
case structs.ConsulServiceID:
// ignore
default:
t.Fatalf("unexpected service: %v", id)
}
}

assert.Nil(servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
require.NoError(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
}

func TestAgent_ServiceWatchCh(t *testing.T) {
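A note on the test churn above: besides updating assertions for the new virtual-IP tagged addresses, the test moves from the shared assert := assert.New(t) helper to explicit require calls. The standalone sketch below (not part of the commit) illustrates the behavioral difference that presumably motivates the switch: require aborts the test at the first failed step, so later assertions never run against an invalid result. The lookup function and names are illustrative only.

```go
package sketch

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// lookup is a hypothetical stand-in for an RPC call such as Catalog.NodeServices.
func lookup() (map[string]string, error) {
	return map[string]string{"web-proxy": "10.0.0.1"}, nil
}

func TestRequireStopsOnFirstFailure(t *testing.T) {
	services, err := lookup()

	// require.NoError calls t.FailNow on a non-nil error, so the assertions
	// below never execute against an invalid result; the assert package would
	// only record the failure and keep going, producing noisy follow-on errors.
	require.NoError(t, err)
	require.Len(t, services, 1)
	require.Equal(t, "10.0.0.1", services["web-proxy"])
}
```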
