reset coalesceTimer to nil as soon as the event is consumed #11924

Merged · 6 commits · Jan 5, 2022
3 changes: 3 additions & 0 deletions .changelog/11924.txt
@@ -0,0 +1,3 @@
```release-note:bug
xds: fix a deadlock when the snapshot channel already has a snapshot to be consumed.
```
22 changes: 12 additions & 10 deletions agent/proxycfg/manager.go
@@ -127,7 +127,7 @@ func (m *Manager) Run() error {
defer m.State.StopNotify(stateCh)

for {
m.syncState()
m.syncState(m.notifyBroadcast)

// Wait for a state change
_, ok := <-stateCh
@@ -140,7 +140,7 @@ func (m *Manager) Run() error {

// syncState is called whenever the local state notifies a change. It holds the
// lock while finding any new or updated proxies and removing deleted ones.
func (m *Manager) syncState() {
func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) {
m.mu.Lock()
defer m.mu.Unlock()

@@ -160,7 +160,7 @@ func (m *Manager) syncState() {
// know that so we'd need to set it here if not during registration of the
// proxy service. Sidecar Service in the interim can do that, but we should
// validate more generally that that is always true.
err := m.ensureProxyServiceLocked(svc)
err := m.ensureProxyServiceLocked(svc, notifyBroadcast)
if err != nil {
m.Logger.Error("failed to watch proxy service",
"service", sid.String(),
@@ -179,7 +179,7 @@ func (m *Manager) syncState() {
}

// ensureProxyServiceLocked adds or changes the proxy to our state.
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error {
sid := ns.CompoundServiceID()

// Retrieve the token used to register the service, or fallback to the
@@ -227,16 +227,18 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
m.proxies[sid] = state

// Start a goroutine that will wait for changes and broadcast them to watchers.
go func(ch <-chan ConfigSnapshot) {
// Run until ch is closed
for snap := range ch {
m.notify(&snap)
}
}(ch)
go notifyBroadcast(ch)

return nil
}

func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
// Run until ch is closed
for snap := range ch {
m.notify(&snap)
}
}

// removeProxyService is called when a service deregisters and frees all
// resources for that service.
func (m *Manager) removeProxyServiceLocked(proxyID structs.ServiceID) {
141 changes: 140 additions & 1 deletion agent/proxycfg/manager_test.go
@@ -598,7 +598,146 @@ func TestManager_SyncState_DefaultToken(t *testing.T) {

err = state.AddServiceWithChecks(srv, nil, "")
require.NoError(t, err)
m.syncState()
m.syncState(m.notifyBroadcast)

require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].serviceInstance.token)
}

func TestManager_SyncState_No_Notify(t *testing.T) {
types := NewTestCacheTypes(t)
c := TestCacheWithTypes(t, types)
logger := testutil.Logger(t)
tokens := new(token.Store)
tokens.UpdateUserToken("default-token", token.TokenSourceConfig)

state := local.NewState(local.Config{}, logger, tokens)
state.TriggerSyncChanges = func() {}

m, err := NewManager(ManagerConfig{
Cache: c,
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
State: state,
Tokens: tokens,
Source: &structs.QuerySource{Datacenter: "dc1"},
Logger: logger,
})
require.NoError(t, err)
defer m.Close()

srv := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 9999,
Meta: map[string]string{},
Proxy: structs.ConnectProxyConfig{
DestinationServiceID: "web",
DestinationServiceName: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8080,
Config: map[string]interface{}{
"foo": "bar",
},
},
}

err = state.AddServiceWithChecks(srv, nil, "")
require.NoError(t, err)

readEvent := make(chan bool, 1)
snapSent := make(chan bool, 1)

m.syncState(func(ch <-chan ConfigSnapshot) {
for {
<-readEvent
snap := <-ch
m.notify(&snap)
snapSent <- true
}
})

// Get the relevant notification Channel, should only have 1
notifyCH := m.proxies[srv.CompoundServiceID()].ch

// update the leaf certs
roots, issuedCert := TestCerts(t)
notifyCH <- cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
}
// at this point the snapshot should not be valid and not be sent
after := time.After(200 * time.Millisecond)
select {
case <-snapSent:
t.Fatal("snap should not be valid")
case <-after:

}

// update the root certs
notifyCH <- cache.UpdateEvent{
CorrelationID: rootsWatchID,
Result: roots,
Err: nil,
}

// at this point the snapshot should not be valid and not be sent
after = time.After(200 * time.Millisecond)
select {
case <-snapSent:
t.Fatal("snap should not be valid")
case <-after:

}

// prepare to read a snapshot update as the next update should make the snapshot valid
readEvent <- true

// update the intentions
notifyCH <- cache.UpdateEvent{
CorrelationID: intentionsWatchID,
Result: &structs.IndexedIntentionMatches{},
Err: nil,
}

// at this point we have a valid snapshot
after = time.After(500 * time.Millisecond)
select {
case <-snapSent:
case <-after:
t.Fatal("snap should be valid")

}

// send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case
for i := 0; i < 2; i++ {
time.Sleep(250 * time.Millisecond)

Contributor: What's this sleep used for? Could it be removed to reduce the running time for the test?

Contributor Author: This sleep ensures that we saturate the snap channel and reach the default case. Because of the check in http://github.com/hashicorp/consul/blob/f68b1a4a77741ae4c293571efd32b943971ec9ab/agent/proxycfg/state.go#L357-L357, if we don't wait between updates we only send once, which does not fill snapCh and does not get us to the default case.

notifyCH <- cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
}
}

// make sure that we are not receiving any snapshot and wait for the snapshots to be processed
after = time.After(500 * time.Millisecond)
select {
case <-snapSent:
t.Fatal("snap should not be sent")
case <-after:
}

// now make sure that both snapshots got propagated
for i := 0; i < 2; i++ {

readEvent <- true
after = time.After(500 * time.Millisecond)
select {
case <-snapSent:
case <-after:
t.Fatal("snap should be valid")

}
}
}
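
The saturation trick discussed in the review thread above is easier to see in isolation. Below is a minimal, self-contained sketch (not Consul code): it assumes the snapshot channel is buffered with capacity 1, which is what sending two unread snapshots back to back implies, and the names snapCh and deliver are illustrative only.

```go
package main

import "fmt"

func main() {
	// Stand-in for the proxy's snapshot channel; assumed buffered with
	// capacity 1, so one unread snapshot is enough to make a further
	// non-blocking send fall through to the select's default case.
	snapCh := make(chan string, 1)

	deliver := func(snap string) {
		select {
		case snapCh <- snap:
			fmt.Println("delivered:", snap)
		default:
			// The buffer still holds an unread snapshot. This is the branch
			// the test drives by sending two updates without reading in
			// between; the fix must schedule a retry here, otherwise the
			// newer snapshot is never delivered.
			fmt.Println("channel full, retry needed for:", snap)
		}
	}

	deliver("snapshot-1") // fills the buffer
	deliver("snapshot-2") // hits the default case
}
```
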
13 changes: 5 additions & 8 deletions agent/proxycfg/state.go
@@ -294,6 +294,8 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
}

case <-sendCh:
// Allow the next change to trigger a send
coalesceTimer = nil

Contributor: If this is reset here, wouldn't the check in L322 always be true?

Member: I'm not sure I understand what this timer is doing any more. The original intent was to prevent updates to multiple blocking queries that return at the same time from causing multiple reconfigurations of Envoy in quick succession.

For example, on startup a proxy might start watching 100 upstream services and config entries; without coalescing here, we would end up delivering O(100) entire snapshot reconfigurations to Envoy in the space of a few hundred milliseconds as all the blocking queries return their initial results.

It's not clear to me whether this is still doing that, or whether immediately resetting the timer here effectively removes the coalescing. I've not traced this code for long enough to fully remember how it works, though.

The other unknown for me is whether this coalescing is even needed or useful any more now that we have implemented Delta xDS. My understanding is that even if we deliver multiple snapshots, the delta part of Delta xDS ensures we only send the changes in each one down, so from Envoy's perspective it might not make any difference. It could still save some CPU cycles on the agent doing the delivering, which could be significant as we move towards servers doing that, but it's not clear they will even use this package as it is, so I perhaps wouldn't optimize too much for that.

I'd be interested to know, at any rate, whether this change has a material impact on the number of reconfigurations sent to Envoy when there are a lot of upstreams, and if not, whether we should just remove all the coalesce code entirely and simplify this loop.

Member: Never mind, I get it now. I missed the main place the timer is reset, outside of the select in the for loop further below (not in the default branch). Originally, before the last deadlock fix, it was always reset in this branch anyway: https://github.com/hashicorp/consul/blame/cf9a14ab6ad25e8932eb2b07616c0c33ddc54b12/agent/proxycfg/state.go#L614-L631

I think Freddy might be right that the conditional in the default case on line 322 is redundant, as it will always be true again now in that case, but this doesn't break all of the coalescing because there is still the conditional outside the select, which is the main place we throttle updates being delivered to sendCh.

Contributor Author: Yes, that's right, the check is not needed.

The use case where I found the bug had a lot of updates happening (especially for an ingress Kind); I'm not sure why, but I saw many updates per second for the same ingress service. I would guess that the coalesceTimeout is useful for this kind of behaviour.

// Make a deep copy of snap so we don't mutate any of the embedded structs
// etc on future updates.
snapCopy, err := snap.Clone()
@@ -307,9 +309,6 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
case s.snapCh <- *snapCopy:
s.logger.Trace("Delivered new snapshot to proxy config watchers")

// Allow the next change to trigger a send
coalesceTimer = nil

// Skip rest of loop - there is nothing to send since nothing changed on
// this iteration
continue
@@ -320,11 +319,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")

// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{}
})
}
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{}
})

// Do not reset coalesceTimer since we just queued a timer-based refresh
continue
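
For readers following the coalescing discussion above, here is a simplified, self-contained sketch of the pattern, not the actual state.run loop: a burst of updates arms a single timer, the timer triggers one delivery attempt, and (as of this change) the timer handle is cleared as soon as the sendCh event is consumed, so a failed non-blocking delivery can always schedule a retry. The channel and timer names mirror the diff above, but the surrounding scaffolding (the run signature, updateCh, pending, and the hard-coded timeout) is invented for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// run sketches the coalesce-and-retry loop: updates arm a timer, the timer
// triggers a single non-blocking delivery, and a full snapshot channel causes
// a retry to be scheduled.
func run(updateCh <-chan string, snapCh chan<- string, done <-chan struct{}) {
	const coalesceTimeout = 200 * time.Millisecond

	var coalesceTimer *time.Timer
	sendCh := make(chan struct{})
	var pending string

	for {
		select {
		case <-done:
			return

		case u := <-updateCh:
			pending = u
			// Arm a timer only if none is pending, so a burst of updates
			// results in a single delivery attempt.
			if coalesceTimer == nil {
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					sendCh <- struct{}{}
				})
			}

		case <-sendCh:
			// The fix: clear the consumed timer immediately, so both the
			// retry below and the next incoming update can arm a fresh one.
			coalesceTimer = nil

			select {
			case snapCh <- pending:
				fmt.Println("delivered snapshot:", pending)
			default:
				// The receiver has not drained the previous snapshot yet;
				// retry shortly. Before the fix, coalesceTimer was only reset
				// after a successful send and the retry was guarded by a nil
				// check, so no retry was scheduled and the newest snapshot
				// was stuck.
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					sendCh <- struct{}{}
				})
			}
		}
	}
}

func main() {
	updates := make(chan string)
	snaps := make(chan string, 1) // capacity 1, like the snapshot channel
	done := make(chan struct{})

	go run(updates, snaps, done)

	updates <- "snapshot-1"
	updates <- "snapshot-2" // coalesced with the first update

	time.Sleep(500 * time.Millisecond)
	fmt.Println("received:", <-snaps)
	close(done)
}
```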