Skip to content

Commit

Permalink
reset coalesceTimer to nil as soon as the event is consumed (#11924)
Browse files Browse the repository at this point in the history
* reset `coalesceTimer` to nil as soon as the event is consumed

* add change log

* refactor to add relevant test.

* fix linter

* Apply suggestions from code review

Co-authored-by: Freddy <freddygv@users.noreply.github.com>

* remove unneeded check

Co-authored-by: Freddy <freddygv@users.noreply.github.com>
  • Loading branch information
2 people authored and hc-github-team-consul-core committed Jan 5, 2022
1 parent 90c4a26 commit efed86d
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .changelog/11924.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: fix a deadlock when the snapshot channel already has a snapshot to be consumed.
```
22 changes: 12 additions & 10 deletions agent/proxycfg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (m *Manager) Run() error {
defer m.State.StopNotify(stateCh)

for {
m.syncState()
m.syncState(m.notifyBroadcast)

// Wait for a state change
_, ok := <-stateCh
Expand All @@ -140,7 +140,7 @@ func (m *Manager) Run() error {

// syncState is called whenever the local state notifies a change. It holds the
// lock while finding any new or updated proxies and removing deleted ones.
func (m *Manager) syncState() {
func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -160,7 +160,7 @@ func (m *Manager) syncState() {
// know that so we'd need to set it here if not during registration of the
// proxy service. Sidecar Service in the interim can do that, but we should
// validate more generally that that is always true.
err := m.ensureProxyServiceLocked(svc)
err := m.ensureProxyServiceLocked(svc, notifyBroadcast)
if err != nil {
m.Logger.Error("failed to watch proxy service",
"service", sid.String(),
Expand All @@ -179,7 +179,7 @@ func (m *Manager) syncState() {
}

// ensureProxyServiceLocked adds or changes the proxy to our state.
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error {
sid := ns.CompoundServiceID()

// Retrieve the token used to register the service, or fallback to the
Expand Down Expand Up @@ -225,16 +225,18 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
m.proxies[sid] = state

// Start a goroutine that will wait for changes and broadcast them to watchers.
go func(ch <-chan ConfigSnapshot) {
// Run until ch is closed
for snap := range ch {
m.notify(&snap)
}
}(ch)
go notifyBroadcast(ch)

return nil
}

// notifyBroadcast forwards every snapshot received on ch to the manager's
// watchers. It returns once ch is closed by the sender.
func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
	for {
		snap, ok := <-ch
		if !ok {
			// Channel closed: the proxy state was torn down, stop forwarding.
			return
		}
		m.notify(&snap)
	}
}

// removeProxyService is called when a service deregisters and frees all
// resources for that service.
func (m *Manager) removeProxyServiceLocked(proxyID structs.ServiceID) {
Expand Down
141 changes: 140 additions & 1 deletion agent/proxycfg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,146 @@ func TestManager_SyncState_DefaultToken(t *testing.T) {

err = state.AddServiceWithChecks(srv, nil, "")
require.NoError(t, err)
m.syncState()
m.syncState(m.notifyBroadcast)

require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].token)
}

// TestManager_SyncState_No_Notify verifies that snapshot delivery does not
// deadlock when the consumer is slow: updates sent while no reader is draining
// the snapshot channel must still be delivered once the reader catches up.
// The broadcast goroutine is replaced with a gated version so the test can
// control exactly when snapshots are consumed.
func TestManager_SyncState_No_Notify(t *testing.T) {
	types := NewTestCacheTypes(t)
	c := TestCacheWithTypes(t, types)
	logger := testutil.Logger(t)
	tokens := new(token.Store)
	tokens.UpdateUserToken("default-token", token.TokenSourceConfig)

	state := local.NewState(local.Config{}, logger, tokens)
	// No-op: the test drives sync explicitly via m.syncState below.
	state.TriggerSyncChanges = func() {}

	m, err := NewManager(ManagerConfig{
		Cache:  c,
		Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
		State:  state,
		Tokens: tokens,
		Source: &structs.QuerySource{Datacenter: "dc1"},
		Logger: logger,
	})
	require.NoError(t, err)
	defer m.Close()

	// A minimal connect sidecar proxy registration to give the manager one
	// proxy state to watch.
	srv := &structs.NodeService{
		Kind:    structs.ServiceKindConnectProxy,
		ID:      "web-sidecar-proxy",
		Service: "web-sidecar-proxy",
		Port:    9999,
		Meta:    map[string]string{},
		Proxy: structs.ConnectProxyConfig{
			DestinationServiceID:   "web",
			DestinationServiceName: "web",
			LocalServiceAddress:    "127.0.0.1",
			LocalServicePort:       8080,
			Config: map[string]interface{}{
				"foo": "bar",
			},
		},
	}

	err = state.AddServiceWithChecks(srv, nil, "")
	require.NoError(t, err)

	// readEvent gates the consumer: one token allows one snapshot receive.
	// snapSent signals back that a snapshot was consumed and broadcast.
	readEvent := make(chan bool, 1)
	snapSent := make(chan bool, 1)

	// Custom broadcast loop: only drain the snapshot channel when the test
	// grants permission via readEvent, simulating a slow/stalled consumer.
	m.syncState(func(ch <-chan ConfigSnapshot) {
		for {
			<-readEvent
			snap := <-ch
			m.notify(&snap)
			snapSent <- true
		}
	})

	// Get the relevant notification Channel, should only have 1
	notifyCH := m.proxies[srv.CompoundServiceID()].ch

	// update the leaf certs
	roots, issuedCert := TestCerts(t)
	notifyCH <- cache.UpdateEvent{
		CorrelationID: leafWatchID,
		Result:        issuedCert,
		Err:           nil,
	}
	// at this point the snapshot should not be valid and not be sent
	// (leaf cert alone is insufficient: roots and intentions are missing)
	after := time.After(200 * time.Millisecond)
	select {
	case <-snapSent:
		t.Fatal("snap should not be valid")
	case <-after:

	}

	// update the root certs
	notifyCH <- cache.UpdateEvent{
		CorrelationID: rootsWatchID,
		Result:        roots,
		Err:           nil,
	}

	// at this point the snapshot should not be valid and not be sent
	after = time.After(200 * time.Millisecond)
	select {
	case <-snapSent:
		t.Fatal("snap should not be valid")
	case <-after:

	}

	// prepare to read a snapshot update as the next update should make the snapshot valid
	readEvent <- true

	// update the intentions
	notifyCH <- cache.UpdateEvent{
		CorrelationID: intentionsWatchID,
		Result:        &structs.IndexedIntentionMatches{},
		Err:           nil,
	}

	// at this point we have a valid snapshot
	after = time.After(500 * time.Millisecond)
	select {
	case <-snapSent:
	case <-after:
		t.Fatal("snap should be valid")

	}

	// send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case
	// (this is the path that previously deadlocked; see state.go changes in this commit)
	for i := 0; i < 2; i++ {
		time.Sleep(250 * time.Millisecond)
		notifyCH <- cache.UpdateEvent{
			CorrelationID: leafWatchID,
			Result:        issuedCert,
			Err:           nil,
		}
	}

	// make sure that we are not receiving any snapshot and wait for the snapshots to be processed
	after = time.After(500 * time.Millisecond)
	select {
	case <-snapSent:
		t.Fatal("snap should not be sent")
	case <-after:
	}

	// now make sure that both snapshots got propagated
	// (each readEvent token releases exactly one queued snapshot)
	for i := 0; i < 2; i++ {

		readEvent <- true
		after = time.After(500 * time.Millisecond)
		select {
		case <-snapSent:
		case <-after:
			t.Fatal("snap should be valid")

		}
	}
}
13 changes: 5 additions & 8 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,8 @@ func (s *state) run(snap *ConfigSnapshot) {
}

case <-sendCh:
// Allow the next change to trigger a send
coalesceTimer = nil
// Make a deep copy of snap so we don't mutate any of the embedded structs
// etc on future updates.
snapCopy, err := snap.Clone()
Expand All @@ -663,9 +665,6 @@ func (s *state) run(snap *ConfigSnapshot) {
case s.snapCh <- *snapCopy:
s.logger.Trace("Delivered new snapshot to proxy config watchers")

// Allow the next change to trigger a send
coalesceTimer = nil

// Skip rest of loop - there is nothing to send since nothing changed on
// this iteration
continue
Expand All @@ -676,11 +675,9 @@ func (s *state) run(snap *ConfigSnapshot) {
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")

// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{}
})
}
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{}
})

// Do not reset coalesceTimer since we just queued a timer-based refresh
continue
Expand Down

0 comments on commit efed86d

Please sign in to comment.